精华内容
下载资源
问答
  • RabbitMQ延时队列

    2021-08-23 16:23:50
    RabbitMQ延时队列一:TTL(消息过期) + 死信队列TTL 过期时间有两种设置方式:单独指定消息的过期时间给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。二:RabbitMQ插件使用Docker 安装RabbitMQ的延时...


    想要实现RabbitMQ延时队列有两种方式

    一:TTL(消息过期) + 死信队列

    TTL + 死信队列的实现方式主要是,TTL 来控制延时时间,等到延时时间过期,消息就会被扔到死信队列来处理,从而达到延时队列的效果。

    TTL 过期时间有两种设置方式:

    单独指定消息的过期时间

    @GetMapping("/topic/sendMsg2")
    	public String topicSendMsg2(){
    		String msg = "hello World";
    		// 对每条消息设置过期时间
    		MessageProperties messageProperties = new MessageProperties();
    		messageProperties.setExpiration("20000"); // 时间单位为 毫秒
    		Message message = new Message(msg.getBytes(),messageProperties);
    		System.out.println("topicSendMsg2{}"+  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    		rabbitTemplate.convertAndSend("test_topic_exchange_name","test_topic_routing_name",message);
    		return "ok~";
    	}
    

    优点:每条消息的过期时间都可以自由的控制,控制粒度小。
    缺点:没有统一的控制,如果过期时间一致的话,则需要每条都写过期配置
    消息推送到队列后,如果指定时间内没有被消费,则会自动过期。

    注意:
    RabbitMQ只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。

    给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。

    // 正常的队列
    	@Bean
    	public Queue createTopicQueue(){
    		Map<String, Object> arguments = new HashMap<>(2);
    		// 设置过期时间,对 队列设置统一的过期时间,单位为 毫秒
    		arguments.put("x-message-ttl",30000);
    
    		// 绑定死信交换机
    		arguments.put("x-dead-letter-exchange", TEST_DEAD_TOPIC_EXCHANGE_NAME);
    		// 绑定死信的路由key
    		arguments.put("x-dead-letter-routing-key", TEST_DEAD_TOPIC_ROUTING_NAME);
    		// 绑定死信队列的交换机和路由
    		return new Queue(TEST_TOPIC_QUEUE_NAME,true,false,false,arguments);
    	}
    

    对队列设置过期时间,这队列的每条消息的过期时间都一致,
    注意:如果两个过期时间都设置的话,则以时间最短的那个为主。

    二:RabbitMQ插件使用

    Docker 安装RabbitMQ的延时插件

    下载插件

    根据自己的版本下载对应的插件
    rabbitmq-delayed-message-exchange
    在这里插入图片描述

    安装

    上传到服务器的/zhanghang/rabbitmq/plugs文件夹下,然后进行如下操作
    在这里插入图片描述
    在这里插入图片描述

    #拷贝到rabbitmq容器 773067241f96 中
    docker cp /zhanghang/rabbitmq/plugs/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbit:/plugins
    # rabbit 是我启动的rabbitmq的别名
    
    #进入容器
    docker exec -it rabbit /bin/bash
    
    #启用插件
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    #查看
    rabbitmq-plugins list
    
    #重新启动容器
    docker restart rabbit
    

    在这里插入图片描述
    在这里插入图片描述

    使用

    @Configuration
    public class TopicRabbitConfig1 {
    	// 正常的topic 队列
    	public static final String TEST_TOPIC_EXCHANGE_NAME = "test_topic_exchange_name1";
    	public static final String TEST_TOPIC_QUEUE_NAME = "test_topic_queue_name1";
    	public static final String TEST_TOPIC_ROUTING_NAME = "test_topic_routing_name1";
    
    
    	// 正常的队列
    	@Bean
    	public Queue createTopicQueue1(){
    		return new Queue(TEST_TOPIC_QUEUE_NAME,true,false,false);
    	}
    
        // 定义延时的交换机
    	@Bean
    	public CustomExchange createTopicExchange1(){
    		Map<String, Object> args = new HashMap<String, Object>();
    		args.put("x-delayed-type", "direct");
    		// 参数说明:
    		// name:    交换机名称
            // type:  交换机类型
    		// durable: 是否持久化
    		// autoDelete: 是否自动删除
            // arguments:  配置
    		return new CustomExchange(TEST_TOPIC_EXCHANGE_NAME,"x-delayed-message",true,false,args);
    	}
    
    	@Bean
    	public Binding createTopicBinding1(){
    		return BindingBuilder.bind(createTopicQueue1()).to(createTopicExchange1()).with(TEST_TOPIC_ROUTING_NAME).noargs();
    	}
    
    }
    
    // 发送消息
    @GetMapping("/topic/sendMsg2")
    	public String topicSendMsg2(){
    		String msg = "hello World";
    		// 对每条消息设置过期时间
    		MessageProperties messageProperties = new MessageProperties();
            // 设置延时时长,单位为毫秒
    		messageProperties.setDelay(1 * 60 * 1000);
    		Message message = new Message(msg.getBytes(),messageProperties);
    		System.out.println("topicSendMsg2{}"+  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    		rabbitTemplate.convertAndSend("test_topic_exchange_name1","test_topic_routing_name1",message);
    		return "ok~";
    	}
    

    测试发现经过一分钟后消费者收到消息

    展开全文
  • RabbitMQ 延时队列

    2021-08-12 16:36:19
    RabbitMQ 可对队列和消息分别设置 TTL。 对队列设置TTL,表明消息进入队列的时间超过队列设置的TTL而没有被消费,消息就会被清除。 对消息设置TTL,表明消息在队列中存活的最大时间是TTL,超过这个时间会被清除。 ...

    消息的TTL(Time To Live)

    1. 消息的 TTL 就是消息的存活时间。
    2. RabbitMQ 可对队列和消息分别设置 TTL。
      对队列设置TTL,表明消息进入队列的时间超过队列设置的TTL而没有被消费,消息就会被清除。
      对消息设置TTL,表明消息在队列中存活的最大时间是TTL,超过这个时间会被清除。
      如果队列设置了TTL而消息也设置了TTL,那么会选取两者之中较小的作为消息的TTL。
      消息设置TTL用 expiration 字段,队列设置TTL用 x-message-ttl 字段。
    3. 超过TTL时间没有被消费的消息,称之为死信。

    死信路由

    1. 死信的种类
      a. 被 consumer 拒收但又不能重写回到队列中的消息。
      b. 消息过期,超过TTL时间没有被消费的消息。
      c. 队列满了,队头被丢弃的消息。
    2. 什么是死信路由?
      当消息死亡后,不要被随意丢弃,而是丢弃到一个 exchange,该 exchange 就是死信路由。

    延时队列

    结合消息TTL 和 死信路由(Dead Letter Exchange) 可以实现延时队列
    实现方式1:
    在这里插入图片描述
    进入到队列 queue 的消息,都是在delay.queue中待够 300000ms 后,变成死信,到达 dead.exchange 交换机,被路由到到 queue里的。这样,消费者消费到的消息,都是经过 300000ms延时后的消息。
    delay.queue 设置了三个属性:
    x-message-ttl 设置消息在队列中的存活时间。
    x-dead-letter-exchange 设置消息变成死信后,交给的交换机。
    x-dead-letter-routing-key 设置消息变成死信后,被放入 queue 队列所用的路由键。

    实现方式2:
    在这里插入图片描述

    Spring Boot 创建 Queue Binding Exchange

    在这里插入图片描述

    /**
     * 容器中的Binding Queue Exchange 都会自动创建(RabbitMQ没有的情况下)
     */
    @Configuration
    public class MyMQConfig {
    
        /**
         * 先创建延时队列
         */
        @Bean
        public Queue orderDelayQueue() {
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", "order-event-exchange"); // 死信路由
            arguments.put("x-dead-letter-routing-key", "order.release.order"); // 死信用的路由键
            arguments.put("x-message-ttl", 60000);
    
            Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
            return  queue;
        }
    
        /**
         * 创建死信队列
         */
        @Bean
        public Queue orderReleaseOrderQueue() {
            Queue queue = new Queue("order.release.order.queue", true, false, false);
            return  queue;
        }
    
        /**
         * 创建exchange, 类型为topic
         */
        @Bean
        public Exchange orderEventExchange() {
            // 交换机的名字
            return new TopicExchange("order-event-exchange", true, false);
        }
    
        /**exchange 与 延时队列的绑定关系
         * 绑定关系
         * @return
         */
        @Bean
        public Binding orderCreateOrderBinding() {
            return new Binding("order.delay.queue",
                    Binding.DestinationType.QUEUE,
                    "order-event-exchange",
                    "order.create.order", null);
        }
    
        /**
         *  exchange 与 死信队列的绑定关系
         */
        @Bean
        public Binding orderReleaseOrderBinding() {
            return new Binding("order.release.order.queue",
                    Binding.DestinationType.QUEUE,
                    "order-event-exchange",
                    "order.release.order", null);
        }
    }
    

    测试给延迟队列 order.delay.queue 发送消息

    1. 测试将消息发送到延迟队列
      @Controller
      public class HelloController {
      
          @Autowired
          RabbitTemplate rabbitTemplate;
          
          @ResponseBody
          @GetMapping("/test/createOrder")
          public String createOrderTest() {
      
              OrderEntity orderEntity = new OrderEntity();
              orderEntity.setOrderSn(UUID.randomUUID().toString());
              orderEntity.setModifyTime(new Date());
      
              // 给MQ发消息
              rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", orderEntity);
              return "ok";
          }
      }
      
    2. 监听死信队列
          @RabbitListener(queues = "order.release.order.queue")
          public void listener(OrderEntity entity) {
              System.out.println("收到过期订单信息,准备关闭订单:" + entity.getOrderSn());
          }
      
    3. 现在看到延时队列中有条消息,就是我们测试的发送的消息
      在这里插入图片描述
      接着消息1分钟后过期,进入死信队列,因此我们的消费者监听到了这条消息,并且进行了消费。
      在这里插入图片描述

    下单业务 结合 MQ的延迟队列

    在这里插入图片描述

    展开全文
  • rabbitmq延时队列

    2019-07-18 11:29:47
    Rabbitmq最纯粹的事务应用通过延时队列讲述rabbitmq事务定义延时队列功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定...

    rabbitmq延时队列

    在开发中使用到定时任务这个功能,以前都是使用timer,quartz,redis的过期事件,quartz的话做集群依赖数据库,redis使用集群的时候不支持过期事件,都存在各种问题,所有想到了使用rabbitmq的延时队列来实现,那这其中也涉及到rabbitmq事务的相关设置,这里就简单写一下我工作中的体会

    定义延时队列

    队列的示意图
    这里就不再详细的介绍基本概念,主要涉及 发布者,交换器,队列,消费者,消费者和队列之间通过TCP建立Connection,connection中存在channel
    下面为延时队列的一个流程图,
    延迟队列示意图
    下面直接贴代码:

        //创建连接工厂,注意publisherConfirms,这个值必须设置
    	@Bean
    	public ConnectionFactory connectionFactory() {
    		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    		connectionFactory.setHost(host);
    		connectionFactory.setPort(port);
    		connectionFactory.setUsername(username);
    		connectionFactory.setPassword(password);
    		connectionFactory.setVirtualHost(virtualHost);
    		//消息发送到rabbitmq交换器后接受ack回调,回调中会告诉发送方,
    		//rabbit是否接受成功,没有成功的话,要做相应的处理,比如重发或者记录
    		connectionFactory.setPublisherConfirms(true); //这里相当于通过这个设置能实现事务的一半
    		//消息发送到rabbitmq交换器后无相应队列育交换器绑定时进行回调
    		connectionFactory.setPublisherReturns(true);
    		return connectionFactory;
    	}
    
    	/**
    	 * 声明延迟队列交换器 exchange
    	 * @return
    	 */
    	@Bean
    	public DirectExchange delayExchange() {
    		return new DirectExchange("delay_exchange");
    	}
    	/**
    	 * 声明延迟队列 queue
    	 */
    	@Bean
    	Queue delayQueue(){
    		return QueueBuilder.durable("delay_queue")
    		         // DLX,dead letter发送到的exchange:消息过期后转到的exchange
    				.withArgument("x-dead-letter-exchange", "process_exchange") 
    				 // dead letter携带的routing key:跳转到指定exchange后传送到指定routingkey的队列
    				.withArgument("x-dead-letter-routing-key", "delay") 
    				.build();
    	}
    
    	/**
    	 * 将延迟队列与exchange绑定
    	 * @return
    	 */
    	@Bean
    	Binding delayBinding() {
    		return BindingBuilder.bind(delayQueue())
    				.to(delayExchange())
    				.with("delay");
    	}
    

    上面延迟队列就声明完毕,下面声明一下正常消费队列

        //正常消费队列 交换器exchange 注意这里exchangeName和上面x-dead-letter-exchange对应的一致
        @Bean
    	public DirectExchange processExchange() {
    		return new DirectExchange("process_exchange");
    	}
    	@Bean
    	public Queue processQueue() {
    		return QueueBuilder.durable("process_queue")
    				.build();
    	}
    
        @Bean
    	Binding queueBinding() {
    		return BindingBuilder.bind(processQueue())
    				.to(processExchange())
    				//这个值必须和x-x-dead-letter-routing-key保持一致,这样才会传输到指定的队列中
    				.with("delay");
    	}
    
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)	//必须是prototype类型(有待考证)
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //使用return-callback时必须设置mandatory为true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            //做相应的操作
            }
        });
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String s) {
                if (!ack) {
                    logger.info("=============消息发送失败=============");
                    Msg msg = JSON
                            .parseObject(correlationData.getId(), Msg.class);
                    //这里做了重新传送,Msg是自定义的类,传递的消息。correlationData会设置一个id,在发送的时候设置一下,
                    //这里由于是返回的,已经带有id不用重新再设置
                    rabbitTemplate.convertAndSend(exchangeName, forceCloseRepoRouting, msg, correlationData);
                } else {
                    logger.info("=============消息发送成功=============");
                }
                logger.info("correlationData= {},b={},s={}",correlationData!=null?correlationData.toString():"",ack,s);
            }
        });
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
    

    消息发送方法:

    public void send() {
        Msg msg=new Msg();
        msg.setId(1);
        //携带确认数据,设置correlationDate的id,主要是用于消息没有发送成功,返回给生产者是通过id判断是哪个信息
        CorrelationData correlationData = new CorrelationData(JSON.toJSONString(msg));
        rabbitTemplate.convertAndSend(exchangeName, forceCloseRepoRouting, msg, correlationData);
    }
    

    到这里除了监听代码没有写出来,其他的代码都已经写出来了,慢慢干货,当然这里还有一些问题,后面有时间在和大家一块儿学习讨论吧

    展开全文
  • rabbitMQ 延时队列

    2020-09-06 19:38:13
    rabbitmq要实现延时队列,主要利用消息的过期时间TTL,和死信机制来做,简单来说,我们可以将需要延时发送的消息,设置过期时间,然后把消息发送到某个队列,并且在这个队列上绑定一个死信交换机,这个死信交换机和...

    来记录一下笔记吧

    rabbitmq要实现延时队列,主要利用消息的过期时间TTL,和死信机制来做,简单来说,我们可以将需要延时发送的消息,设置过期时间,然后把消息发送到某个队列,并且在这个队列上绑定一个死信交换机,这个死信交换机和另一个队列建立绑定关系,这样只需要等待第一个队列的消息过期,然后mq会自动将过期的消息通过绑定的死信交换机,路由到死信交换机绑定的队列,最后监听队列即可

    springboot环境

     <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
            
    

    mq相关配置

    spring.rabbitmq.addresses=
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    #必须为true,否则失败,发送者,接受不到回调消息,false 服务器丢失请求
    
    spring.rabbitmq.template.mandatory=true
    #消费端手工ack
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #消费者最小数量
    spring.rabbitmq.listener.simple.concurrency=1
    #消费者最大数量,控制并发,若需要消息有消费,则可配置1个消费端
    spring.rabbitmq.listener.simple.max-concurrency=1
    #每次请求处理的消息数
    spring.rabbitmq.listener.simple.prefetch=1
    

    配置:RabbitAdmin ,rabbitAdmin用来操作队列,交换机,
    比如说创建队列,创建交换机,建立绑定关系

    package com.fnm.feynman.hospital.common.config;
    
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.boot.SpringBootConfiguration;
    import org.springframework.context.annotation.Bean;
    
    /**
     * @author yanjun.liu
     * @version 1.0
     * @date 2020/9/2--14:38
     */
    @SpringBootConfiguration
    public class RabbitMqConfig {
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    }
    
    

    消息的发送者

    解释一下以下代码,最上方,是两个回调,在消息发送成功或者失败,将保存数据库做留存,以备后期对发送失败的消息进行补偿,
    下面贴上代码
    先发送一条及时消息,然后在发送一条延时消息

    package com.fnm.feynman.hospital.business.service.impl;
    import com.alibaba.fastjson.JSON;
    import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
    import com.fnm.feynman.common.utils.UUIDUtils;
    import com.fnm.feynman.hospital.business.entity.ZcMqMessageEntity;
    import com.fnm.feynman.hospital.business.resp.OrderMessageResp;
    import com.fnm.feynman.hospital.business.service.SendMessageService;
    import com.fnm.feynman.hospital.business.service.ZcMqMessageService;
    import com.fnm.feynman.hospital.constant.FinalConst;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    import javax.annotation.Resource;
    import java.sql.Time;
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author yanjun.liu
     * @version 1.0
     * @date 2020/9/2--12:00
     */
    @Service
    @Slf4j
    public class SendMessageServiceImpl implements SendMessageService {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Resource
        private RabbitAdmin rabbitAdmin;
    
        @Resource
        private ZcMqMessageService zcMqMessageService;
    
        private static final String ROUTE_KING="#";
    
        private static final String DELAYED="_delayed";
    
        private static final String TTL="_ttl";
    
    
        /**
         * 回调函数: confirm确认
         * correlationData 消息唯一id
         * ack 是否到达Broker
         * cause nack出现异常,返回的异常消息
         */
        final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (!ack) {
                    log.error("消息发送到Broker失败,消息id为:{}",correlationData);
                    updateMessageStatus(FinalConst.ERROR,correlationData,FinalConst.CONFIRM);
                } else {
                    updateMessageStatus(FinalConst.SUCCESS,correlationData,FinalConst.CONFIRM);
                    log.info("消息发送到Broker成功,消息id为:{}",correlationData);
                }
            }
        };
        private void updateMessageStatus(String status,CorrelationData correlationData,String callbackType){
            ZcMqMessageEntity zcMqMessageEntity=zcMqMessageService.getOne(new QueryWrapper<ZcMqMessageEntity>().lambda().eq(ZcMqMessageEntity::getMqMessageId,correlationData.getId()));
            if(zcMqMessageEntity != null){
                zcMqMessageEntity.setIsSend(FinalConst.SUCCESS);
                zcMqMessageEntity.setCallbackType(callbackType);
                zcMqMessageService.updateById(zcMqMessageEntity);
            }
        }
        /**
         * return返回回调
         * 若routingKey不可达。会回调此方法
         * message     消息
         * replyCode   不可达错误码
         * replyText   错误内容信息
         * exchange    交换机
         * routingKey  路由key
         */
        final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                                        String exchange, String routingKey) {
    
                String messageId = message.getMessageProperties().getMessageId();
                ZcMqMessageEntity zcMqMessageEntity=zcMqMessageService.getOne(new QueryWrapper<ZcMqMessageEntity>().lambda().eq(ZcMqMessageEntity::getMqMessageId,messageId));
                if(zcMqMessageEntity != null){
                    zcMqMessageEntity.setIsSend(FinalConst.ERROR);
                    zcMqMessageEntity.setCallbackType(FinalConst.RETURN);
                    zcMqMessageService.updateById(zcMqMessageEntity);
                }
                log.error("消息发送失败:routingKey不可达,消息为:{}",message.getMessageProperties().getMessageId());
                log.error("消息发送失败:routingKey不可达,消息为:{}",message);
                log.error("消息发送失败:routingKey不可达,错误码:{}",replyCode);
                log.error("消息发送失败:routingKey不可达,错误内容信息:{}",replyText);
                log.error("消息发送失败:routingKey不可达,交换机:{}",exchange);
                log.error("消息发送失败:routingKey不可达,路由key:{}",routingKey);
            }
        };
    
        /**
         * 发送预约消息
         * 若预约时间减去一小时小于,<当前时间,那么消息会丢失,延时队列不会收到该消息
         * @param queue
         * @param orderMessageResp
         */
        @Override
        public void sendOrderMessage(String exchange, String queue, OrderMessageResp orderMessageResp, Date date, Time time,String orderNo){
            String json = JSON.toJSONString(orderMessageResp);
            rabbitAdmin.declareExchange(new TopicExchange(exchange, true, false));
            rabbitAdmin.declareQueue(new Queue(queue, true));
            rabbitAdmin.declareBinding(
                BindingBuilder
                    .bind(new Queue(queue))
                    .to(new TopicExchange(exchange))
                    .with(ROUTE_KING));
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            //消息id = 时间戳 + 全局唯一id
            CorrelationData correlationData = new CorrelationData(UUIDUtils.generate32BitId() + Long.toString(System.currentTimeMillis()));
            orderMessageResp.setTitle("预约成功");
            //入库
            saveMessage(json,exchange,queue,orderNo,correlationData.getId(),ROUTE_KING,FinalConst.NOT_DELAY);
            //发送
            rabbitTemplate.convertAndSend(exchange, ROUTE_KING, json, correlationData);
            log.info("消息发送成功,消息内容为:{}",json);
    		//开始发送延时消息,上面一部分是发送的即时消息
            //声明死信交换机,绑定到队列
            Map<String, Object> agruments = new HashMap<>(2);
            agruments.put("x-dead-letter-exchange", exchange+TTL);
            agruments.put("x-dead-letter-routing-key", ROUTE_KING);
    
            rabbitAdmin.declareQueue(new Queue(queue+DELAYED, true,false, false,agruments));
            rabbitAdmin.declareExchange(new TopicExchange(exchange+DELAYED, true, false));
            rabbitAdmin.declareBinding(
                BindingBuilder
                    .bind(new Queue(queue+DELAYED))
                    .to(new TopicExchange(exchange+DELAYED))
                    .with(ROUTE_KING));
            orderMessageResp.setMessage(orderMessageResp.getMessage()+",距离预约时间还有一小时,请您及时前往");
    
            LocalDateTime localDateTime = time.toLocalTime().atDate(LocalDate.from(date.toInstant().atZone(ZoneId.systemDefault()))).minusHours(1);
            //Long duration = null;
            long  duration = java.time.Duration.between(LocalDateTime.now(), localDateTime).toMillis();
            if(duration<0){
                duration= 30000L;
                orderMessageResp.setMessage(orderMessageResp.getMessage()+",您的预约时间不足一小时,请您及时前往");
            }
            orderMessageResp.setTitle("预约提醒");
            String delayedMessage = JSON.toJSONString(orderMessageResp);
            log.info("{}:毫秒之后过期,到死信队列",duration);
            //发送消息到延时队列,等待过期
            long finalDuration = duration;
            //延时队列消息id
            CorrelationData delayedCorrelationData = new CorrelationData(UUIDUtils.generate32BitId() + Long.toString(System.currentTimeMillis()));
            //入库
            saveMessage(delayedMessage,exchange+DELAYED,queue+DELAYED,orderNo,delayedCorrelationData.getId(),ROUTE_KING,FinalConst.YES_DELAY);
            rabbitTemplate.convertAndSend(exchange+DELAYED, ROUTE_KING, delayedMessage, message -> {
                MessageProperties messageProperties = message.getMessageProperties();
                // 设置这条消息的过期时间
                messageProperties.setExpiration(Long.valueOf(finalDuration).toString());
                return message;
            }, delayedCorrelationData);
            log.info("发送消息到延迟队列成功,消息为{},消息id为:{}",delayedMessage,correlationData);
    
            //死信交换机绑定死信队列
            rabbitAdmin.declareQueue(new Queue(queue+TTL, true));
            rabbitAdmin.declareExchange(new TopicExchange(exchange+TTL, true, false));
            rabbitAdmin.declareBinding(
                BindingBuilder
                    .bind(new Queue(queue+TTL))
                    .to(new TopicExchange(exchange+TTL))
                    .with(ROUTE_KING));
        }
    
        private void saveMessage(String json,String exchange,String queue,String orderNo,String correlationData,String routeKey,String delay){
            ZcMqMessageEntity zcMqMessageEntity = new ZcMqMessageEntity();
            zcMqMessageEntity.setMessage(json);
            zcMqMessageEntity.setExchange(exchange);
            zcMqMessageEntity.setQueue(queue);
            zcMqMessageEntity.setOrderNumber(orderNo);
            zcMqMessageEntity.setMqMessageId(correlationData);
            zcMqMessageEntity.setRouteKey(routeKey);
            zcMqMessageEntity.setSendTime(new Date());
            zcMqMessageEntity.setIsDelay(delay);
            zcMqMessageService.save(zcMqMessageEntity);
        }
    }
    
    

    以上是消息的发送者,消费者只要监听队列即可,然后做好手动ack确认消息
    ,因为消费者由前端监听队列消费,所以消费者的部分就省了。。。

    展开全文
  • RabbitMQ延时队列应用

    2021-04-25 10:41:43
    RabbitMQ 延时队列的介绍和应用
  • 主要介绍了SpringBoot使用RabbitMQ延时队列(小白必备),详细的介绍延迟队列的使用场景及其如何使用,需要的小伙伴可以一起来了解一下

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 9,076
精华内容 3,630
关键字:

rabbitmq延时队列