精华内容
下载资源
问答
  • rabbitMQ延迟队列

    2019-03-15 15:55:37
    rabbitmq 延迟队列
  • RabbitMQ延迟队列

    2020-11-03 15:32:12
    原文: 【RabbitMQ】一文带你搞定RabbitMQ延迟队列 【RabbitMQ】一文带你搞定RabbitMQ死信队列
    展开全文
  • RabbitMQ 延迟队列

    2021-02-05 14:01:32
    RabbitMQ 延迟队列什么是延迟队列Time To Live(TTL)Dead Letter Exchanges(DLX)延迟队列实现方式一(推荐)原理代码实现延迟队列实现方式二(不推荐)原理缺点 什么是延迟队列 延迟队列存储的对象肯定是对应的延时消息...

    什么是延迟队列

    延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

    本人使用场景:
    项目对接银行支付, 由于银行回调会由于某种原因如网络波动等情况导致没有接收到回调, 需要自己去查询该订单是否已经支付完成. 如在三分钟后查询支付详情.

    Time To Live(TTL)

    消息的TTL就是消息的存活时间

    RabbitMQ 可以针对 Queue 设置过期时间或者针对 message 设置过期时间, 如果超时(两者都设置以最先到期时间为准),消息则变为死信

    • 通过队列属性设置, 队列中所有消息都有相同的过期时间(推荐)
    • 通过对消息单独进行设置, 每条消息的 TTL 可以不同(不推荐)

    Dead Letter Exchanges(DLX)

    • 一个消息在满足如下条件下,会进入死信路由,一个路由可以对应很多队列
      • 一个消息被消费者拒收了, 并且reject方法的参数里requeue是false, 也就是说不会被在此放在队列里,被其他消费者使用
      • 上面的消息的TTL到了,消息过期
      • 队列的长度限制满了, 排在前面的消息会被丢弃或者扔到死信路由上
    • 其实就是一个普通的路由, 和创建其他的Exchanges一样, 只是在某一个设置Dead Letter Exchanges的队列中有消息过期了,就会自动触发消息的转发,发送到Dead Letter Exchanges中去
    • 我们既可以控制消息在一段时间后变成死信, 又可以控制变成死信的消息被路由到某个指定的交换机,结合二者,就可以实现一个延时队列

    延迟队列实现方式一(推荐)

    针对 Queue 设置过期时间

    在这里插入图片描述

    原理

    1. 生产者发送消息,使用的路由键是与死信队列绑定的路由键 bcm.qrcode.issued
    2. 交换机根据路由键将消息路由到死信队列
    3. 由于死信队列无人监听,所以在到达过期时间后继续转发到交换机, 转发的路由键是 bcm.pay.select
    4. 交换机在根据路由键转发的另一个监听队列中, 此时消息被消费.
    5. 如此实现了延迟队列, 消息在发送一段时间后才被消费者消费

    代码实现

    创建一个枚举类

    @Getter
    public enum QueueEnum {
     /**
         * 死信队列
         */
        BCM_SELECT_QUEUE_TTL("delay_exchange", "delay_queue_per_queue_ttl", "bcm.qrcode.issued"),
    
        /**
         * 延迟队列
         */
        BCM_SELECT_QUEUE("delay_exchange", "delay_process_queue", "bcm.pay.select");
    
        /**
         * 交换名称
         */
        private String exchange;
        /**
         * 队列名称
         */
        private String name;
        /**
         * 路由键
         */
        private String routeKey;
    
        QueueEnum(String exchange, String name, String routeKey) {
            this.exchange = exchange;
            this.name = name;
            this.routeKey = routeKey;
        }
    }
    

    配置类

    @Configuration
    public class MessageRabbitMqConfiguration {
    /**
         * 死信队列 必须无人监听
         * @return
         */
        @Bean
        public Queue delayQueuePerQueueTTL() {
            Map<String,Object> arguments = new HashMap<>();
            //交换机
            arguments.put("x-dead-letter-exchange",QueueEnum.BCM_SELECT_QUEUE_TTL.getExchange());
            //死信后转发到监听队列的路由键
            arguments.put("x-dead-letter-routing-key",QueueEnum.BCM_SELECT_QUEUE.getRouteKey());
            //过期时间
            arguments.put("x-message-ttl",3 * 60 * 1000);
    
            return new Queue(QueueEnum.BCM_SELECT_QUEUE_TTL.getName(), true, false, false,arguments);
        }
    
        /**
         * 延迟队列
         * @return
         */
        @Bean
        public Queue orderReleaseOrderQueue() {
            return new Queue(QueueEnum.BCM_SELECT_QUEUE.getName(), true, false, false);
        }
    
        /**
         * 交换机
         * @return
         */
        @Bean
        public Exchange orderEventExchange() {
            //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
            return new TopicExchange(QueueEnum.BCM_SELECT_QUEUE.getExchange(),true,false);
        }
    
        /**
         * 与死信队列路由绑定
         * @return
         */
        @Bean
        public Binding orderCreateOrderBinding() {
    
            return new Binding(QueueEnum.BCM_SELECT_QUEUE_TTL.getName(),
                    Binding.DestinationType.QUEUE,
                    QueueEnum.BCM_SELECT_QUEUE_TTL.getExchange(),
                    QueueEnum.BCM_SELECT_QUEUE_TTL.getRouteKey(),
                    null);
    
        }
    
        /**
         * 与延迟队列绑定
         * @return
         */
        @Bean
        public Binding orderReleaseOrderBinding() {
            return new Binding(QueueEnum.BCM_SELECT_QUEUE.getName(),
                    Binding.DestinationType.QUEUE,
                    QueueEnum.BCM_SELECT_QUEUE.getExchange(),
                    QueueEnum.BCM_SELECT_QUEUE.getRouteKey(),
                    null);
        }
    }
    

    发送消息

    	// 发送延迟队列 3 分钟 查询支付详情
       	BCMMessage message = new BCMMessage();
        message.setMerTranNo(orderNo);
        message.setTranType("PAY");
        message.setCount(1);
        //主要此处使用的是死信队列的路由键
        rabbitMqTemplate.sendMessage(message, QueueEnum.BCM_SELECT_QUEUE_TTL.getRouteKey(), QueueEnum.BCM_SELECT_QUEUE_TTL.getExchange());
    

    监听消息

    @Component
    @Slf4j
    @AllArgsConstructor
    @RabbitListener(queues = "delay_process_queue")
    public class BcmMsgConsumer {
    	@RabbitHandler
        public void bcmPaySelect(@Payload BCMMessage message) {
            log.info("BcmMsgConsumer-bcmPaySelect-message: {}", message.toString());
            // 业务处理代码
            ...
            
            //如果还是失败继续尝试,共三次
            if (message.getCount() < 3){
                message.setCount(message.getCount() + 1);
                rabbitMqTemplate.sendMessage(message, QueueEnum.BCM_SELECT_QUEUE_TTL.getRouteKey(), QueueEnum.BCM_SELECT_QUEUE_TTL.getExchange());
            }
        }
    }
    

    延迟队列实现方式二(不推荐)

    针对 message 设置过期时间 expiration : 180000

    在这里插入图片描述

    原理

    1. 生产者生产消息,并对消息设置单独的过期时间 expiration : 180000
    2. 在无人监听的死信队列中当消息到达过期时间后转发到交换机
    3. 交换机在根据路由键转发到监听队列, 此时消息被消费.
    4. 如此实现了延迟队列, 消息在发送一段时间后才被消费者消费

    缺点

    由于RabbitMQ采用的是惰性检查机制,也就是懒检查,如果第一个是5分钟过期, 第二个是3分钟过期,服务器拿出第一个消息发现是5分钟后过期,则5分钟后再来拿取,所以第二个设置的3分钟过期消息得在第一个消息过期之后才能过期

    展开全文
  • DLX + TTL 方式存在的时序问题对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有介绍过如何使用 RabbitMQ 死信队列(DLX) + TTL 的方式来模拟实现延迟队列,这也是通常的一种做法,可参见我的另一篇...

    319cfc0030c8af608c0e4b76c0c72e3e.png

    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行。

    DLX + TTL 方式存在的时序问题

    对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有介绍过如何使用 RabbitMQ 死信队列(DLX) + TTL 的方式来模拟实现延迟队列,这也是通常的一种做法,可参见我的另一篇文章 利用 RabbitMQ 死信队列和 TTL 实现定时任务。

    今天我想说的是这种方式会存在一个时序问题,看下图:4b535f57705f3e9099f7e3570a6a1508.png

    左侧队列 queue1 分别两条消息 msg1、msg2 过期时间都为 1s,输出顺序为 msg1、msg2 是没问题的

    右侧队列 queue2 分别两条消息 msg1、msg2 注意问题来了,msg2 的消息过期时间为 1S 而 msg1 的消息过期为 2S,你可能想谁先过期就谁先消费呗,显然不是这样的,因为这是在同一个队列,必须前一个消费,第二个才能消费,所以就出现了时序问题

    如果你的消息过期时间是有规律的,例如,有的 1S、有的 2S,那么我们可以以时间为维度设计为两个队列,如下所示:92544ad7a70b18481c9bebf8aac78797.png

    上面我们将 1S 过期的消息拆分为队列 queue_1s,2S 过期的消息拆分为队列 queue_2s,事情得到进一步解决。如果此时消息的过期时间不确定或者消息过期时间维度过多,在消费端我们就要去监听多个消息队列且对于消息过期时间不确定的也是很难去设计的。

    针对消息无序的不妨看下以下解决方案。

    Delayed Message 插件

    这里要感谢 @神奇的包子,掘金(juejin.im/user/5bfc1b9d6fb9a049b347a9e2) 提出的 Delayed Message 插件方案。

    这里将使用的是一个 RabbitMQ 延迟消息插件 rabbitmq-delayed-message-exchange,目前维护在 RabbitMQ 插件社区,我们可以声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。1b6ae02eddc9223ca72b21a18ed40bb2.png

    实现原理

    上面使用 DLX + TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用。目前资料介绍的不是很多)

    这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。

    插件安装

    根据你的 RabbitMQ 版本来安装相应插件版本,RabbitMQ community-plugins 上面有版本对应信息可参考。

    注意:需要 RabbitMQ 3.5.3 和更高版本。

    # 注意要下载至你的 RabbitMQ 服务器的 plugins 目录下,例如:/usr/local/rabbitmq/plugins

    wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

    # 解压
    unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

    # 解压之后得到如下文件
    rabbitmq_delayed_message_exchange-20171215-3.6.x.ez

    启用插件

    使用 rabbitmq-plugins enable 命令启用插件,启动成功会看到如下提示:

    $ rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    The following plugins have been enabled:
    rabbitmq_delayed_message_exchange

    Applying plugin configuration to rabbit@xxxxxxxx... started 1 plugin.

    管理控制台声明 x-delayed-message 交换机

    在开始代码之前先打开 RabbitMQ 的管理 UI 界面,声明一个 x-delayed-message 类型的交换机,否则你会遇到下面的错误:

    Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange type"

    这个问题困扰我了一会儿,详情可见 Github Issues rabbitmq-delayed-message-exchange/issues/19,正确操作如下图所示:b15448b4ae832ba159afb6aacede26c7.png

    Nodejs 代码实践

    上面准备工作完成了,开始我们的代码实践吧,官方没有提供 Nodejs 示例,只提供了 Java 示例,对于一个写过 Spring Boot 项目的 Nodeer 这不是问题(此处,兄得你有点飘了啊 smiley_38.png)其实如果有时间能多了解点些,你会发现还是有益的。

    构建生产者

    几个注意点:

    • 交换机类型一定要设置为 x-delayed-message
    • 设置 x-delayed-type 为 direct,当然也可以是 topic 等
    • 发送消息时设置消息头 headers 的 x-delay 属性,即延迟时间,如果不设置消息将会立即投递
    const amqp = require('amqplib');

    async function producer(msg, expiration) {
    try {
    const connection = await amqp.connect('amqp://localhost:5672');
    const exchange = 'my-delayed-exchange';
    const exchangeType = 'x-delayed-message'; // x-delayed-message 交换机的类型
    const routingKey = 'my-delayed-routingKey';

    const ch = await connection.createChannel();
    await ch.assertExchange(exchange, exchangeType, {
    durable: true,
    'x-delayed-type': 'direct'
    });

    console.log('producer msg:', msg);
    await ch.publish(exchange, routingKey, Buffer.from(msg), {
    headers: {
    'x-delay': expiration, // 一定要设置,否则无效
    }
    });

    ch.close();
    } catch(err) {
    console.log(err)
    }
    }

    producer('msg0 1S Expire', 1000) // 1S
    producer('msg1 30S Expire', 1000 * 30) // 30S
    producer('msg2 10S Expire', 1000 * 10) // 10S
    producer('msg3 5S Expire', 1000 * 5) // 5S

    构建消费端

    消费端改变不大,交换机声明处同生产者保持一样,设置交换机类型(x-delayed-message)和 x-delayed-type

    const amqp = require('amqplib');

    async function consumer() {
    const exchange = 'my-delayed-exchange';
    const exchangeType = 'x-delayed-message';
    const routingKey = 'my-delayed-routingKey';
    const queueName = 'my-delayed-queue';

    try {
    const connection = await amqp.connect('amqp://localhost:5672');
    const ch = await connection.createChannel();

    await ch.assertExchange(exchange, exchangeType, {
    durable: true,
    'x-delayed-type': 'direct'
    });
    await ch.assertQueue(queueName);
    await ch.bindQueue(queueName, exchange, routingKey);
    await ch.consume(queueName, msg => {
    console.log('consumer msg:', msg.content.toString());
    }, { noAck: true });
    } catch(err) {
    console.log('Consumer Error: ', err);
    }
    }

    consumer()

    以上示例源码地址:

    https://github.com/Q-Angelo/project-training/tree/master/rabbitmq/rabbitmq-delayed-message-node

    最后,让我们对以上程序做个测试,左侧窗口展示了生产端信息,右侧窗口展示了消费端信息,这次实现了同一个队列里不同过期时间的消息,可以按照我们预先设置的 TTL 时间顺序性消费,我们的目的达到了。7ea8260b58a798dacc441f70ba39a012.png

    局限性

    Delayed Message 插件实现 RabbitMQ 延迟队列这种方式也不完全是一个银弹,它将延迟消息存在于 Mnesia 表中,并且在当前节点上具有单个磁盘副本,它们将在节点重启之后幸存。

    目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万或数百万)的场景,详情参见 #/issues/72 另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积。

    插件的禁用要慎重,以下方式可以实现将插件禁用,但是注意如果此时还有延迟消息未消费,那么禁掉此插件后所有的未消费的延迟消息将丢失。

    rabbitmq-plugins disable rabbitmq_delayed_message_exchange

    如果你采用了 Delayed Message 插件这种方式来实现,对于消息可用性要求较高的,在发现消息之前可以先落入 DB 打标记,消费之后将消息标记为已消费,中间可以加入定时任务做检测,这可以进一步保证你的消息的可靠性。

    总结

    经过一番实践测试、学习之后发现,DLX + TTLDelayed Message 插件这两种 RabbitMQ 延迟消息解决方案都有一定的局限性。

    如果你的消息 TTL 是相同的,使用 DLX + TTL 的这种方式是没问题的,对于我来说目前还是优选。

    如果你的消息 TTL 过期值是可变的,可以尝试下使用 Delayed Message 插件,对于某些应用而言它可能很好用,对于那些可能会达到高容量延迟消息应用而言,则不是很好。

    关于 RabbitMQ 延迟队列,如果你有更多其它实现,欢迎关注公众号 “Nodejs技术栈” 在后台取得我的联系方式进行讨论,我很期待。

    Reference

    • github.com/rabbitmq/rabbitmq-delayed-message-exchange
    • www.rabbitmq.com/community-plugins.html

    作者简介:五月君,Nodejs Developer,慕课网认证作者,热爱技术、喜欢分享的 90 后青年,欢迎关注 Nodejs技术栈(id:NodejsRoadmap) 和 Github 开源项目 https://www.nodejs.red

    敬请关注「Nodejs技术栈」微信公众号,获取优质文章

    ▼往期精彩回顾▼Node.js 如何利用 Libuv 实现事件循环和异步Nodejs 进阶:解答 Cluster 模块的几个疑问多维度分析 Express、Koa 之间的区别你需要了解的有关 Node.js 的所有信息Node.js 服务 Docker 容器化应用实践一文零基础教你学会 Docker 入门到实践JavaScript 浮点数之迷:大数危机Node.js 是什么?我为什么选择它?分享 10 道 Nodejs 进程相关面试题不容错过的 Node.js 项目架构

    c1b634b9826e5077d93c5679bc69dda2.png

    展开全文
  • rabbitmq 延迟队列

    2019-06-26 22:51:37
    延迟队列 延迟队列的实现其实是通过前面提到的消息TTL机制以及死信队列进行实现的 具体可以参考之前的 rabbitmq TTL及死信队列,优先级队列 如果用户在 消息发送后,想延迟一段时间才执行相关的任务,可以使用延迟...

    延迟队列
    延迟队列的实现其实是通过前面提到的消息TTL机制以及死信队列进行实现的
    具体可以参考之前的 rabbitmq TTL及死信队列,优先级队列

    如果用户在 消息发送后,想延迟一段时间才执行相关的任务,可以使用延迟队列0
    及消息延迟一段时间才能被消费者消费, 延时处理
    该功能可以实现延迟执行, 也可以模拟为定时任务进行执行
    例如:

    1. 用户需要指令在指定时间执行,则可以使用延迟队列,到指定的时间被放入死信队列进行处理
    2. 下单后支付超时, 如果超时未支付,则放入死信队列中, 进行后续资源回滚等操作
    展开全文
  • RabbitMq延迟队列

    2021-03-12 15:29:54
    在JUC中我们知道有延迟队列,在MQ中的延迟队列主要是用来存储延迟消息的,“延迟消息”就是指消息被发送以后,并不想让消费者立即拿到消息,而是等待特定的时间之后,消费者才能拿到这个消息。这和...
  • 本文口味:鱼香肉丝 预计阅读:10分钟一、说明在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列。相信通过上一篇的学习,对于死信队列已经有了更多的了解,这一篇的内容也跟...
  • rabbitMq 延迟队列

    2020-08-14 16:22:49
    延迟队列问题: 消息在延时队列中时间如果小于前面的队列等待时间也不会先执行,会按照队列的方式一个一个出队 import com.alibaba.fastjson.JSONObject; import org.springframework.amqp.core....
  • 因为我们项目中本身就使用到了Rabbitmq,所以基于方便开发和维护的原则,我们使用了Rabbitmq延迟队列来实现定时任务,不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看我之前的文章Spring boot...
  • Rabbitmq延迟队列

    2018-11-21 23:12:17
    &lt;!-- ################ 订单通知服务消费者配置 ################ --&gt; &lt;!-- 创建rabbit ConnectionFactory,连接服务器 --&...${rabbitmq.host}" username="${r...
  • 3.处理业务,将通知URL和通知的数据放到队列中。通知队列是常驻的。4.如果通知商户成功/失败更新通知状态。如果通知失败,根据通知间隔重新把通知数据放到通知队列里。5*(2^n-1)秒后再次通知,n指回调次数,最大为5...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,596
精华内容 638
关键字:

rabbitmq延迟队列