-
2019-10-31 09:47:43
java实现rabbitMQ延时队列
一、在队列上设置TTL
1、exchange_delay_begin:缓冲队列exchange交换器,用于将消息转发至缓存消息队列 queue_delay_begin 。
2、queue_delay_begin:缓冲消息队列,等待消息过期。
3、exchange_delay_done:死信(dead-letter)队列exchange交换器,用于将队列 queue_delay_begin 转发到死信队列。
4、queue_delay_done:死信消息队列,消费者能够真正消费信息。queue_delay_begin的参数:
x-dead-letter-exchange: exchange_delay_done
x-dead-letter-routing-key: queue_delay_done
x-max-length: 500 (队列长度,超过直接进入死信队列)
x-message-ttl: 30000 (超时时间,超过进入死信队列)更多相关内容 -
RabbitMQ延时队列原理讲解
2020-05-10 20:27:43RabbitMQ延时消息队列 延时队列介绍 延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟后没有完成...RabbitMQ延时消息队列
延时队列介绍
延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
那么,为什么需要延迟消费呢?我们来看以下的场景
网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网)系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会系统中的业务失败之后,需要重试这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会。那么一天之中肯定是会有很多个预约的,时间也是不一定的,假设现在有1点 2点 3点 三个预约,如何让系统知道在当前时间等于0点 1点 2点给用户发送信息呢,是不是需要一个轮询,一直去查看所有的预约,比对当前的系统时间和预约提前一小时的时间是否相等呢?这样做非常浪费资源而且轮询的时间间隔不好控制。如果我们使用延时消息队列呢,我们在创建时把需要通知的预约放入消息中间件中,并且设置该消息的过期时间,等过期时间到达时再取出消费即可。
Rabbitmq实现延时队列一般而言有两种形式:第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)[A队列过期->转发给B队列]
第二种方式:利用rabbitmq中的插件x-delay-message
TTL DLX实现延时队列
TTL DLX介绍
TTL
RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
Dead Letter Exchanges(DLX)RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
-
SpringBoot集成RabbitMQ延时队列,自定义延时时间Demo
2020-09-16 13:31:43该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置... -
RabbitMQ延迟队列及消息延迟推送实现详解
2020-08-25 07:27:45主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 -
springboot+rabbitmq实现延时队列
2018-08-29 11:13:48springboot+rabbitmq实现延时队列,包括消息发送和消费确认,消费者端使用策略模式处理业务 -
RabbitMQ延时队列实现(PHP)
2022-03-23 17:51:07linux 下安装RabbitMQ 转载: ...本教程为 windows 示例: ...自测: 服务 + 延迟队列插件 (注意版本) RabbitMq Server 3.7.4 rabbitmq_delayed_message_exchange-3.8.0.ez 插件地址: https://github.rc184linux 下安装RabbitMQ
转载: https://blog.csdn.net/qq_39135287/article/details/95725385本教程为 windows 示例:
转载: https://www.jianshu.com/p/a6f21317722a自测:
服务 + 延迟队列插件 (注意版本)
RabbitMq Server 3.7.4
rabbitmq_delayed_message_exchange-3.8.0.ez [适用于3.7 ~ 3.8]下载完毕: 放在plugins目录
常用命令:
进入到RabbitMq Server 3.7.4 \ sbin 目录下rabbitmq-service start // 启动 rabbitmq-service stop // 停止 rabbitmq-plugins enable rabbitmq_management // 启用管理界面 (web管理界面) rabbitmq-plugins enable rabbitmq_delayed_message_exchange // 启用延迟队列插件
安装完插件, 重新启动rabbitmq服务, 出现如下图所示, 即插件安装成功
PHP项目代码
protected $connection; //连接 protected $channel; //频道 protected $config = [ 'host' => 'localhost', 'port' => 5672, 'user' => 'guest', 'pass' => 'guest', 'vhost' => '/', ]; // 过期时间 const TIMEOUT_5_S = 5; // 5s private $exchange_logs = "logs"; private $exchange_direct = "direct"; private $exchange_delayed = "delayed"; private $queue_delayed = "delayedQueue"; CONST EXCHANGETYPE_FANOUT = "fanout"; CONST EXCHANGETYPE_DIRECT = "direct"; CONST EXCHANGETYPE_DELAYED = "x-delayed-message"; // 生命连接 protected function _initialize() { $this->connection = new AMQPStreamConnection($this->config['host'], $this->config['port'], $this->config['user'], $this->config['pass'], $this->config['vhost']); $this->channel = $this->connection->channel(); // 声明Exchange $this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false,false,false,new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT])); $this->channel->queue_declare($this->queue_delayed, false, true, false, false); $this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed,$this->queue_delayed); } // 创建消息 , 发送消息第三行 会调用此方法 public function createMessageDelay($msg,$time) { $delayConfig = [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'application_headers' => new AMQPTable(['x-delay' => $time * 1000]) ]; $msg = new AMQPMessage($msg,$delayConfig); return $msg; } /** * delay send message 发送消息 */ // public function sendDelay($msg,$time = self::TIMEOUT_10_S) { public function sendDelay() { $msg = '555555'; $time = 10; $msg = $this->createMessageDelay($msg,$time); $this->channel->basic_publish($msg,$this->exchange_delayed,$this->queue_delayed); $this->channel->close(); $this->connection->close(); } /** * delay consum 监听 */ public function consumDelay(){ $callback = function($msg){ echo ' [x] ', $msg->body, "\n"; $this->channel->basic_ack($msg->delivery_info['delivery_tag'],false); }; $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback); echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; while (count($this->channel->callbacks)) { $this->channel->wait(); } $this->channel->close(); $this->connection->close(); }
测试类
1: 接收监听
2: 执行发送
ps:
mq修改完代码, 接收接听需要 重新监听
ps 有序发送
如下图, rabbitmq 发送数据是有序的, 先执行test, 在执行test2, 发现接收的顺序为: aaa, bbb
既aaa10分钟后被消费了, bbb才能被消费, 类似队列, 先进先出, 场景下单30分钟后取消
ps 无序发送
如下图, rabbitmq 发送数据是有序的, 先执行test, 在执行test2, 发现接收的顺序为: bbb, aaa
既aaa 5分钟后被消费了, bbb 被消费, 符合自定义延迟时间场景.
RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种:
Direct Exchange:见文知意,直连交换机意思是此交换机需要绑定一个队列,要求该消息与一个特定的路由键完全匹配。简单点说就是一对一的,点对点的发送。
Fanout Exchange:这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。简单点说就是发布订阅。
Topic Exchange:直接翻译的话叫做主题交换机,如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:"*" 、 “#”。需要注意的是通配符前面必须要加上".“符号。
*符号:有且只匹配一个词。比如 a.*可以匹配到"a.b”、“a.c”,但是匹配不了"a.b.c"。
#符号:匹配一个或多个词。比如"rabbit.#“既可以匹配到"rabbit.a.b”、“rabbit.a”,也可以匹配到"rabbit.a.b.c"。
Headers Exchange:这种交换机用的相对没这么多。它跟上面三种有点区别,它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配。如上图所示,交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。 -
rabbitmq延时队列和四种交换机模式下队列的简单实现
2018-08-06 22:57:39rabbitmq延时队列和四种交换机模式下队列的简单实现,需要自己配置一下属性文件。 -
springboot整合RabbitMQ实现延时队列的两种方式 教程及源码
2019-06-29 15:19:14springboot整合RabbitMQ实现延时队列的两种方式 教程及源码。参考博客:https://blog.csdn.net/qq_29914837/article/details/94070677 -
RabbitMQ延时队列
2022-01-23 12:25:50延时队列 1.1 什么是延时队列 在开发中,往往会遇到一些关于延时任务的需求。例如 生成订单30分钟未支付,则自动取消 生成订单60秒后,给用户发短信 滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价...延时队列
1.1 什么是延时队列
在开发中,往往会遇到一些关于延时任务的需求。例如
- 生成订单30分钟未支付,则自动取消
- 生成订单60秒后,给用户发短信
- 滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。
和定时任务区别
对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别- 定时任务有明确的触发时间,延时任务没有
- 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期。(可以将延时任务看作是基于事件驱动的)
- 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务
简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
1.2 延时队列使用场景
- 订单在十分钟之内未支付则自动取消。
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 账单在一周内未支付,则自动结算。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
1.3 常见的延时任务的解决方案
1.3.1 数据库轮询
该方案通常是在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作
优点:
简单易行,支持集群操作缺点:
- 对服务器内存消耗大
- 存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
- 假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大
1.3.2 JDK的延迟队列
该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。
优点
效率高,任务触发时间延迟低缺点
- 服务器重启后,数据全部消失,怕宕机
- 集群扩展相当麻烦
- 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
- 代码复杂度较高
1.3.3 netty时间轮算法
时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)
优点
效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。缺点
- 服务器重启后,数据全部消失,怕宕机
- 集群扩展相当麻烦
- 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
1.3.4 使用消息队列
我们可以采用rabbitMQ的延时队列。RabbitMQ具有以下两个特性,可以实现延迟队列- RabbitMQ可以针对Queue和Message设置
x-message-tt
,来控制消息的生存时间,如果超时,则消息变为dead letter
- RabbitMQ的Queue可以配置
x-dead-letter-exchange
和x-dead-letter-routing-key(可选)
两个参数,用来控制队列内出现的dead letter
,dead letter
到达死信队列交换机后则按照这两个参数重新路由。、
优点
高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。缺点
本身的易用度要依赖于rabbitMq的运维.因为要引用rabbitMq,所以复杂度和成本变高rabbitMQ中的延时队列
RabbitMQ中没有对消息延迟进行实现,但是我们可以通过TTL以及死信路由来实现消息延迟。
TTL(消息过期时间)
在介绍延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性—— TTL(Time ToLive) 。
TTL 是RabbitMQ中一个消息或者队列的属性,表明 一条消息或者该队列中的所有消息的最大存活时间 ,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
配置队列TTL:
一种是在创建队列的时候设置队列的
“x-message-ttl”
属性@Bean public Queue taxiOverQueue() { Map<String, Object> args = new HashMap<>(2); args.put("x-message-ttl", 30000); return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build(); }
这样所有被投递到该队列的消息都最多不会存活超过30s,但是消息会到哪里呢,如果没有任何处理,消息会被丢弃,如果配置有死信队列,超时的消息会被投递到死信队列
另一种方式便是针对每条消息设置TTL,代码如下:AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。
实现延时队列
我们现在知道了死信队列、TTL,只要把二者相融合,就能实现延时队列了。
延时队列
,不就是想要消息延迟多久被处理吗,TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。
生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。配置文件
server: port: 8080 spring: rabbitmq: host: 192.168.64.133 port: 5672 username: root password: root ###开启消息确认机制 confirms virtual-host: / publisher-returns: true #采用手动应答 listener: simple: acknowledge-mode: manual
配置类
交换器配置:
先声明交换机、队列以及他们的绑定关系:
@Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queuea"; public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queueb"; public static final String DELAY_QUEUE_A_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey"; public static final String DELAY_QUEUE_B_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey"; public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey"; public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey"; public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queuea"; public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queueb"; // 声明延时Exchange @Bean("delayExchange") public DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME); } // 声明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 声明延时队列A 延时10s // 并绑定到对应的死信交换机 @Bean("delayQueueA") public Queue delayQueueA() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_ROUTING_KEY); // x-message-ttl 声明队列的TTL args.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build(); } // 声明延时队列B 延时 60s // 并绑定到对应的死信交换机 @Bean("delayQueueB") public Queue delayQueueB() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_ROUTING_KEY); // x-message-ttl 声明队列的TTL args.put("x-message-ttl", 1888 * 60); return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build(); } // 声明死信队列A 用于接收延时10s处理的消息 @Bean("deadLetterQueueA") public Queue deadLetterQueueA() { return new Queue(DEAD_LETTER_QUEUE_A_NAME); } // 声明死信队列B 用于接收延时60s处理的消息 @Bean("deadLetterQueueB") public Queue deadLetterQueueB() { return new Queue(DEAD_LETTER_QUEUE_B_NAME); } // 声明延时队列A绑定关系 @Bean public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_A_ROUTING_KEY); } // 声明业务队列B绑定关系 @Bean public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_B_ROUTING_KEY); } // 声明死信队列A绑定关系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY); } // 声明死信队列B绑定关系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY); } }
枚举类
定义相关枚举属性
public enum DelayTypeEnum { DELAY_10s(10), DELAY_60s(60); private int duration = 0; DelayTypeEnum(int duration) { this.duration = duration; } public int getDuration() { return duration; } public static DelayTypeEnum getType(int duration) { return Stream.of(DelayTypeEnum.values()).filter((type) -> type.duration == duration).findFirst().get(); } }
消费者
接下来,创建两个消费者,分别对两个死信队列的消息进行消费:
@Component public class DeadLetterQueueConsumer { private static final Logger logger = LoggerFactory.getLogger(DeadLetterQueueConsumer.class); @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_A_NAME) public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); logger.info("当前时间:{},死信队列A收到消息:{}", System.currentTimeMillis(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_B_NAME) public void receiveB(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); logger.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
生产者
@Component public class DelayMessageSender { private static final Logger logger = LoggerFactory.getLogger(DelayMessageSender.class); @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg, DelayTypeEnum type) { logger.info("发送延时消息,message:{}, duration:{},当前时间:", msg, type.getDuration(), System.currentTimeMillis()); switch (type) { case DELAY_10s: rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_A_ROUTING_KEY, msg); break; case DELAY_60s: rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_B_ROUTING_KEY, msg); break; default: rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_A_ROUTING_KEY, msg); break; } } }
请求接口
接下来,我们暴露一个web接口来生产消息
RestController @RequestMapping("/rabbitmq") public class RabbitmqController { @Autowired private DelayMessageSender msgProducer; /** * 发送测试数据 * * @param type 交换器类型 * @param message * @return */ @RequestMapping("/send") public String send(String message, int type) { msgProducer.sendMsg(message, DelayTypeEnum.getType(type)); return "OK"; } }
RabbitMQ控制台
打开rabbitMQ的管理后台,可以看到我们刚才创建的交换机和队列信息:
测试
接下来,我们来发送几条消息
2020-09-22 16:55:40.977 INFO 452 --- [nio-8080-exec-1] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:xxxxxxx, duration:60,当前时间: 2020-09-22 16:55:45.716 INFO 452 --- [nio-8080-exec-2] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:xxxxxxx, duration:10,当前时间: 2020-09-22 16:55:53.164 INFO 452 --- [nio-8080-exec-3] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:延时10s, duration:10,当前时间: 2020-09-22 16:55:55.746 INFO 452 --- [ntContainer#1-1] c.h.r.consumer.DeadLetterQueueConsumer : 当前时间:1600764955745,死信队列A收到消息:xxxxxxx 2020-09-22 16:56:01.333 INFO 452 --- [nio-8080-exec-4] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:延时60s, duration:60,当前时间: 2020-09-22 16:56:03.170 INFO 452 --- [ntContainer#1-2] c.h.r.consumer.DeadLetterQueueConsumer : 当前时间:1600764963170,死信队列A收到消息:延时10s 2020-09-22 16:57:34.268 INFO 452 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 当前时间:Tue Sep 22 16:57:34 CST 2020,死信队列B收到消息:xxxxxxx 2020-09-22 16:57:54.619 INFO 452 --- [ntContainer#0-2] c.h.r.consumer.DeadLetterQueueConsumer : 当前时间:Tue Sep 22 16:57:54 CST 2020,死信队列B收到消息:延时60s
第一条消息在6s后变成了死信消息,然后被消费者消费掉,第二条消息在60s之后变成了死信消息,然后被消费掉,这样,一个还算ok的延时队列就打造完成了。
不过如果这样使用的话每增加一个新的时间需求,就要新增一个队列,所以还需要优化一下。
RabbitMQ延时队列优化
将TTL设置在消息属性里
配置类
增加一个延时队列,用于接收设置为任意延时时长的消息,增加一个相应的死信队列和routingkey:
@Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec"; public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey"; public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey"; public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec"; // 声明延时Exchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); } // 声明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 声明延时队列C 不设置TTL // 并绑定到对应的死信交换机 @Bean("delayQueueC") public Queue delayQueueC(){ Map<String, Object> args = new HashMap<>(3); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY); return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build(); } // 声明死信队列C 用于接收延时任意时长处理的消息 @Bean("deadLetterQueueC") public Queue deadLetterQueueC(){ return new Queue(DEAD_LETTER_QUEUEC_NAME); } // 声明延时列C绑定关系 @Bean public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY); } // 声明死信队列C绑定关系 @Bean public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY); } }
消费者
@Component public class DeadLetterQueueConsumer { private static final Logger logger = LoggerFactory.getLogger(DeadLetterQueueConsumer.class); DateFormat dateFormat = DateFormat.getDateTimeInstance(); @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEC_NAME) public void receiveC(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); Date sendDate = message.getMessageProperties().getTimestamp(); //延迟的秒数 long durationSec = (System.currentTimeMillis() - sendDate.getTime()) / 1000; logger.info("接收到延时消息,消息:{},延时秒数:{},当前时间:{}", msg, durationSec, dateFormat.format(new Date())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
生产者
使用spring的setExpiration方法来设置消息的过期时间
@Component public class DelayMessageSender { private static final Logger logger = LoggerFactory.getLogger(DelayMessageSender.class); DateFormat dateFormat = DateFormat.getDateTimeInstance(); @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg, int delayTime) { logger.info("发送延时消息,message:{}, duration:{},当前时间:{}", msg, delayTime, dateFormat.format(new Date())); rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUEC_ROUTING_KEY, msg, x -> { MessageProperties messageProperties = x.getMessageProperties(); messageProperties.setExpiration(String.valueOf(delayTime)); messageProperties.setTimestamp(new Date()); return x; }); } }
请求接口
这个接口可以传递延迟任意时间消息
@RestController @RequestMapping("/rabbitmq") public class RabbitmqController { @Autowired private DelayMessageSender msgProducer; /** * 发送测试数据 * * @param delayTime 交换器类型 * @param message * @return */ @RequestMapping("/send") public String send(String message, int delayTime) { msgProducer.sendMsg(message, delayTime); return "OK"; } }
测试
正常测试
请求地址http://localhost:8080/rabbitmq/send?message=test30s&delayTime=30000 http://localhost:8080/rabbitmq/send?message=test60s&delayTime=60000
测试数据
2020-09-23 10:17:41.397 INFO 17924 --- [nio-8080-exec-4] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test30s, duration:30000,当前时间:2020-9-23 10:17:41 2020-09-23 10:17:47.317 INFO 17924 --- [nio-8080-exec-7] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test60s, duration:60000,当前时间:2020-9-23 10:17:47 2020-09-23 10:18:11.402 INFO 17924 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test30s,延时秒数:30,当前时间:2020-9-23 10:18:11 2020-09-23 10:18:47.321 INFO 17924 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test60s,延时秒数:60,当前时间:2020-9-23 10:18:47
非正常测试
请求地址:
http://localhost:8080/rabbitmq/send?message=test60s&delayTime=60000 http://localhost:8080/rabbitmq/send?message=test30s&delayTime=30000
测试数据
2020-09-23 10:19:30.793 INFO 17924 --- [io-8080-exec-10] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test60s, duration:60000,当前时间:2020-9-23 10:19:30 2020-09-23 10:19:36.694 INFO 17924 --- [nio-8080-exec-1] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test30s, duration:30000,当前时间:2020-9-23 10:19:36 2020-09-23 10:20:30.797 INFO 17924 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test60s,延时秒数:60,当前时间:2020-9-23 10:20:30 2020-09-23 10:20:30.797 INFO 17924 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test30s,延时秒数:54,当前时间:2020-9-23 10:20:30
我们先发了一个延时时长为60s的消息,然后发了一个延时时长为30s的消息,结果显示,第二个消息会在等第一个消息成为死信后才会“死亡“。
利用RabbitMQ插件实现延迟队列
①下载插件
为了解决上面的问题,我们安装一个mq的插件。https://www.rabbitmq.com/community-plugins.html,下载rabbitmq_delayed_message_exchange
插件,然后解压放置到RabbitMQ的插件目录。
或者直接wget https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
②安装插件:
直接解压就行③启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins list 命令查看已安装插件
配置类
@Configuration public class RabbitMQConfig { public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue"; public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange"; public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey"; @Bean public Queue immediateQueue() { return new Queue(DELAYED_QUEUE_NAME); } @Bean public CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue, @Qualifier("customExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
消费者
@Component public class DeadLetterQueueConsumer { private static final Logger logger = LoggerFactory.getLogger(DeadLetterQueueConsumer.class); DateFormat dateFormat = DateFormat.getDateTimeInstance(); @RabbitListener(queues = RabbitMQConfig.DELAYED_QUEUE_NAME) public void receiveDelay(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); Date sendDate = message.getMessageProperties().getTimestamp(); //延迟的秒数 long durationSec = (System.currentTimeMillis() - sendDate.getTime()) / 1000; logger.info("接收到延时消息,消息:{},延时秒数:{},当前时间:{}", msg, durationSec, dateFormat.format(new Date())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
生产者
使用spring的setDelay方法来设置消息的过期时间
@Component public class DelayMessageSender { private static final Logger logger = LoggerFactory.getLogger(DelayMessageSender.class); DateFormat dateFormat = DateFormat.getDateTimeInstance(); @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg, int delayTime) { logger.info("发送延时消息,message:{}, duration:{},当前时间:{}", msg, delayTime, dateFormat.format(new Date())); rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME, RabbitMQConfig.DELAYED_ROUTING_KEY, msg, x -> { MessageProperties messageProperties = x.getMessageProperties(); messageProperties.setDelay(delayTime); messageProperties.setTimestamp(new Date()); return x; }); } }
请求接口
这个接口可以传递延迟任意时间消息
@RestController @RequestMapping("/rabbitmq") public class RabbitmqController { @Autowired private DelayMessageSender msgProducer; /** * 发送测试数据 * * @param delayTime 交换器类型 * @param message * @return */ @RequestMapping("/send") public String send(String message, int delayTime) { msgProducer.sendMsg(message, delayTime); return "OK"; } }
测试
正常测试:
访问:http://localhost:8080/rabbitmq/send?message=test30s&delayTime=30000 http://localhost:8080/rabbitmq/send?message=test60s&delayTime=60000
结果:
2020-09-23 11:06:15.676 INFO 11072 --- [nio-8080-exec-5] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test60s, duration:60000,当前时间:2020-9-23 11:06:15 2020-09-23 11:06:17.308 INFO 11072 --- [nio-8080-exec-6] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test60s, duration:60000,当前时间:2020-9-23 11:06:17 2020-09-23 11:07:15.680 INFO 11072 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test60s,延时秒数:60,当前时间:2020-9-23 11:07:15 2020-09-23 11:07:17.312 INFO 11072 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test60s,延时秒数:60,当前时间:2020-9-23 11:07:17
非正常测试:
访问:http://localhost:8080/rabbitmq/send?message=test60s&delayTime=60000 http://localhost:8080/rabbitmq/send?message=test30s&delayTime=30000
结果:
2020-09-23 11:09:45.823 INFO 11072 --- [nio-8080-exec-4] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test60s, duration:60000,当前时间:2020-9-23 11:09:45 2020-09-23 11:09:47.402 INFO 11072 --- [nio-8080-exec-5] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test30s, duration:30000,当前时间:2020-9-23 11:09:47 2020-09-23 11:10:17.408 INFO 11072 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test30s,延时秒数:30,当前时间:2020-9-23 11:10:17 2020-09-23 11:10:45.826 INFO 11072 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test60s,延时秒数:60,当前时间:2020-9-23 11:10:45
可以看到第二个消息被先消费掉了,符合预期。
-
SpringBoot使用RabbitMQ延时队列(小白必备)
2020-08-25 06:08:33主要介绍了SpringBoot使用RabbitMQ延时队列(小白必备),详细的介绍延迟队列的使用场景及其如何使用,需要的小伙伴可以一起来了解一下 -
RabbitMQ延时队列应用
2021-04-25 10:41:43RabbitMQ 延时队列的介绍和应用 -
RabbitMQ延时队列插件实现
2022-03-30 15:34:34死信队列+延时队列存在的问题 由于传统的延时队列+死信队列实现自动取消订单的延时任务一...RabbitMQ的延时队列插件采用的是过期时间到了就消费的规则,不像其他队列是根据先进先出的规则进行消费,使用延时队列插件可以 -
rabbitMq+erlang+延时队列插件完整安装包.7z
2021-04-07 15:54:05rabbitMq+erlang+延时队列插件完整安装包(正确的版本对应) -
RabbitMQ死信队列原理并实现延迟队列
2020-05-01 19:23:06死信队列 死信交换机(Dead-Letter-Exchange),当消息在一个队列中变成死信之后,它能被发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列 消息变成死信一般是由于下面三种情况: 消息被... -
rabbitMQ 延时队列
2020-09-06 19:38:13rabbitmq要实现延时队列,主要利用消息的过期时间TTL,和死信机制来做,简单来说,我们可以将需要延时发送的消息,设置过期时间,然后把消息发送到某个队列,并且在这个队列上绑定一个死信交换机,这个死信交换机和... -
RabbitMq延时队列
2022-04-03 19:11:261. 场景:“订单下单成功后,15分钟未支付自动取消” 1.传统处理超时订单 采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性能会有很... 2.rabbitMQ延时队列方案 一台普通的... -
rabbitmq 延时队列踩坑记
2021-10-26 13:59:13提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 ...往往解决这些场景的技术手段无外乎于两种,1 定时任务调度 2 延时队列。鉴于定时任务实时性不好控制,往往使用延时队列来实现处理。 JDK ... -
RabbitMQ延时队列解决分布式事务问题
2022-02-14 15:42:33一、RabbitMQ延时队列 1、场景 比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。 常用解决方案: spring的 schedule定时任务轮询数据库 缺点: 消耗系统内存、增加了数据库的... -
RabbitMQ延时队列的使用
2020-11-05 09:29:06延时队列概述 通过给一个普通的队列设置一些参数,死信交换机,死信路由,以及队列过期时间,注意,这个队列不能让任何消费者监听,然后我们将消息统一发往这个队列,当这个队列中的消息过期后,就会根据我们指定的... -
RabbitMQ 的延时队列和镜像队列原理与实战
2019-05-05 15:53:43在阿里云栖开发者沙龙PHP技术专场上,掌阅资深后端工程师、掘金小测《Redis深度历险》作者钱文品为大家介绍了RabbitMQ的延时队列和镜像队列的原理与实践,重点比较了RabbitMQ提供的消息可靠与不可靠模式,同时介绍了... -
SpringBoot使用RabbitMQ延时队列
2019-01-04 15:03:35延时队列 延时队列的使用场景: 1.订单业务:在电商中,用户下单后30分钟后未付款则取消订单。 2.短信通知:用户下单并付款后,1分钟后发短信给用户。 延时队列实现思路 AMQP协议和RabbitMQ队列本身没有直接支持延迟... -
RabbitMQ 延时队列
2020-09-27 15:00:24延时队列,顾名思义是带有延时功能的消息队列,列举几个使用场景: 定时发公告 用户下单30分钟后未付款自动关闭订单 用户下单后延时短信提醒 延时关闭空闲客户端连接 物联网系统经常会遇到向终端下发命令,如果... -
java实现rabbitMQ延时队列详解以及spring-rabbit整合教程
2018-07-24 03:05:00RabbitMQ本身没有直接支持延迟队列功能,但是我们可以根据其特性Per-Queue Message TTL和 Dead Letter Exchanges实现延时队列。也可以通过改特性设置消息的优先级。 1.Per-Queue Message TTLRabbitMQ可以针对消息... -
docker安装rabbitmq延时队列插件
2020-11-24 10:57:03docker安装rabbitMQ延时队列插件(delayed_message_exchange) 1. 查找Docker容器中的RabbitMQ镜像 docker ps -a [root@linux ~]# docker ps -a CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS -
RabbitMQ延时队列,消息重复、堆积、丢失、顺序
2021-05-17 16:30:11rabbitmq延时队列 可以设置队列延时,也可以设置消息延时 生产者->延时队列交换机->延时队列->死信队列->死信队列交换机->消费者 如果是不同梯度的延时并且梯度很少,例如 5s, 10s, 30s只有3个,... -
rabbitmq延时队列的问题
2019-09-11 16:50:16最近在用rabbitmq做了一个延时队列,然后在设置延时时,由于情况没有考虑全面,导致延时设置成了负数,那么当我们的延时时间设置为负数时,会怎么样呢? 我们来看一段源码, public void setDelay(Integer ... -
C#实现rabbitmq 延迟队列功能实例代码
2020-08-30 18:28:08本篇文章主要介绍了C#实现rabbitmq 延迟队列功能实例代码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。