精华内容
下载资源
问答
  • 主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • RabbitMQ死信队列原理并实现延迟队列

    千次阅读 2020-05-01 19:23:06
    死信队列 死信交换机(Dead-Letter-Exchange),当消息在一个队列中变成死信之后,它能被发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列 消息变成死信一般是由于下面三种情况: 消息被...

    码字不易,转载请附原链,搬砖繁忙回复不及时见谅,技术交流请加QQ群:909211071

    死信队列

    死信交换机(Dead-Letter-Exchange),当消息在一个队列中变成死信之后,它能被发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列

    消息变成死信一般是由于下面三种情况:

    • 消息被拒绝(Basic.Reject或Basic.Nack),并在调用时设置requeue参数为false
    • 消息过期
    • 队列达到最大长度

    通过在channel.queueDeclare方法中设置x-dead-letter-exchange参数来为这个队列添加DLX

    延迟队列

    定义2个队列,第一个队列通过设置消息或队列的有效期,使消息到期后变为死信进入到第二个队列中,而我们只消费第二个队列,则可实现延迟队列,下面是用php-amqplib实现的一分钟的延迟队列

    <?php
    require_once __DIR__.'/vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'why', 'why');
    $channel = $connection->channel();
    
    //测试死信的上游交换机
    $channel->exchange_declare('dlxBeforeExchange', 'direct',
        false, true, false, false, false, []);
    //死信的交换机
    $channel->exchange_declare('dlxExchange', 'direct',
        false, true, false, false, false, []);
    
    //测试死信队列的上游队列
    $dlx_table = new \PhpAmqpLib\Wire\AMQPTable();
    $dlx_table->set('x-dead-letter-exchange', 'dlxExchange' );
    $dlx_table->set('x-dead-letter-routing-key', 'dlxKey' );
    $channel->queue_declare('dlxBeforeQueue',
        false, true, false, false, false, $dlx_table);
    $channel->queue_bind('dlxBeforeQueue', 'dlxBeforeExchange', 'dlxBeforeKey');
    //死信队列
    $channel->queue_declare('dlxQueue',
        false, true, false, false, false);
    $channel->queue_bind('dlxQueue', 'dlxExchange', 'dlxKey');
    
    
    $head = array_merge(array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT), ['expiration' => '60000']);
    $msg = new AMQPMessage('hello why', $head);
    $res = $channel->basic_publish($msg, 'dlxBeforeExchange', 'dlxBeforeKey');

     

    展开全文
  • 因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列。功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、Time To Live...
  • RabbitMQ延时队列原理讲解

    千次阅读 2020-05-10 20:27:43
    RabbitMQ延时消息队列 延时队列介绍 延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟后没有完成...

    RabbitMQ延时消息队列

    延时队列介绍

    延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。

    那么,为什么需要延迟消费呢?我们来看以下的场景

    网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网)系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会系统中的业务失败之后,需要重试这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会。那么一天之中肯定是会有很多个预约的,时间也是不一定的,假设现在有1点 2点 3点 三个预约,如何让系统知道在当前时间等于0点 1点 2点给用户发送信息呢,是不是需要一个轮询,一直去查看所有的预约,比对当前的系统时间和预约提前一小时的时间是否相等呢?这样做非常浪费资源而且轮询的时间间隔不好控制。如果我们使用延时消息队列呢,我们在创建时把需要通知的预约放入消息中间件中,并且设置该消息的过期时间,等过期时间到达时再取出消费即可。

    Rabbitmq实现延时队列一般而言有两种形式:第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)[A队列过期->转发给B队列]

    第二种方式:利用rabbitmq中的插件x-delay-message

    TTL DLX实现延时队列

    TTL DLX介绍

    TTL

    RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

    Dead Letter Exchanges(DLX)RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange

    x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

     

    展开全文
  • RabbitMQ 延迟插件

    2020-06-19 09:27:46
    Rabbitmq 延迟插件 rabbitmq_delayed_message_exchange-20171215-3.6.x,注意适用于3.5.8 及其以后的版本
  • RabbitMQ 延迟队列-非常非常实用

    千次阅读 2021-03-19 11:57:40
    RabbitMQ 延迟队列-非常非常实用RabbitMQ 延迟队列-非常非常实用一、使用场景二、消息延迟推送的实现三、项目具体实现 RabbitMQ 延迟队列-非常非常实用 一、使用场景 ​ 目前常见的应用软件都有消息的延迟推送的...

    RabbitMQ 延迟队列-非常非常实用

    一、使用场景

    ​ 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:

    ​ 淘宝七天自动确认收货,自动评价功能等。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能;相应的,自动评价也是类似的。

    ​ 12306 购票支付确认页面。我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30 分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟。

    ​ 当用户下订单后,将用户的订单的标识全部发送到延时队列中,30分钟后进去消费队列中被消费,消费时先检查该订单的状态,如果未支付则标识该订单失效,如果之前已经支付了,则可以通过逻辑代码判断来忽略掉收到的消息。

    ​ 有以下几种延时任务处理方式:

    Java自带的DelayQueue队列(底层代码DelayQueue,而Delayed继承了Comparable,所以,可以实现一个排序效果)

    ​ 这是java本身提供的一种延时队列,如果项目业务复杂性不高可以考虑这种方式。它是使用jvm内存来实现的,停机会丢失数据(需要自行持久化),扩展性不强。

    使用redis监听key的过期来实现

    ​ 当用户下订单后把订单信息设置为redis的key,30分钟失效,程序编写监听redis的key失效,然后处理订单(我也尝试过这种方式)。这种方式最大的弊端就是只能监听一台redis的key失效,集群下将无法实现,也有人监听集群下的每个redis节点的key,但我认为这样做很不合适。如果项目业务复杂性不高,redis单机部署,就可以考虑这种方式。

    ​ 而其他的解决方案,重点讲解延迟插件,以前的死信队列+TTL过期时间方式,请自行研究。

    二、消息延迟推送的实现

    在 RabbitMQ3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列,我们这里不做过多介绍,网上很多文章都有过介绍。在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件,可以下载放置到 RabbitMQ 根目录下的 plugins 下。

    本人使用的RabbitMQ是3.7.7版本,rabbitmq_delayed_message_exchange-3.8.0.ez这个插件放到RabbitMQ安装目录的plugins文件中 在RabbitMQ 安装目的sbin用cmd使用命令

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    

    开启插件后,启动RabbitMQ,访问登录后访问http://localhost:15672,用guest/guest登录后,在交换机exchanges的tab下,底部新增将看到如下图设置,则表示插件已启动,以后直接就可以使用了。

    image-20210319104533123

    插件从github下载速度较慢,本人提交了插件到码云了,同时项目地址也在下方。

    链接: rabbitmq_delayed_message_exchange-3.8.0.ez下载地址.

    延迟插件底层简单原理图:

    image-20210319112441507

    实现原理

    原始的DLX + TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机(具体代码请看下面config下的配置类交换机定义参数),它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用。目前资料介绍的不是很多)。

    这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。

    项目结构:

    image-20210319105212369
    config:配置类,含各种交换机队列等设置。

    controller:不想解释。

    rabbitmq.listener:消费者,如果是微服务,请提出去。这里为了方便,则放入了同一个项目中。

    service:生产者,用于发送消息到mq。

    po:实体类,简单业务测试

    test:测试类,单独只写了一个批量延迟消息生产者,可以生产一定数量的消息。

    三、项目具体实现

    本项目是在springboot下,代码如下,首先引入amqp:

    		<dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    

    配置文件如下:

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: /
        connection-timeout: 15000
    
        #设置为true后 消费者在消息没有被路由到合适队列情况下,会被消费者这边的ReturnCallback监听,不会自动删除
        template:
          mandatory: true
        listener:
          simple:
            # 开启手动确认
            acknowledge-mode: manual
          #开启return 确认消息
        publisher-returns: true
          # ConfirmCallback开启发送到交换机Exchange触发回调
        publisher-confirm-type: correlated
    
    

    配置文件,下面有注释的地方,是额外确保消息可达防丢失,和成功失败触发相关的配置。

    image-20210319104926626

    具体的业务逻辑如上图所示(目前测试过主题和直连,广播模式经过测试也可以使用):

    配置config包下,用于配置交换机,队列和routingkey,对应上图中的名称:

    package com.woniuxy.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author: mayuhang  <br/>
     * Date: 2021/3/17:16:03  <br/>
     * Description:延迟队列配置类
     */
    @Configuration
    public class LazyExchangeConfig {
        public static final String LAZY_EXCHANGE="Ex.LazyExchange";
        public static final String LAZY_QUEUE="MQ.LazyQueue";
        public static final String LAZY_KEY="lazy.#";
        @Bean
        public CustomExchange lazyExchange() {
            //第一种设置方法  设置延迟交换机配置
            Map<String, Object> pros = new HashMap<>();
              //设置交换机支持延迟消息推送
            pros.put("x-delayed-type", "topic");
            CustomExchange exchange = new CustomExchange(LAZY_EXCHANGE, "x-delayed-message",false, true,pros);
    //        CustomExchange exchange = new CustomExchange(LAZY_EXCHANGE, "x-delayed-message",true, true);
    //        //第二种设置方法 这里有bug  如果直接就配置这个 就会报错 必须先用第一种 持久化交换机后就再使用下面这个配置才行
    //        exchange.setDelayed(true);
            return exchange;
        }
        @Bean
        public Queue lazyQueue(){
    //        HashMap<String, Object> pros = new HashMap<>();
    //        设置队列长度
    //        pros.put("x-max-length",1);
            //先不持久化
    //        return new Queue(LAZY_QUEUE,false,false,true,pros);
            return new Queue(LAZY_QUEUE,false);
        }
        @Bean
        public Binding lazyBinding(){
            return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY).noargs();
        }
    
    }
    

    对应业务service接口中生产者(发送方的)方法:

    package com.woniuxy.service.impl;
    
    import com.rabbitmq.client.Return;
    import com.rabbitmq.client.ReturnCallback;
    import com.woniuxy.po.Mail;
    import com.woniuxy.service.Publisher;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * @author: mayuhang  <br/>
     * Date: 2021/3/19:11:03  <br/>
     * Description:这边实现了接口,对应接口代码忽略,消息发送端
     */
    @Service("publisher")
    public class PublisherImpl implements Publisher {
    	@Autowired
        RabbitTemplate rabbitTemplate;
    	
    	//消息发送到交换机时,触发回调!
    	final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
    		/**
    		 * Description : 回调信息实现  <br/>
    		 * ChangeLog : 1. 创建 (2021/3/16 14:43 [mayuhang]);
    		 * @param correlationData 相关数据,感兴趣自己打印出来研究
    		 * @param ack 相应返回如果交换机收到即刻返回为true
    		 * @param cause 原因返回
    		 * @return void
    		 **/
    		@Override
    		public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    			if(ack){
    				System.out.println("发送成功,例如:可以对redis中的某数据进行删除业务从操作....");
    			}else {
    				System.out.println("发送失败,例如:异常处理业务......");
    			}
    		}
    	};
    	//从交换机发送到队列,如果队列收到则会执行回调  这里有个问题,无论队列是否接收到,都会进入这个方法
    	private RabbitTemplate.ReturnsCallback returnsCallback = new RabbitTemplate.ReturnsCallback() {
    		@Override
    		public void returnedMessage(ReturnedMessage returned) {
    				System.out.println(returned.getMessage()+"消息被退回");
    				System.out.println("返回码:"+returned.getReplyCode());
    				System.out.println("消息退回原因:"+returned.getReplyText());
    				System.out.println("路由:"+returned.getRoutingKey());
    				System.out.println("获得消费者全局唯一标签:"+returned.getMessage().getMessageProperties().getHeader("spring_returned_message_correlation"));
    				System.out.println("队列收到消息,执行业务处理代码......");
    	};
    
    	/**
    	 * Description : 延迟队列发送消息到交换机,routingkey简单改为lazy.1000 1000为1s延迟  <br/>
    	 * ChangeLog : 1. 创建 (2021/3/17 17:30 [mayuhang]);
    	 * @param mail
    	 * @param routingkey
    	 * @return void
    	 **/
    	@Override
    	public void sendLazyTopicMail(Mail mail, String routingkey) {
    		SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    		System.out.println("msg发送时间:"+simpleDateFormat.format(new Date()));
    		//id+时间戳  全局唯一
    		CorrelationData correlationData = new CorrelationData("123456"+simpleDateFormat.format(new Date()));
    		//设置响应回调,如果找不到交换机,则回调ack为false
    		rabbitTemplate.setConfirmCallback(confirmCallback);
    		//设置返回相应,如果找不到对应的队列,则回调消息至returnsCallback
    		rabbitTemplate.setReturnsCallback(returnsCallback);
    		//消息发送
    		rabbitTemplate.convertAndSend("Ex.LazyExchange", routingkey, mail, new MessagePostProcessor() {
    			@Override
    			public Message postProcessMessage(Message message) throws AmqpException {
    				System.out.println(routingkey);
    				//简单处理routingkey,截取.后面数据为延迟的时间,实际业务中,肯定不会使用这个,可在实体里定义,或者根据业务定义
    				String[] split = routingkey.split("\\.");
    				int i = Integer.parseInt(split[1]);
    				System.out.println("延迟时间:"+ i);
    				//设置消息持久化
    				message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    				//这个底层就是setHeader("x-delay",i);是一样的
    				message.getMessageProperties().setDelay(i);
    				return message;
    			}
    		},correlationData);
    	}
    }
    

    对应的消费者,比较简单,具体项目业务再进行优化:

    package com.woniuxy.rabbitMQ.listener;
    
    import com.rabbitmq.client.Channel;
    import com.woniuxy.po.Mail;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author: mayuhang  <br/>
     * Date: 2021/3/17:17:08  <br/>
     * Description:监听 打印接受到的延迟后的消息 从channel中获取queue数据
     */
    @Component
    public class LazyTopicListener {
        @RabbitListener(queues = "MQ.LazyQueue")
        public void displayMail(Mail mail,Channel channel, Message message) throws Exception {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-HH-dd hh:mm:ss");
            System.out.println("消息接收的时间"+simpleDateFormat.format(new Date()));
            System.out.println("mail:"+mail.toString());
            //加入休眠,仅仅为了交换机排好序的数据,能够在队列中堆积,用于测试进入队列的数据能否排序,结果是不能,排序是在交换机之前
            //有一个
    //        Thread.sleep(1000);
            System.out.println("对应队列通道标签:"+message.getMessageProperties().getDeliveryTag());
            //手动ack确认,表示消费者已经成功消费,另外肯定还有自动确认,只不过容易出现消息未消费则丢失的情况
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
    

    对应controller代码和前端代码,请去码云中下载吧:
    链接: 对应码云下载地址.

    代码中,需要注意一点,在前端测试的时候,需要注意一点,如下图:

    image-20210319113831234

    后台打印消息如下:

    image-20210319114234719

    实现了延迟10000ms消费。

    同时其他的还能测验很多功能:比如插队!

    package com.woniuxy;
    
    import com.woniuxy.po.Mail;
    import com.woniuxy.service.Publisher;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class RabbitmqApplicationTests {
        @Autowired
        Publisher publisher;
        @Test
        void contextLoads() {
            for (int i = 0; i <100 ; i++) {
                Mail mail = new Mail();
                mail.setMailId(i+"");
                publisher.sendLazyTopicMail(mail,"lazy."+i*1000);
            }
        }
    }
    
    

    在测试类中,生成100个任务(如果要高并发的话,可以通过countDownLatch同时发送100个任务):
    打印结果如下
    在这里插入图片描述
    生成的数据为id递增,延迟时间为i*1000 ms,如果中途希望添加一个临时的任务,则在页面中如下:
    在这里插入图片描述
    发送一个1ms的临时任务,插队立刻执行:
    在这里插入图片描述
    这样既可往之前的任务中,新增一段任务,变相解决了一个时序问题(在同一个队列,必须前一个消费,第二个才能消费)。
    本次研究的内容,还剩幂等性,防消息丢失等,更多业务功能,希望大家可以分享分享,感谢指导。

    这里发现了一个问题:
    RabbitTemplate.ReturnsCallback()这个接口,里面自己写的实现,无论交换机是否成功通过路由发送到队列,都会进入这个方法,而且就算成功了,打印的消息退回原因依旧是NO_ROUTE,有没有谁研究过这个?而且成功失败返回码都是312的!那么我怎么判断是消息入队列失败呢??不能判断要它何用……
    在这里插入图片描述
    下图是成功路由后返回的完整消息:
    在这里插入图片描述
    下图是失败路由返回的完整消息:
    在这里插入图片描述
    来来来,找不同……

    展开全文
  • rabbitmq延迟队列实现

    2021-04-15 16:54:10
    Rabbitmq并没有延迟队列 但是:死信队列+消息时间设置过期时间可以 达成我们想要的延迟队列效果 例如下单5分钟之内未支付就会取消订单,那么设置下单支付时间为5分钟后过期然后进入私信队列,一旦进入私信队列那么...

    延迟队列

    Rabbitmq并没有延迟队列

    但是:死信队列+消息时间设置过期时间可以 达成我们想要的延迟队列效果
    

    例如下单5分钟之内未支付就会取消订单,那么设置下单支付时间为5分钟后过期然后进入死信队列,一旦进入死信队列那么就自动取消订单

    代码
    消费者:

    @Component
    public class ttlproducer {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        //  模拟订单
        public void makeTest(String a,String b){
            String ExchangeName = "ttl.direct.Exchange";
            String luYouKey = "ttl";
            String message = UUID.randomUUID().toString();
            System.out.println("订单生成----"+message);
            rabbitTemplate.convertAndSend(ExchangeName,luYouKey,message);
        }
    }
    

    创建死信队列
    这个死信队列 专门存放未支付订单的消息

    @Configuration
    public class dlxconsumer {
        @Bean
        public DirectExchange deadExchange(){
            return new DirectExchange("dlx.direct.Exchange");
        }
        @Bean
        public Queue abcdQueue(){
            return new Queue("si.xin.Queue");
        }
        @Bean
        public Binding abcdBinding(){
            return BindingBuilder.bind(abcdQueue()).to(deadExchange()).with("dead");
        }
    }
    

    创建订单过期时间

    @Configuration
    public class ttlconsumer {
        @Bean
        public DirectExchange directExchange(){
            return new DirectExchange("ttl.direct.Exchange",true,false);
        }
        @Bean
        public Queue abcQueue(){
            HashMap<String, Object> args = new HashMap<>();
            args.put("x-message-ttl",5000);//5秒
            args.put("x-dead-letter-exchange","dlx.direct.Exchange");
            args.put("x-dead-letter-routing-key","dead");
            return new Queue("ding.dan.Queue",true,false,false,args);
        }
        @Bean
        public Binding abcBinding(){
            return BindingBuilder.bind(abcQueue()).to(directExchange()).with("ttl");
        }
    }
    

    测试

    @SpringBootTest
    class SpringBootRabbitmqSixingduilieApplicationTests {
    
        @Autowired
        private ttlproducer ttlproducer;
        @Test
        void contextLoads() {
            ttlproducer.makeTest("下单","...");
        }
    }
    

    在这里插入图片描述
    我们可以去web界面看
    订单超时时间设置的是5秒过期时间
    看看我们过期的消息是否进入到死信队列

    在这里插入图片描述

    其实上面的死信队列已经起到了延迟队列的作用
    5秒内如果消费不了就会转投到死信队列里

    展开全文
  • 1、RabbitMQ延迟队列插件 1)、下载插件 下载地址:https://www.rabbitmq.com/community-plugins.html 选择相应的版本点击下载 下载的是.zip的安装包,下载完之后需要手动解压并上传到Linux服务器中 2)、...
  • 延迟队列 使用场景: 订单十分钟内未支付则自动取消:下单发送消息 TTL 十分钟,自动转入死信队列 DLX(消费取消订单) 用户发起退款,如果三天内没有得到处理则通知相关运营人员:发送退款请求消息 TTL 3天,还没...
  • RabbitMQ延迟队列(Python版)

    千次阅读 2019-02-13 18:20:39
    因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列。功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、Time To Live...
  • RabbitMQ 延迟队列的实现

    千次阅读 2018-06-11 19:13:59
    1.为什么需要使用延迟队列?适用于什么场景? &amp;nbsp;&amp;nbsp;&amp;nbsp;场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。 &amp;nbsp;&amp;nbsp;&amp;...
  • 推荐书籍:《RabbitMQ实战指南 》朱忠华 。欢迎购买正版书籍。 ... 本文参考:《RabbitMQ实战指南》4.2 , 4.3 ,4.4...通过 RabbitMQ 延迟队列,我们需要关注的2个知识点(这里不进行知识普及,可以通过推荐书籍,官方网...
  • 延迟队列的基础原理 Time To Live(TTL) RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter...
  • Rabbitmq 通过延迟插件实现延迟队列 文章目录DLX+TTL 存在时序问题安装延迟插件下载地址安装Java 代码实现 DLX+TTL 存在时序问题 由于队列先入先出的特性. 通过死信队列(DLX)和给每条消息设置过期时间(TTL)来实现...
  • RabbitMQ 延迟队列

    2021-02-05 14:01:32
    RabbitMQ 延迟队列什么是延迟队列Time To Live(TTL)Dead Letter Exchanges(DLX)延迟队列实现方式一(推荐)原理代码实现延迟队列实现方式二(不推荐)原理缺点 什么是延迟队列 延迟队列存储的对象肯定是对应的延时消息...
  • RabbitMq插件实现延迟队列

    千次阅读 2021-11-14 21:42:53
    因为延迟队列的需求非常多,而手动的用死信队列实现延迟队列也较为麻烦,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。 这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:...
  • RabbitMq 实现延迟队列

    2019-09-13 02:38:40
    欢迎转载分享,转载请注明...推荐书籍:《RabbitMQ实战指南 》朱忠华 。欢迎购买正版书籍。 想要 电子版参考的小伙伴,可以关注公众号 【WTF名字好难取】回复,【推荐书籍】,获取下载地址。 本文参考:《RabbitM...
  • Rabbitmq实现延时队列一般而言有两种形式: 方式一:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX) 方式二:利用rabbitmq中的插件x-delay-message 方式一: 此方式使用的是direct模式的...
  • rabbitmq消息队列原理

    万次阅读 多人点赞 2018-08-12 23:55:40
    RabbitMQ是一个流行的开源消息队列系统,是AMQP(高级消息队列协议)标准的实现,由以高性能、健壮、可伸缩性出名的Erlang语言开发,并继承了这些优点。rabbitmq简单架构如下: 上图简单展示了rabbitmq的架构,从...
  • RabbitMQ如何实现延迟队列

    万次阅读 热门讨论 2017-02-14 21:10:08
    什么是延迟队列 延迟队列存储的对象肯定是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。 场景一:在订单系统中,一个...
  • 延迟队列顾名思义延迟消费数据,那么先解释下延迟队列涉及的关键概念 1、消息的TTL(Time To Live) RabbitMQ允许为消息和队列设置TTL(生存时间),若对消息设置了ttl,如果超过了ttl配置则消息死了,称之为死信....
  • Hyperf框架实现Rabbitmq延迟队列

    千次阅读 2020-07-02 11:44:01
    Hyperf框架实现Rabbitmq延迟队列 Hyperf框架官方支持了Amqp,但是只是具备了基础发消息和接受消息。对于我们经常使用的延迟队列却不支持,这让人感到痛苦。 设计延迟队列 由于Rabbitmq默认没有支持延迟队列,需要...
  • rabbitMq实现延迟队列

    千次阅读 2021-12-07 15:29:33
    1 安装rabbitMq windows安装 ubuntu中安装 2 添加maven依赖 <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --> <dependency> <groupId&
  • 由于太长了,所以分成两篇写,接上一篇讲解了消息的定义和发送,这里继续讲解消费者 ...测试结果:延迟消费和具体方法处理具体业务也实现了 源码地址: https://gitee.com/JanXs/springboot-rabbitMQ
  • 在阿里云栖开发者沙龙PHP技术专场上,掌阅资深后端工程师、掘金小测《Redis深度历险》作者钱文品为大家介绍了RabbitMQ的延时队列和镜像队列原理与实践,重点比较了RabbitMQ提供的消息可靠与不可靠模式,同时介绍了...
  • 最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉。当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在...
  • RabbitMQ消息队列工作原理及集成使用

    千次阅读 2020-03-17 10:40:14
    详细的队列数据结构可以查看下面的文章: 数据结构--队列Queue的实现原理_沙滩的流沙520的博客-CSDN博客 MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 7,528
精华内容 3,011
关键字:

rabbitmq延迟队列原理