精华内容
下载资源
问答
  • mq延迟队列
    千次阅读
    2019-04-28 13:16:29

    通过RabbitMQ 死信队列实现延迟MQ消息,消息延迟,MQ延迟队列


    1、延迟消息原理

    客户端向MQ服务器发送一条队列消息,该消息设置了TTL【该消息在超过TTL的时间内没有被消费,就会被视为是死信】,但是不给该队列提供消费者,在消息超时后,由死信交换机转发该消息到指定的消费者,以实现延迟队列。

    简单点说,就是向一个没有消费者的队列发送一条有过期机制的消息,消息过期后死信交换机DLK把消息转发给一个没有生产者的队列,以实现消费。

    2、如何在Springboot 下创建延迟消息队列

    只需要正常的创建2个队列,然后设置三个属性即可创建延迟消息队列

        @Bean(DEAD_LET
    更多相关内容
  • MQ延迟队列实现延迟消息

    在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为 延时任务。

    那么如何实现延迟任务呢?

    第一反应是利用cron方案来实现:

    启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。

    cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:

    • 当数据量大的时候轮询效率低;
    • 时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;
    • 如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;

    既然cron方案不是很理想,那就请出我们今天的主角,使用RocketMQ的延时消息解决。在创建订单的时候发送一条延时消息到RocketMQ,30分钟后消费者消费消息去检查订单的状态,如果发现订单未支付则取消订单释放库存。

    实现

    RocketMQ延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。

    注意: RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级

    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    下面我们结合SprintBoot利用RocketMQ发送延时消息

    • 引入RocketMQ组件
    <dependency>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-spring-boot-starter</artifactId>
     </dependency>
    • 增加RocketMQ的配置

    rocketmq:  

            name-server: 172.31.0.44:9876  

            producer:    

            group: delay-group

    • 编写生产者
    @Component
    @Slf4j
    public class DelayProduce {   
                
      
         @Autowired    
         private RocketMQTemplate rocketMQTemplatet;  
      
         
     
         public void sendDelayMessage(String topic,String message,int delayLevel{       
     
         SendResult sendResult = 
         rocketMQTemplatet.syncSend(topic, MessageBuilder.withPayload(message).build(), 
          2000, delayLevel);        
         log.info("sendtime is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd 
            日 HH:mm:ss").format(LocalDateTime.now()));        
         log.info("sendResult is{}",sendResult);    
    }
    }
    • 编写消费者
    @Slf4j@Component@RocketMQMessageListener(        topic = "delay-topic",        consumerGroup = "delay-group")public class DelayConsumer implements RocketMQListener<String> {    @Override    public void onMessage(String message) {        log.info("received message time is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));        log.info("received message is {}",message);    }}
    • 测试
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DelayProduceTest {    
    @Autowired    
    private DelayProduce delayProduce;    
    @Test    
    public void sendDelayMessage() {        
    delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",5);    
    }}

    这里delayLevel设置成5,对应RocketMQ的延时等级就是1分钟后投递消息。

    • 运行结果

    发送时间

    消费时间

    修改延时级别

    RocketMQ的延迟等级可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持1天的延迟,修改最后一个level的值为1d,这个时候依然是18个level;也可以增加一个1d,这个时候总共就有19个level。

    • 打开RocketMQ的配置文件,修改messageDelayLevel 属性

    brokerClusterName = DefaultClusterbrokerName = broker-abrokerId = 0deleteWhen = 04fileReservedTime = 48brokerRole = ASYNC_MASTERflushDiskType = ASYNC_FLUSHstorePathRootDir = /app/rocketmq/datamessageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    这次将延时等级1修改成了90s,生产者发送消息后需要90s后再进行消息投递。修改完成后重启RocketMQ。

    nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &

    • 使用延时等级1发送消息

    public void sendDelayMessage() { delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",1);}

    • 测试

    发送时间

    消费时间

    通过比对发送时间与消费时间证明延时等级修改生效。

    展开全文
  • 一篇带您搞懂MQ延迟队列【实战操作】

    千次阅读 热门讨论 2021-05-26 15:31:54
    文章目录前言RabbitMq 专栏直通车MQ-死信队列(延迟操作)外加消息确认模式死信队列消息模型构建大概有几步?01::前期准备:引入相关依赖02::整合RabbitMQ02::01-加入RabbitMq相关配置03:: 创建真实队列--交换机、队列、...

    前言

      如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
      而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!


    RabbitMq 专栏直通车

    序号直通车
    1干货实战-RabbitMQ(消息队列)的特性-并用具体应用场景来介绍
    2windows rabbitMQ安装(轻松简单,快速上手)
    3纯干货-详解RabbitMQ的消息模型(看一看,说不定有你想要的呢~~)
    4干货实战演练-RabbitMQ普通消息模型(字节流接受模式)<<带源码>>
    5干货实战演练-RabbitMQ普通消息模型(对象接受模式)<<带源码>>
    6干货实战演练-RabbitMQ广播消息模型(字节流接受模式)<<带源码>>
    7干货实战演练-RabbitMQ直连消息模型<<带源码>>
    8干货实战演练-RabbitMQ订阅消息模型<<带源码>>
    9干货实战-RabbitMQ的消息高可用和确认消费
    10干货实战演练-RabbitMQ基于MANUAL机制手动确认消费模型<<带源码>>
    11详解-RabbitMQ 死信队列/延迟队列-( 商品秒杀后30分钟之内付款)
    12一篇带您搞懂MQ延迟队列实战操作<<带源码>>

    MQ-死信队列(延迟操作)外加消息确认模式

    先回顾一下RabbitMq核心基础组件:

    • 【生产者】: 用于生产、发送信息的模块
    • 【消费者】: 用于监听、接受、消费和处理信息的模块
    • 【消息】: 可以看成一串实质的数据,如文字、图片、等等,在整个传输过程中,消息是通过二进制数据流来传递的
    • 【队列】:消费的暂存区或者存储区,可以理解为中转站,即生产者 -> 队列 -> 消费者
    • 【交换机】:同样也可以看成是中转站,用于首次接受和分发消息
    • 【路由】:相当于网关、秘钥、地址等等,一般不单独使用,绑定到交换机上,将消息指定到指定的队列

    通过上篇文章,我们了解了死信队列的用处,这篇文章主要讲的就是实战,基本上都是代码。

    死信队列消息模型构建大概有几步?

    1. 创建死信队列
    2. 创建基本交换机 —> 面向生产者
    3. 创建基本绑定 —>基本交换机+基本路由 —> 面向生产者
    4. 创建死信交换机
    5. 创建死信路由及其绑定真正的消费队列

    好了,现在废话不多说,开始了;

    <<<<<<<<<<<<<<<<<<<<<<<<<<开始演练>>>>>>>>>>>>>>>>>>>>>>>>>>>

    01::前期准备:引入相关依赖

    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-amqp</artifactId>
    			<version>1.3.3.RELEASE</version>
    		</dependency>
    

    02::整合RabbitMQ

    02::01-加入RabbitMq相关配置

    即在配置文件中加入RabbitMq的配置,ip、端口号、账户密码等等…

    spring:
      rabbitmq: #RabbitMq 配置
        virtual-host: /
        host: 127.0.0.1  #IP
        post: 5672  #提供服务时的端口
        username: guest #连接RabbitMQ的账户
        password: guest #连接RabbitMQ的密码
    

    03:: 创建真实队列–交换机、队列、绑定,和确认消费相关配置

    
    /**
     * 基于手动确认消费模式实战配置
     * @author yangzhenyu
     * */
    @Configuration
    public class ManualMqConfig {
        private static Logger log = LoggerFactory.getLogger(ManualMqConfig.class);
        public ManualMqConfig() {
            log.info("=================== 基于手动确认消费模式实战配置注入IOC===================");
        }
        @Autowired
        private Environment environment;
        //自动装配 RabbitMQ 的连接工厂实例
        @Autowired
        private CachingConnectionFactory cachingConnectionFactory;
    
        //消费者
        @Autowired
        private ManualConsumer manualConsumer;
    
        //创建队列
        @Bean(name = "manualQueueOne")
        public Queue manualQueueOne(){
            return new Queue(environment.getProperty("mq.yzy.info.manualqueue.name"),true);
        }
    
        //交换机
        @Bean
        public DirectExchange manualExchange(){
            return new DirectExchange(environment.getProperty("mq.yzy.info.manualexchange.name"),true,false);
        }
    
        //创建绑定
        //directQueueOne
        @Bean
        public Binding basicBindingOne(){
            return BindingBuilder.bind(manualQueueOne()).to(manualExchange()).with(environment.getProperty("mq.yzy.info.manualrouting.key.name"));
        }
    
        /**
         * 基于手动确认消费模式实战配置
         * */
        @Bean(name = "manualListenerContainer")
        public SimpleMessageListenerContainer manualListenerContainer(@Qualifier("manualQueueOne") Queue manualQueue){
            //自定义消息监听器所在的容器工厂
            SimpleMessageListenerContainer factory = new SimpleMessageListenerContainer();
            //设置容器工厂所用的实例
            factory.setConnectionFactory(cachingConnectionFactory);
            //设置消息的确认消费模式,在这里为MANUAL,表示手动确认消费
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
            //设置并发消费者实例的初始值
            factory.setConcurrentConsumers(1);
            //设置并发消费者实例的最大数量
            factory.setMaxConcurrentConsumers(1);
            //设置并发消费者实例中每个实例拉取的消息数量
            factory.setPrefetchCount(1);
    
            //指定该容器监听的队列
            factory.setQueues(manualQueue);
            //指定该容器中的消费监听器 即消费者
            factory.setMessageListener(manualConsumer);
            return factory;
        }
    }
    

    03::01:设置消息的确认消费模式为手动确认

    在这里插入图片描述

    03::02:指定该容器监听的队列

    在这里插入图片描述
    在这里插入图片描述

    03::03:指定该容器中的消费监听器 即消费者

    在这里插入图片描述

    在这里插入图片描述

    03::04:交换机、队列持久化配置

    在这里插入图片描述

    03::05:相关配置信息:

    mq:
      env: loacl #自定义变量,表示本地开发
      yzy:
        info:
          manualqueue: #消息确认模式
            name: ${mq.env}.middleware.mq.yzy.info.manualqueue.one
          manualexchange: #消息确认模式
            name: ${mq.env}.middleware.mq.yzy.info.manualexchange.one
          manualrouting:
            key: #确认模式
              name: ${mq.env}.middleware.mq.yzy.info.manualrouting.key.one
    

    04:: 制作真实队列消费者,即上述模型中指定的消费者

    如下:

    
    /**
     * 认为手动确认消费-消费者-字节流模式
     * @author yangzhenyu
     * */
    @Component("manualConsumer")
    public class ManualConsumer implements ChannelAwareMessageListener {
        private static Logger log = LoggerFactory.getLogger(ManualConsumer.class);
    
        //序列化和返序列化
        @Autowired
        private ObjectMapper objectMapper;
    
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            //获取消息属性
            MessageProperties messageProperties = message.getMessageProperties();
            //获取消息分发时的全局唯一标识
            long tag = messageProperties.getDeliveryTag();
            try{
                //获得消息体
                byte [] msg = message.getBody();
                //解析消息体
                Student student = objectMapper.readValue(msg,Student.class);
                log.info("基于manual机制-确认消息模式-人为手动确定消费-监听到消息:【{}】",objectMapper.writeValueAsString(student));
                //执行完逻辑后手动确认,第一个参数代表消息的分发标识(全局唯一),第二个参数代表是否允许批量确认消费
                channel.basicAck(tag,true);
            }catch (Exception e){
                log.error("确认消息模式-人为手动确定消费-发生异常:",e.fillInStackTrace());
                /**
                 * 如果在处理消息的过程中发生异常,则需要人为手动确认消费掉该消息
                 * 否则该消息将一直停留在队列中,从而导致重复消费
                 * */
                channel.basicReject(tag,false);
            }
        }
    }
    

    05:: 制作死信队列模型配置

    如下:

    /**
     * 死信队列消息模型构建
     * @author yangzhenyu
     * */
    @Configuration
    public class DeadExchangeConfig {
        private static Logger log = LoggerFactory.getLogger(DeadExchangeConfig.class);
        public DeadExchangeConfig() {
            log.info("=================== 死信队列消息模型构建注入IOC===================");
        }
        @Autowired
        private Environment environment;
        //死信路由
        private final static String DEAD_ROUTING="mq.yzy.info.deadrouting.key.name";
        //死信交换机
        private final static String DEAD_EXCHANGE = "mq.yzy.info.deadexchange.name";
        //死信队列
        private final static String DEAD_QUEUE = "mq.yzy.info.deadqueue.name";
        //死信模型->基本模型->基本交换机(面向生产者)
        private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
        //死信模型->基本模型->基本路由(面向生产者)
        private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";
        //真正的队列 -> 面向消费者
        private final static String REAL_QUEUE="mq.yzy.info.manualqueue.name";
    
        /**
         * 创建死信队列
         * */
        @Bean
        public Queue basicDeadQueue(){
            //创建死信队列的组成成分map,用来存放组成成员的相关成员
            Map<String,Object> args = new HashMap<>(3);
            //创建死信交换机
            args.put("x-dead-letter-exchange",environment.getProperty(DEAD_EXCHANGE));
            //创建死信路由
            args.put("x-dead-letter-routing-key",environment.getProperty(DEAD_ROUTING));
            //设定 TTL ,单位是毫秒,在这里指的是60s
            args.put("x-message-ttl",60000);
            //创建并返回死信队列实例
            return new Queue(environment.getProperty(DEAD_QUEUE),true,false,false,args);
        }
    
        //创建基本交换机 ---> 面向生产者
        @Bean
        public TopicExchange basicProducerExchange(){
            return new TopicExchange(environment.getProperty(DEAD_EXCHANGE_PRODUCER),true,false);
        }
    
        //创建基本绑定 --->基本交换机+基本路由 ---> 面向生产者
        @Bean
        public Binding basicProducerBinding(){
            return BindingBuilder.bind(basicDeadQueue()).to(basicProducerExchange()).with(environment.getProperty(DEAD_ROUTING_PRODUCER));
        }
        //====================================================================================
    
    
    
        //创建死信交换机
        @Bean
        public TopicExchange basicDeadExchange(){
            //创建并返回死信交换机实例
            return new TopicExchange(environment.getProperty(DEAD_EXCHANGE),true,false);
        }
    
        //创建死信路由及其绑定真正的消费队列
        /**
         * @param manualQueue 真正的队列
         * */
        @Bean
        public Binding basicDeadBindingOne(@Qualifier("manualQueueOne") Queue manualQueue){
            return BindingBuilder.bind(manualQueue).to(basicDeadExchange()).with(environment.getProperty(DEAD_ROUTING));
        }
    
    }
    

    05::01 制作死信队列模型配置

    mq:
      env: loacl #自定义变量,表示本地开发
      yzy:
        info:
          deadqueue: #死信队列
            name:  ${mq.env}.middleware.mq.yzy.info.deadqueue.one
          deadrouting: #死信路由
            key:
              name: ${mq.env}.middleware.mq.yzy.info.deadrouting.key.one
            producer: #死信消息模型中 基本模型中的路由
              key:
                name: ${mq.env}.middleware.mq.yzy.info.deadrouting.producer.key.one
    

    06:: 制作死信队列-生产者

    package com.yzy.demo.rabbitmq.dead.publisher;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.yzy.demo.test.vo.Student;
    import org.slf4j.LoggerFactory;
    import org.slf4j.Logger;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.core.env.Environment;
    import org.springframework.stereotype.Component;
    
    /**
     * 死信队列消息模型构建---生产者
     * @author yangzhenyu
     * */
    @Component
    public class DeadPublisher {
        private static Logger log = LoggerFactory.getLogger(DeadPublisher.class);
        //序列化和返序列化
        @Autowired
        private ObjectMapper objectMapper;
        //定义RabbitMQ 组件
        @Autowired
        private RabbitTemplate rabbitTemplate;
        //定义环境变量读取实例
        @Autowired
        private Environment env;
    
        //死信模型->基本模型->基本交换机(面向生产者)
        private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
        //死信模型->基本模型->基本路由(面向生产者)
        private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";
    
        /**
         * 发送对象类型的消息给死信队列
         * @param info
         * */
        public void sendMsg(Student info){
            try{
                if (info != null){
                    //定义消息传输的格式为json
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    //指定消息模型中的交换机
                    rabbitTemplate.setExchange(env.getProperty(DEAD_EXCHANGE_PRODUCER));
                    //将字符串值转换成二进制的数据流
                    Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(info))
                            .build();
                    rabbitTemplate.convertAndSend(env.getProperty(DEAD_ROUTING_PRODUCER),msg, new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            //获取消息的属性
                            MessageProperties messageProperties = message.getMessageProperties();
                            //设置消息的持久化模式
                            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            //设置消息头,即指定发送消息的所属对象类型
                            messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Student.class);
                            //设置消息的TTL,当消息和队列同时设置TTL时,取最短 10s
                            messageProperties.setExpiration(String.valueOf(10000));
                            return message;
                        }
                    });
                    log.info("死信队列消息模型 -生产者发出消息:{}",objectMapper.writeValueAsString(info));
                }
            }catch (Exception e){
                log.error("死信队列消息模型 -生产者发出消息-发生异常:{}",info,e.fillInStackTrace());
            }
        }
    }
    
    

    注: 置消息的TTL,当消息和队列同时设置TTL时,取最短。

    07:: 制作测试代码

    如下:

        //死信队列-延迟
        @Autowired
        private DeadPublisher deadPublisher;
        @Autowired
        private ObjectMapper objectMapper;
        /**
         * 死信队列模型演示
         * */
        @ApiOperation(value = "死信队列模型演示",notes = "死信队列模型演示")
        @ResponseBody
        @PostMapping("/deadPublisher")
        public ResponseBo deadPublisher(@RequestBody @Valid Student vo) throws JsonProcessingException {
            String msgValue = "deadPublisher";
            long startTime = init(msgValue,objectMapper.writeValueAsString(vo));
            try{
                deadPublisher.sendMsg(vo);
                endLog(msgValue,startTime);
            }catch (Exception e){
                endLogError(msgValue,startTime,e);
            }
            return ResponseBo.ok();
        }
    

    vo:

    
    /**
     * 学生
     * @author yangzhenyu
     * */
    public class Student implements Serializable {
        /**
         * 序列号
         */
        private static final long serialVersionUID = -5023112818896544461L;
        @NotNull(message = "学生 id值不能为null")
        @ApiModelProperty(" 学生 id值")
        private String sId;
        @ApiModelProperty(" 学生 名称")
        private String sName;
        @ApiModelProperty(" 学生 班级")
        private String className;
    
        public String getsId() {
            return sId;
        }
    
        public void setsId(String sId) {
            this.sId = sId;
        }
    
        public String getsName() {
            return sName;
        }
    
        public void setsName(String sName) {
            this.sName = sName;
        }
    
        public String getClassName() {
            return className;
        }
    
        public void setClassName(String className) {
            this.className = className;
        }
    }
    
    

    注:vo类要继承Serializable ,序列化。

    08:: 测试

    08::01启动demo

    可以在RabbitMq管理平台上看到,我们的交换机和队列已经生成:
    在这里插入图片描述
    点击基础交换机,可以看到绑定的是死信队列:
    在这里插入图片描述
    点击死信交换机,可以看到绑定的是真实的队列:

    在这里插入图片描述
    注:其中"D"表示持久化。

    08::02 通过swagger来测试接口

    即通过观察控制台来判断是否搭建成功。
    先整理一下我们要测试的样例,通过观察控制台打印的结果来判断该消息模型是否搭建成功。
    》》》》开始测试:

    在这里插入图片描述
    在这里插入图片描述
    成功!!!
    观察控制台输出:
    在这里插入图片描述
    在这里插入图片描述

    通过观察发现,死信队列消息模型已经搭建成功!!!

    通过观察发现,该消息模型已经搭建成功!!!

    09::源码

    github:
    https://github.com/yangzhenyu07/springCloud

    本次分享到底结束,感谢观看!!!

    展开全文
  • RabbitMQ延迟队列实现

    2022-02-10 18:16:35
    一、安装插件实现延时队列 通过rabbitmq-delayed-message-exchange插件实现。 插件下载地址 将插件放在rabbitmq的plugins目录,启用插件 rabbitmq enable rabbitmq-delayed-message-exchange 代码实现 @...

    一、安装插件实现延迟队列

    通过rabbitmq-delayed-message-exchange插件实现。
    插件下载地址
    将插件放在rabbitmq的plugins目录,启用插件
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    代码实现

    @Configuration
    public class MessageConfig {
    
        public static final String QueueName = "queue";
        
        public static final String EXchangeName = "dir_change";
    
        @Bean
        Queue queueA(){
            return new  Queue(QueueName,true,false,false);
        }
    
        @Bean
        CustomExchange customExchange(){
            Map<String,Object> args = new HashMap<>();
            args.put("x-delayed-type","direct");
            return new CustomExchange(EXchangeName,"x-delayed-message",true,false,args);
        }
    
        @Bean
        Binding bindingA(){
            return BindingBuilder.bind(queueA()).to(customExchange()).with("a").noargs();
        }
    }
    
    
    
        @Test
        void messageExchange(){
            Message message = MessageBuilder.withBody(("hello message"+new Date()).getBytes()).setHeader("x-delay",3000).build();
            rabbitTemplate.convertSendAndReceive(MessageConfig.EXchangeName,"a",message);
        }
    

    在这里插入图片描述

    二、通过死信队列实现延迟队列

    代码实现

    @Configuration
    public class RabbitConfig {
    
        public static final String MES_QUEUE = "QUEUE1";
    
        public static final String MSG_EXCHANGE = "EXCHANG1";
    
        public static final String DELAY_QUEUE = "DELAY_QUEUE";
    
        public static final String DELAY_EXHCANGE = "DELAY_EXHCANGE";
    
        @Bean
        Queue messageQueue(){
            Map<String,Object> args = new HashMap<>();
            args.put("x-message-ttl",10000);
            args.put("x-dead-letter-exchange",DELAY_EXHCANGE);
            args.put("x-dead-letter-routing-key",DELAY_QUEUE);
            return new Queue(MES_QUEUE,true,false,false,args);
        }
    
        @Bean
        DirectExchange directExchange(){
            return new DirectExchange(MSG_EXCHANGE,true,false);
        }
    
        @Bean
        Binding binding(){
            return BindingBuilder.bind(messageQueue()).to(directExchange()).with(MES_QUEUE);
        }
    
    
        @Bean
        Queue dlxQueue(){
            return new Queue(DELAY_QUEUE,true,false,false);
        }
    
        @Bean
        DirectExchange dlxExchange(){
            return new DirectExchange(DELAY_EXHCANGE,true,false);
        }
    
        @Bean
        Binding bindingDlx(){
            return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DELAY_QUEUE);
        }
    }
    
    
     @Test
        void messageTest(){
            Message message = MessageBuilder.withBody(("hello message"+new Date()).getBytes()).build();
            rabbitTemplate.convertSendAndReceive(RabbitConfig.MSG_EXCHANGE,RabbitConfig.MES_QUEUE,message);
        }
    

    通过监听死信队列实现消息的延迟发送。 这里写死了过期时间。也可以在发送消息的时候指定过期时间。 都配置的话会选择时间短的。

     @Test
        void messageExchange(){
            Message message = MessageBuilder.withBody(("hello message"+new Date()).getBytes()).setExpiration("8000").build();
            rabbitTemplate.convertSendAndReceive(RabbitConfig.MSG_EXCHANGE,RabbitConfig.MES_QUEUE,message);
        }
    

    在这里插入图片描述

    展开全文
  • RocketMQ 延迟队列

    千次阅读 2021-08-14 21:45:13
    RocketMQ 延迟队列 什么是延迟队列 指消息发送到某个队列后,在指定多长时间之后才能被消费。 应用场景 RocketMQ 延迟队列 定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的...
  • RabbitMq——延迟队列

    千次阅读 2022-03-18 20:33:09
    延迟队列:用于存放需要在指定时间后(TTL)被执行的消息的队列,延迟队列消息过期后会被存放到死信队列,消费者不断对死信队列进行消费。 TTL:消息存活的最大时间,单位为毫秒,在TTL时间内,若消息未被消费,则...
  • 延迟队列的实现有3种方法: 队列设置TTL :不够灵活,每增加一个TTL时间,都要新增一条队列 消息设置TTL :消息入队后,遵循先进先出,TTL长的仍然比TTL短的先消费 基于插件实现: 消息通过自定义交换机后,不会...
  • rabbitmq延迟队列

    千次阅读 2022-04-03 15:10:50
    延迟队列应用场景 场景:“订单下单成功后,15分钟未支付自动取消” 传统处理超时订单,采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性能会有很大的要求,并且当处理大量订单...
  • RabbitMQ:死信队列+延迟队列

    千次阅读 2022-04-28 21:15:27
    文章目录1、死信队列1.1、概念1.2、死信的来源1.3、死信实战1.3.1、消息TTL过期1.3.2、队列达到最大长度1.3.3、消息被拒2、延迟队列2.1、概念2.2、延迟队列使用场景2.3、整合SpringBoot2.3.1、添加依赖2.3.2、修改...
  • RabbitMQ 延迟队列详解

    千次阅读 2022-05-29 16:16:32
    一、延迟队列概念 延迟队列存储的对象时对应的延迟消息,所谓“延迟消息”是指当消息呗发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。 二、延迟队列使用场景 1、订单...
  • RabbitMQ简单使用看这里,包括使用这个方案之前mq的配置环境 RabbitMQ 死信 Dead Letter exchanges (死信路由) 》一个消息在满足如下条件下,会进死信路由,注意是路由不是队列,一个路由可以对应多个队列...
  • RabbitMQ 延迟队列

    千次阅读 2022-02-17 22:47:09
    业务需求 发送消息可能出现失败的情况,此时需要对消息进行重新发送,重新发送需要...而在 RabbitMQ 3.6.x 开始后,官方提供了延迟队列的插件 rabbitmq-delayed-message-exchange 。 DLX + TTL 在实现上,消息过
  • 前言篇:为了节约成本,决定通过自研来改造rocketmq,添加任意时间延迟的延时队列,开源版本的rocketmq只有支持18个等级的延迟时间,其实对于大部分的功能是够用了的,但是以前的项目...
  • rabbitMq延迟队列实现

    2021-05-18 09:05:42
    我们要实现延迟消息队列效果,在rabbtimq中可以通过TTL+死信的方式,把过期消息转移到死信exchange中,然后再死信exchange绑定的队列中去消费完成后期的业务逻辑。 但是这里有一个前提就是,我们TTL队列中的过期...
  • 死信队列、延迟队列
  • MQ延迟队列插件安装 linux下mq安装与下载 生产者 public function send_msg($id){ $name = 'dead-x-tp6';//交换机名 $rounting_key = 'dead-x-key_tp6'; // 交换机路由key $queue = 'dead-x-queue_tp6'; //...
  • 1.使用死信队列方式 原理:设置消息在发送后一段时间内没被消费则会被推入死信队列进行消费。 项目依赖 <?xml version="1.0" encoding="UTF-8"?> <project xmlns=...
  • 什么是延迟队列 延迟队列存储的对象肯定是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。 场景一:在订单系统中,一个...
  • 延迟队列,队列内部是有序的,最重要的特性就体现在它的延迟属性上。 延迟队列中的元素是希望在指定时间到了以后或之前取出和处理。简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列属于...
  • RabbitMQ如何实现延迟队列

    千次阅读 2022-04-19 22:05:31
    rabbitmq实现延迟队列的方法
  • 消息中间件RabbitMQ(七)——两种方式实现延迟队列
  • rabbitmq的延迟队列

    千次阅读 2018-12-28 19:59:43
    初探RabbitMQ消息队列中介绍了RabbitMQ的简单用法,顺带提及了下延迟队列的作用。所谓延时消息就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。 延迟队列...
  • RabbitMQ实现延迟队列

    千次阅读 2022-03-08 18:21:11
    AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。 但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列: 1、Time To Live(TTL) RabbitMQ可以针对Queue设置x-...
  • RabbitMQ 延迟队列-非常非常实用

    万次阅读 热门讨论 2021-03-19 11:57:40
    RabbitMQ 延迟队列-非常非常实用RabbitMQ 延迟队列-非常非常实用一、使用场景二、消息延迟推送的实现三、项目具体实现 RabbitMQ 延迟队列-非常非常实用 一、使用场景 ​ 目前常见的应用软件都有消息的延迟推送的...
  • Rabbit MQ 延迟队列

    2018-06-08 20:45:00
    RabbitMQ本身不支持延迟队列,但是我们可以使用死信队列(DLX)和设置有效时间(TTL)两个特性来实现延迟队列。 先新建队列order_query并设置消息有效时间是10分钟,然后绑定一个死信队列order_dead_query,消费者...
  • 一、延迟队列 延时队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。 二、...
  • 接收到消息后并不会立即将消息投递至目标队列,而是存储在mnesia table(一个分布式数据库)中,然后检测消息延迟时间,如果达到可投递时间( 过期时间 )后,将其通过 x-delayed-type 类型标记的交换机投递到目标队列中...
  • 该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置...
  • spring整和rabbitmq实现延迟队列

    千次阅读 2021-11-04 19:25:10
    延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。 比如下订金后超过一定时间未付尾款,订单取消,库存回滚这样的情况,就需要用到延迟队列。 在rabbitmq中我们可以使用TTL(消息过期...
  • 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能、高可用、可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 22,362
精华内容 8,944
关键字:

mq延迟队列