精华内容
下载资源
问答
  • mq延时队列
    千次阅读
    2019-02-18 21:22:45

    简单实现mq延时队列
    1.RabbitMqConfiguration (mq配置类)

    @Configuration
    @Slf4j
    public class RabbitMqConfiguration{
    
        @Bean
        public ConnectionFactory connectionFactory(@Value("${rabbitmq.host}") String host,
            @Value("${rabbitmq.port}") int port, @Value("${rabbitmq.username}") String username,
            @Value("${rabbitmq.password}") String password,
            @Value("${rabbitmq.publisher-confirms}") Boolean publisherConfirms) {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost(host);
            connectionFactory.setPort(port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setPublisherConfirms(publisherConfirms);
            log.info("RabbitMq connectionFactory is finash start!");
            return connectionFactory;
        }
    
        @SuppressWarnings("AlibabaRemoveCommentedCode")
        @Bean(name = MqProperties.CONTAINER_FACTORY)
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setAcknowledgeMode(AcknowledgeMode.NONE);
            // 每个队列设置3个消费者避免单个消费者阻塞而失败
            factory.setConcurrentConsumers(3);
            factory.setMaxConcurrentConsumers(5);
            return factory;
        }
    
        @Bean
        public AmqpTemplate amqpTemplate(@Autowired ConnectionFactory connectionFactory) {
            Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setEncoding("UTF-8");
            // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                String correlationId = message.getMessageProperties().getCorrelationId();
                if (log.isDebugEnabled()) {
                    log.debug("amqp send fail message<{}> replyCode<{}> reason<{}> exchange<{}>  routeKey<{}>",
                        correlationId, replyCode, replyText, exchange, routingKey);
                }
            });
            // 消息确认,yml需要配置 publisher-confirms: true
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    if (log.isDebugEnabled()) {
                        log.debug("amqp send success id<{}>", correlationData.getId());
                    }
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("amqp send fail reason<{}>", cause);
                    }
                }
            });
            return rabbitTemplate;
        }
    
        /**
         * 构建管理类
         *
         * @param connectionFactory
         * @param mqProperties
         * @return
         */
        @Bean
        public AmqpAdmin amqpAdmin(@Autowired ConnectionFactory connectionFactory, @Autowired MqProperties mqProperties) {
            AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory);
            
            // 预提现死信队列
            // 死信交换机
            DirectExchange exchange =
                (DirectExchange)ExchangeBuilder.directExchange(mqProperties.getExchangeOrderPre()).build();
            amqpAdmin.declareExchange(exchange);
    
            // TTL消息队列
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", mqProperties.getExchangeOrderPre());
            arguments.put("x-dead-letter-routing-key", mqProperties.getQueueOrderDRT());
            Queue orderPreDLX = QueueBuilder.durable(mqProperties.getSendOrderPreDLX()).withArguments(arguments).build();
            amqpAdmin.declareQueue(orderPreDLX);
    
            // 消息队列的交换机绑定
            amqpAdmin.declareBinding(BindingBuilder.bind(orderPreDLX).to(exchange).with(mqProperties.getSendOrderPreDLX()));
    
            // 死信转发队列
            Queue orderPreDRT = QueueBuilder.durable(mqProperties.getQueueOrderDRT()).withArguments(arguments).build();
            amqpAdmin.declareQueue(orderPreDRT);
    
            // 转发队列的交换机绑定
            amqpAdmin.declareBinding(BindingBuilder.bind(orderPreDRT).to(exchange).with(mqProperties.getQueueOrderDRT()));
    
            return amqpAdmin;
        }
    
    }
    

    2.MqSendService(mq服务类,发送mq消息)

    /**
     * 有有效期的的队列
     * 
     * @param exchange 交换机名称
     * @param queueName 队列名称
     * @param message消息内容
     * @param times 延迟时间 单位毫秒
     */
    @Async
    public void send(String exchange, String queueName, String message, long times) {
        // 消息发送到死信队列上,当消息超时时,会发生到转发队列上,转发队列根据下面封装的queueName,把消息转发的指定队列上
        // 发送前,把消息进行封装,转发时应转发到指定 queueName 队列上
        MessagePostProcessor processor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) {
                message.getMessageProperties().setExpiration(times + "");
                return message;
            }
        };
        amqpTemplate.convertAndSend(exchange, queueName, message, processor);
    }
    

    3.OrderService(创建订单时发送死信消息)

     // 启动延时队列 (为了准时取消,须保证mq服务时间与api服务时间一致)
            mqSendService.send(mqProperties.getExchangeOrderPre(), mqProperties.getSendOrderPreDLX(), orderId.toString(),
                loanPreStatusDuration);
    

    4.LoanMqListener 接受死信转发队列消息,取消超时订单

    @Component
    @Slf4j
    public class LoanMqListener extends BaseMqListener {
        @Autowired
        private LoanHandleService loanHandleService;
    
        /**
         * 订单到期处理
         * 
         * @param message
         */
        @RabbitListener(queues = "${mq.response.order.pre.repeat.trade}", containerFactory = MqProperties.CONTAINER_FACTORY)
        public void preLoanOverTime(String content) {
            log.info("订单:{}延时取消", content);
            loanHandleService.cancelLoanByPreTimeOut(Long.parseLong(content));
        }
    }
    

    5.补充.MqProperties

    /**
     * Mq 配置
     */
    @Component
    @Data
    @ToString
    public class MqProperties {
    
        /**
         * Mq 容器 factory
         */
        public static final String CONTAINER_FACTORY = "containerFactory";
        /**
         * 订单死信队列
         */
        @Value("${mq.request.order.pre.dead.letter}")
        private String sendOrderPreDLX;
    
        /**
         * 订单死信转发队列
         */
        @Value("${mq.response.order.pre.repeat.trade}")
        private String queueOrderDRT;
    
        /**
         * 订单死信转发队列交换机
         */
        @Value("${mq.exchange.order.pre}")
        private String exchangeOrderPre;
    
    }
    

    6.总结:
    优点:

    1. 可以实现实时取消订单,及时恢复订单占用资源(如订单中的商品),不占用应用服务器资源

    缺点:

    1. 可能会导致消息大量堆积
    2. 死信消息可能会导致运维预警,需要沟通

    欢迎补充

    更多相关内容
  • 为什么要用到延迟队列 在开发项目的时候我们通常会遇到这么一个问题,比如商城项目有一下单逻辑,下单成功数据保存在数据库中,下单成功后需要用户进行支付,如果在30分钟内支付失败,需要修改订单的支付状态为...

    学习使用,老鸟飞过,欢迎评论交流

    为什么要用到延迟队列

    在开发项目的时候我们通常会遇到这么一个问题,比如商城项目有一下单逻辑,下单成功数据保存在数据库中,下单成功后需要用户进行支付,如果在30分钟内支付失败,需要修改订单的支付状态为“支付超时”并关闭订单以及回退库存操作,那如何在下单30后准时检查支付结果处理订单状态呢?

    你可能想到了一个最简单的方法,就是使用定时任务扫描订单表,判断时间是否支付超时,这样的方式无疑是一种很消耗性能的做法,你试想一下,定时扫描一张数据量很大的表去判断时间和状态,而且99%的扫描都是无效的操作。

    那么该如何优雅的解决上述问题呢?我们可以采用延迟队列来实现,Redis和MQ都可以做到,本文章采用RabbitMQ的延迟队列来实现。

    延迟队列实现原理

    说到延迟队列就要说一说消息的过期时间(存活时间)TTL,RabbitMQ可以给队列设置过期时间,也可以单独给每个消息设置过期时间,如果到了过期时间消息没被消费该消息就会标记为死信消息。

    除此之外还有那些消息会成为死信消息?

    • 一是设置了TTL的消息到了TTL过期时间还没被消费,会成为死信
    • 二是消息被消费者拒收,并且reject方法的参数里requeue是false,意味这这个消息不会重回队列,该消息会成为死信,
    • 三是由于队列大小限制,新的消息进来队列可能满了,MQ会淘汰掉最老的消息,这些消息可能会成为死信消息

    成为死信的消息会进入一个死信交换机(Dead Letter Exchange)中,死信交换机也是一个普通的交换机而已,根据这一特点,我们可以准备一个队列来接收死信交换机中的死信消息,然后准备一个消费者来消费该队列中的消息,这样一来我们的延迟队列就有思路了,还是按照订单为例流程如下:
    在这里插入图片描述

    1. 下单成功(生产者),加入下单消息到队列(order.message)
    2. 队列设置TTL过期时间(10000毫秒),同时指定了死信交换机“delay-exchange”和死信交换机转发消息的队列“delay-message”
    3. 消息进入队列,等待一段时间,如果TTL时间到,订单消息会被MQ扔给死信交换机,死信交换机会把消息扔给指定的死信队列delay-message
    4. 消费者正好监听了死信队列delay-message,就可以获取到消息进行消费,比如检查该消息对应的订单是否支付,做出退库存处理等。

    整体效果就是,消息进入order.message队列 延迟 10秒后就 会进入delay-message队列然后被消费者消费处理,这就是一个延迟队列的效果。

    注意,这里的delay-exchange死信交换机其实就是一个普通的交换机而已,所以我们可以把上面的两个交换机合并成一个,如下:
    在这里插入图片描述

    延迟队列实战

    第一步,你需要集成RabbitMQ,我这里使用的是SpringBoot集成MQ

    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    第二步,对MQ做一些配置

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        virtualHost: /
    

    第三步,定义交换机和队列

    import org.springframework.amqp.core.*;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    //rabbitMQ的配置
    @Configuration
    public class MQConfig {
        //交换机
        public static final String EXCHNAGE_DELAY = "EXCHNAGE_DELAY";
        //订单队列,该队列中的消息设置过期时间
        public static final String QUEUE_ORDER = "QUEUE_ORDER";
        //该队列用来接收死信交换机转发过来的消息
        public static final String QUEUE_DELAY = "QUEUE_DELAY";
        //队列的路由键,该路由键用来接收订单消息传出到订单队列
        public static final String ROUTINGKEY_QUEUE_ORDER = "ROUTINGKEY_QUEUE_ORDER";
        //该路由键用来接收死信交换机转发过来的消息
        public static final String ROUTINGKEY_QUEUE_DELAY = "ROUTINGKEY_QUEUE_DELAY";
    
        //定义交换机
        @Bean
        public Exchange exchangeDelay(){
            return ExchangeBuilder.topicExchange(EXCHNAGE_DELAY).durable(true).build();
        }
        //该队列中的消息需要设置ttl
        @Bean(QUEUE_ORDER)
        public Queue queueOrder(){
            Map<String,Object> map = new HashMap<>();
            map.put("x-dead-letter-exchange", EXCHNAGE_DELAY);    //过期的消息给哪个交换机的名字
            map.put("x-dead-letter-routing-key", ROUTINGKEY_QUEUE_DELAY);   //死信交换机把消息个哪个个routingkey
            map.put("x-message-ttl", 10000);    //队列过期时间10s
            return new Queue(QUEUE_ORDER,true,false,false,map);
        }
        //该队列接收死信交换机转发过来的消息
        @Bean(QUEUE_DELAY)
        public Queue queueDelay(){
            return new Queue(QUEUE_DELAY,true);
        }
        @Bean
        public Binding queueOrderBinding(){
            return BindingBuilder.bind(queueOrder()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_ORDER).noargs();
        }
        @Bean
        public Binding queueDelayBinding(){
            return BindingBuilder.bind(queueDelay()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_DELAY).noargs();
        }
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    }
    

    第四部,写一个消息发送者

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = MQApplication.class)
    public class Producer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate ;
    
       
    
        @Test
        public void sendDelayMessage() throws InterruptedException {
            System.out.println("发送消息:我是一个延迟消息,开始时间:"+System.currentTimeMillis());
            rabbitTemplate.convertAndSend(
                    MQConfig.EXCHNAGE_DELAY,
                    MQConfig.ROUTINGKEY_QUEUE_ORDER,
                    "我是一个延迟消息"
            );
    
            Thread.sleep(20000);
        }
    }
    

    第五步,写一个消费者

    @Component
    public class Consumer {
    
        @RabbitListener(queues = MQConfig.QUEUE_DELAY)
        public void handler(String message){
            System.out.println("收到消息:"+message+",结束时间:"+System.currentTimeMillis());
        }
    }
    

    第六步,测试效果

    • 生产者执行后,观察MQ,QUEUE_ORDER中有消息
      在这里插入图片描述

    • 等待10s之后,消息进入QUEUE_DELAY队列在这里插入图片描述

    • 控制台打印效果

    Producer:   发送消息:我是一个延迟消息,开始时间:1606295976347
    Consumer: 收到消息:我是一个延迟消息,结束时间:1606295986418
    

    发送消息到收到消息的时间差为 10071 , 忽略网络开销,延迟时间差不多就是我们设置的TTL时间

    文章结束,希望对你有所帮助

    展开全文
  • 一篇带您搞懂MQ延迟队列【实战操作】

    千次阅读 热门讨论 2021-05-26 15:31:54
    文章目录前言RabbitMq 专栏直通车MQ-死信队列(延迟操作)外加消息确认模式死信队列消息模型构建大概有几步?01::前期准备:引入相关依赖02::整合RabbitMQ02::01-加入RabbitMq相关配置03:: 创建真实队列--交换机、队列、...

    前言

      如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
      而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!


    RabbitMq 专栏直通车

    序号直通车
    1干货实战-RabbitMQ(消息队列)的特性-并用具体应用场景来介绍
    2windows rabbitMQ安装(轻松简单,快速上手)
    3纯干货-详解RabbitMQ的消息模型(看一看,说不定有你想要的呢~~)
    4干货实战演练-RabbitMQ普通消息模型(字节流接受模式)<<带源码>>
    5干货实战演练-RabbitMQ普通消息模型(对象接受模式)<<带源码>>
    6干货实战演练-RabbitMQ广播消息模型(字节流接受模式)<<带源码>>
    7干货实战演练-RabbitMQ直连消息模型<<带源码>>
    8干货实战演练-RabbitMQ订阅消息模型<<带源码>>
    9干货实战-RabbitMQ的消息高可用和确认消费
    10干货实战演练-RabbitMQ基于MANUAL机制手动确认消费模型<<带源码>>
    11详解-RabbitMQ 死信队列/延迟队列-( 商品秒杀后30分钟之内付款)
    12一篇带您搞懂MQ延迟队列实战操作<<带源码>>

    MQ-死信队列(延迟操作)外加消息确认模式

    先回顾一下RabbitMq核心基础组件:

    • 【生产者】: 用于生产、发送信息的模块
    • 【消费者】: 用于监听、接受、消费和处理信息的模块
    • 【消息】: 可以看成一串实质的数据,如文字、图片、等等,在整个传输过程中,消息是通过二进制数据流来传递的
    • 【队列】:消费的暂存区或者存储区,可以理解为中转站,即生产者 -> 队列 -> 消费者
    • 【交换机】:同样也可以看成是中转站,用于首次接受和分发消息
    • 【路由】:相当于网关、秘钥、地址等等,一般不单独使用,绑定到交换机上,将消息指定到指定的队列

    通过上篇文章,我们了解了死信队列的用处,这篇文章主要讲的就是实战,基本上都是代码。

    死信队列消息模型构建大概有几步?

    1. 创建死信队列
    2. 创建基本交换机 —> 面向生产者
    3. 创建基本绑定 —>基本交换机+基本路由 —> 面向生产者
    4. 创建死信交换机
    5. 创建死信路由及其绑定真正的消费队列

    好了,现在废话不多说,开始了;

    <<<<<<<<<<<<<<<<<<<<<<<<<<开始演练>>>>>>>>>>>>>>>>>>>>>>>>>>>

    01::前期准备:引入相关依赖

    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-amqp</artifactId>
    			<version>1.3.3.RELEASE</version>
    		</dependency>
    

    02::整合RabbitMQ

    02::01-加入RabbitMq相关配置

    即在配置文件中加入RabbitMq的配置,ip、端口号、账户密码等等…

    spring:
      rabbitmq: #RabbitMq 配置
        virtual-host: /
        host: 127.0.0.1  #IP
        post: 5672  #提供服务时的端口
        username: guest #连接RabbitMQ的账户
        password: guest #连接RabbitMQ的密码
    

    03:: 创建真实队列–交换机、队列、绑定,和确认消费相关配置

    
    /**
     * 基于手动确认消费模式实战配置
     * @author yangzhenyu
     * */
    @Configuration
    public class ManualMqConfig {
        private static Logger log = LoggerFactory.getLogger(ManualMqConfig.class);
        public ManualMqConfig() {
            log.info("=================== 基于手动确认消费模式实战配置注入IOC===================");
        }
        @Autowired
        private Environment environment;
        //自动装配 RabbitMQ 的连接工厂实例
        @Autowired
        private CachingConnectionFactory cachingConnectionFactory;
    
        //消费者
        @Autowired
        private ManualConsumer manualConsumer;
    
        //创建队列
        @Bean(name = "manualQueueOne")
        public Queue manualQueueOne(){
            return new Queue(environment.getProperty("mq.yzy.info.manualqueue.name"),true);
        }
    
        //交换机
        @Bean
        public DirectExchange manualExchange(){
            return new DirectExchange(environment.getProperty("mq.yzy.info.manualexchange.name"),true,false);
        }
    
        //创建绑定
        //directQueueOne
        @Bean
        public Binding basicBindingOne(){
            return BindingBuilder.bind(manualQueueOne()).to(manualExchange()).with(environment.getProperty("mq.yzy.info.manualrouting.key.name"));
        }
    
        /**
         * 基于手动确认消费模式实战配置
         * */
        @Bean(name = "manualListenerContainer")
        public SimpleMessageListenerContainer manualListenerContainer(@Qualifier("manualQueueOne") Queue manualQueue){
            //自定义消息监听器所在的容器工厂
            SimpleMessageListenerContainer factory = new SimpleMessageListenerContainer();
            //设置容器工厂所用的实例
            factory.setConnectionFactory(cachingConnectionFactory);
            //设置消息的确认消费模式,在这里为MANUAL,表示手动确认消费
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
            //设置并发消费者实例的初始值
            factory.setConcurrentConsumers(1);
            //设置并发消费者实例的最大数量
            factory.setMaxConcurrentConsumers(1);
            //设置并发消费者实例中每个实例拉取的消息数量
            factory.setPrefetchCount(1);
    
            //指定该容器监听的队列
            factory.setQueues(manualQueue);
            //指定该容器中的消费监听器 即消费者
            factory.setMessageListener(manualConsumer);
            return factory;
        }
    }
    

    03::01:设置消息的确认消费模式为手动确认

    在这里插入图片描述

    03::02:指定该容器监听的队列

    在这里插入图片描述
    在这里插入图片描述

    03::03:指定该容器中的消费监听器 即消费者

    在这里插入图片描述

    在这里插入图片描述

    03::04:交换机、队列持久化配置

    在这里插入图片描述

    03::05:相关配置信息:

    mq:
      env: loacl #自定义变量,表示本地开发
      yzy:
        info:
          manualqueue: #消息确认模式
            name: ${mq.env}.middleware.mq.yzy.info.manualqueue.one
          manualexchange: #消息确认模式
            name: ${mq.env}.middleware.mq.yzy.info.manualexchange.one
          manualrouting:
            key: #确认模式
              name: ${mq.env}.middleware.mq.yzy.info.manualrouting.key.one
    

    04:: 制作真实队列消费者,即上述模型中指定的消费者

    如下:

    
    /**
     * 认为手动确认消费-消费者-字节流模式
     * @author yangzhenyu
     * */
    @Component("manualConsumer")
    public class ManualConsumer implements ChannelAwareMessageListener {
        private static Logger log = LoggerFactory.getLogger(ManualConsumer.class);
    
        //序列化和返序列化
        @Autowired
        private ObjectMapper objectMapper;
    
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            //获取消息属性
            MessageProperties messageProperties = message.getMessageProperties();
            //获取消息分发时的全局唯一标识
            long tag = messageProperties.getDeliveryTag();
            try{
                //获得消息体
                byte [] msg = message.getBody();
                //解析消息体
                Student student = objectMapper.readValue(msg,Student.class);
                log.info("基于manual机制-确认消息模式-人为手动确定消费-监听到消息:【{}】",objectMapper.writeValueAsString(student));
                //执行完逻辑后手动确认,第一个参数代表消息的分发标识(全局唯一),第二个参数代表是否允许批量确认消费
                channel.basicAck(tag,true);
            }catch (Exception e){
                log.error("确认消息模式-人为手动确定消费-发生异常:",e.fillInStackTrace());
                /**
                 * 如果在处理消息的过程中发生异常,则需要人为手动确认消费掉该消息
                 * 否则该消息将一直停留在队列中,从而导致重复消费
                 * */
                channel.basicReject(tag,false);
            }
        }
    }
    

    05:: 制作死信队列模型配置

    如下:

    /**
     * 死信队列消息模型构建
     * @author yangzhenyu
     * */
    @Configuration
    public class DeadExchangeConfig {
        private static Logger log = LoggerFactory.getLogger(DeadExchangeConfig.class);
        public DeadExchangeConfig() {
            log.info("=================== 死信队列消息模型构建注入IOC===================");
        }
        @Autowired
        private Environment environment;
        //死信路由
        private final static String DEAD_ROUTING="mq.yzy.info.deadrouting.key.name";
        //死信交换机
        private final static String DEAD_EXCHANGE = "mq.yzy.info.deadexchange.name";
        //死信队列
        private final static String DEAD_QUEUE = "mq.yzy.info.deadqueue.name";
        //死信模型->基本模型->基本交换机(面向生产者)
        private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
        //死信模型->基本模型->基本路由(面向生产者)
        private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";
        //真正的队列 -> 面向消费者
        private final static String REAL_QUEUE="mq.yzy.info.manualqueue.name";
    
        /**
         * 创建死信队列
         * */
        @Bean
        public Queue basicDeadQueue(){
            //创建死信队列的组成成分map,用来存放组成成员的相关成员
            Map<String,Object> args = new HashMap<>(3);
            //创建死信交换机
            args.put("x-dead-letter-exchange",environment.getProperty(DEAD_EXCHANGE));
            //创建死信路由
            args.put("x-dead-letter-routing-key",environment.getProperty(DEAD_ROUTING));
            //设定 TTL ,单位是毫秒,在这里指的是60s
            args.put("x-message-ttl",60000);
            //创建并返回死信队列实例
            return new Queue(environment.getProperty(DEAD_QUEUE),true,false,false,args);
        }
    
        //创建基本交换机 ---> 面向生产者
        @Bean
        public TopicExchange basicProducerExchange(){
            return new TopicExchange(environment.getProperty(DEAD_EXCHANGE_PRODUCER),true,false);
        }
    
        //创建基本绑定 --->基本交换机+基本路由 ---> 面向生产者
        @Bean
        public Binding basicProducerBinding(){
            return BindingBuilder.bind(basicDeadQueue()).to(basicProducerExchange()).with(environment.getProperty(DEAD_ROUTING_PRODUCER));
        }
        //====================================================================================
    
    
    
        //创建死信交换机
        @Bean
        public TopicExchange basicDeadExchange(){
            //创建并返回死信交换机实例
            return new TopicExchange(environment.getProperty(DEAD_EXCHANGE),true,false);
        }
    
        //创建死信路由及其绑定真正的消费队列
        /**
         * @param manualQueue 真正的队列
         * */
        @Bean
        public Binding basicDeadBindingOne(@Qualifier("manualQueueOne") Queue manualQueue){
            return BindingBuilder.bind(manualQueue).to(basicDeadExchange()).with(environment.getProperty(DEAD_ROUTING));
        }
    
    }
    

    05::01 制作死信队列模型配置

    mq:
      env: loacl #自定义变量,表示本地开发
      yzy:
        info:
          deadqueue: #死信队列
            name:  ${mq.env}.middleware.mq.yzy.info.deadqueue.one
          deadrouting: #死信路由
            key:
              name: ${mq.env}.middleware.mq.yzy.info.deadrouting.key.one
            producer: #死信消息模型中 基本模型中的路由
              key:
                name: ${mq.env}.middleware.mq.yzy.info.deadrouting.producer.key.one
    

    06:: 制作死信队列-生产者

    package com.yzy.demo.rabbitmq.dead.publisher;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.yzy.demo.test.vo.Student;
    import org.slf4j.LoggerFactory;
    import org.slf4j.Logger;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.core.env.Environment;
    import org.springframework.stereotype.Component;
    
    /**
     * 死信队列消息模型构建---生产者
     * @author yangzhenyu
     * */
    @Component
    public class DeadPublisher {
        private static Logger log = LoggerFactory.getLogger(DeadPublisher.class);
        //序列化和返序列化
        @Autowired
        private ObjectMapper objectMapper;
        //定义RabbitMQ 组件
        @Autowired
        private RabbitTemplate rabbitTemplate;
        //定义环境变量读取实例
        @Autowired
        private Environment env;
    
        //死信模型->基本模型->基本交换机(面向生产者)
        private final static String DEAD_EXCHANGE_PRODUCER="mq.yzy.info.deadexchange.producer.name";
        //死信模型->基本模型->基本路由(面向生产者)
        private final static String DEAD_ROUTING_PRODUCER="mq.yzy.info.deadrouting.producer.key.name";
    
        /**
         * 发送对象类型的消息给死信队列
         * @param info
         * */
        public void sendMsg(Student info){
            try{
                if (info != null){
                    //定义消息传输的格式为json
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    //指定消息模型中的交换机
                    rabbitTemplate.setExchange(env.getProperty(DEAD_EXCHANGE_PRODUCER));
                    //将字符串值转换成二进制的数据流
                    Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(info))
                            .build();
                    rabbitTemplate.convertAndSend(env.getProperty(DEAD_ROUTING_PRODUCER),msg, new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            //获取消息的属性
                            MessageProperties messageProperties = message.getMessageProperties();
                            //设置消息的持久化模式
                            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            //设置消息头,即指定发送消息的所属对象类型
                            messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Student.class);
                            //设置消息的TTL,当消息和队列同时设置TTL时,取最短 10s
                            messageProperties.setExpiration(String.valueOf(10000));
                            return message;
                        }
                    });
                    log.info("死信队列消息模型 -生产者发出消息:{}",objectMapper.writeValueAsString(info));
                }
            }catch (Exception e){
                log.error("死信队列消息模型 -生产者发出消息-发生异常:{}",info,e.fillInStackTrace());
            }
        }
    }
    
    

    注: 置消息的TTL,当消息和队列同时设置TTL时,取最短。

    07:: 制作测试代码

    如下:

        //死信队列-延迟
        @Autowired
        private DeadPublisher deadPublisher;
        @Autowired
        private ObjectMapper objectMapper;
        /**
         * 死信队列模型演示
         * */
        @ApiOperation(value = "死信队列模型演示",notes = "死信队列模型演示")
        @ResponseBody
        @PostMapping("/deadPublisher")
        public ResponseBo deadPublisher(@RequestBody @Valid Student vo) throws JsonProcessingException {
            String msgValue = "deadPublisher";
            long startTime = init(msgValue,objectMapper.writeValueAsString(vo));
            try{
                deadPublisher.sendMsg(vo);
                endLog(msgValue,startTime);
            }catch (Exception e){
                endLogError(msgValue,startTime,e);
            }
            return ResponseBo.ok();
        }
    

    vo:

    
    /**
     * 学生
     * @author yangzhenyu
     * */
    public class Student implements Serializable {
        /**
         * 序列号
         */
        private static final long serialVersionUID = -5023112818896544461L;
        @NotNull(message = "学生 id值不能为null")
        @ApiModelProperty(" 学生 id值")
        private String sId;
        @ApiModelProperty(" 学生 名称")
        private String sName;
        @ApiModelProperty(" 学生 班级")
        private String className;
    
        public String getsId() {
            return sId;
        }
    
        public void setsId(String sId) {
            this.sId = sId;
        }
    
        public String getsName() {
            return sName;
        }
    
        public void setsName(String sName) {
            this.sName = sName;
        }
    
        public String getClassName() {
            return className;
        }
    
        public void setClassName(String className) {
            this.className = className;
        }
    }
    
    

    注:vo类要继承Serializable ,序列化。

    08:: 测试

    08::01启动demo

    可以在RabbitMq管理平台上看到,我们的交换机和队列已经生成:
    在这里插入图片描述
    点击基础交换机,可以看到绑定的是死信队列:
    在这里插入图片描述
    点击死信交换机,可以看到绑定的是真实的队列:

    在这里插入图片描述
    注:其中"D"表示持久化。

    08::02 通过swagger来测试接口

    即通过观察控制台来判断是否搭建成功。
    先整理一下我们要测试的样例,通过观察控制台打印的结果来判断该消息模型是否搭建成功。
    》》》》开始测试:

    在这里插入图片描述
    在这里插入图片描述
    成功!!!
    观察控制台输出:
    在这里插入图片描述
    在这里插入图片描述

    通过观察发现,死信队列消息模型已经搭建成功!!!

    通过观察发现,该消息模型已经搭建成功!!!

    09::源码

    github:
    https://github.com/yangzhenyu07/springCloud

    本次分享到底结束,感谢观看!!!

    展开全文
  • 延时消息队列(延时不是死信,延时队列不需要监听,而死信队列需要监听) 3. 路由键 都与同一个交换机进行绑定 异步下单路由键 异步支付路由键 死信消息路由键 延时消息路由键 业务流程 调用下单接口,使用...

    实现原理请移步至:小生不才-俏如来

    资源部署

    1. 交换机

    1. 交换机可以接收死信消息,部署一个即可

    2. 队列

    1. 异步下单队列
    2. 异步支付队列
    3. 死信消息队列
    4. 延时消息队列(延时不是死信,延时队列不需要监听,而死信队列需要监听)

    3. 路由键

    都与同一个交换机进行绑定
    
    1. 异步下单路由键
    2. 异步支付路由键
    3. 死信消息路由键
    4. 延时消息路由键

    业务流程

    1. 调用下单接口,使用rabbitmq进行异步下单,将订单信息封装,转发到订单队列中
    2. 消费者监听订单队列,将订单信息保存到数据库中,同时再次向延时队列发送消息
    3. 消费者不会监听延时队列,到达预定时间后,会将信息自动转发到死信队列中
    若未完成支付
    1. 消费者监听到死信队列中的信息,从数据库中查询到订单状态为未支付,将订单状态改为取消
    若完成支付
    1. 调用支付接口,将支付订单号等信息封装为对象,转发到支付队列中
    2. 消费者监听支付队列,将支付信息保存到数据库中,同时将订单状态改为已完成
    3. 消费者监听死信队列,从数据库中查询订单状态为已完成,不进行任何操作

    延迟队列实战

    1. 项目依赖

    <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.5.RELEASE</version>
        </parent>
    
        <dependencies>
            <!--引入web包,调用接口进行测试-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    		<!--springboot集成rabbitmq-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    

    2. yml配置

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        virtualHost: /
    
    server:
      port: 80
    

    3. 配置交换机和队列

    package com.qiuming.rabbitmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    //rabbitMQ的配置
    @Configuration
    public class MQConfig {
        //定义交换机
        public static final String EXCHNAGE_DELAY = "EXCHNAGE_DELAY";
        
        //定义订单队列
        public static final String QUEUE_ORDER = "QUEUE_ORDER";
        //定义死信队列
        public static final String QUEUE_DELAY = "QUEUE_DELAY";
        //定义支付队列
        public static final String QUEUE_PAY = "QUEUE_PAY";
        //定义非监听队列,即用于延时消息的发送
        public static final String QUEUE_UNKNOW = "QUEUE_UNKNOW";
        
        //定义订单队列路由键
        public static final String ROUTINGKEY_QUEUE_ORDER = "ROUTINGKEY_QUEUE_ORDER";
        //定义死信队列路由键
        public static final String ROUTINGKEY_QUEUE_DELAY = "ROUTINGKEY_QUEUE_DELAY";
        //定义支付队列路由键
        public static final String ROUTINGKEY_QUEUE_PAY = "ROUTINGKEY_QUEUE_PAY";
        //定义非监听队列路由键,即用户绑定延时消息
        public static final String ROUTINGKEY_QUEUE_UNKNOW = "ROUTINGKEY_QUEUE_UNKNOW";
    
        //定义交换机
        @Bean
        public Exchange exchangeDelay(){
            return ExchangeBuilder.topicExchange(EXCHNAGE_DELAY).durable(true).build();
        }
       
        //该队列为订单队列
        @Bean(QUEUE_ORDER)
        public Queue queueOrder(){
            return new Queue(QUEUE_ORDER,true);
        }
        //该队列接收死信交换机转发过来的消息
        @Bean(QUEUE_DELAY)
        public Queue queueDelay(){
            return new Queue(QUEUE_DELAY,true);
        }
        //该队列为支付队列
        @Bean(QUEUE_PAY)
        public Queue queuePay(){
            return new Queue(QUEUE_PAY,true);
        }
        //该队列为延时队列
        @Bean(QUEUE_UNKNOW)
        public Queue queueUnKnow(){
            Map<String,Object> map = new HashMap<>();
            //过期的消息给哪个交换机的名字
            map.put("x-dead-letter-exchange", EXCHNAGE_DELAY);
            //死信交换机把消息个哪个个routingkey
            map.put("x-dead-letter-routing-key", ROUTINGKEY_QUEUE_DELAY);
            //队列过期时间10s
            map.put("x-message-ttl", 10000);
            return new Queue(QUEUE_UNKNOW,true,false,false,map);
        }
        
        //用路由键绑定交换机和队列
        @Bean
        public Binding queueOrderBinding(){
            return BindingBuilder.bind(queueOrder()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_ORDER).noargs();
        }
        @Bean
        public Binding queueDelayBinding(){
            return BindingBuilder.bind(queueDelay()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_DELAY).noargs();
        }
        @Bean
        public Binding queuePayBinding(){
            return BindingBuilder.bind(queuePay()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_PAY).noargs();
        }
        @Bean
        public Binding queueUnKnowBinding(){
            return BindingBuilder.bind(queueUnKnow()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_UNKNOW).noargs();
        }
        
        //定义json消息转换器
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    }
    

    4. 定义接口

    定义两个接口用于测试,分别是下单接口和支付接口

    package com.qiuming.rabbitmq.controller;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import static com.qiuming.rabbitmq.config.MQConfig.*;
    
    /**
     * rabbitmq控制层
     * @author qiuming
     * @date 2021/03/22 14:03
     **/
    @RestController
    public class RabbitMQController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/rabbit-order")
        public void order(){
            rabbitTemplate.convertAndSend(
                    EXCHNAGE_DELAY,
                    ROUTINGKEY_QUEUE_ORDER,
                    "已生成订单,请在10s内完成支付");
        }
        
        @GetMapping("/rabbit-pay")
        public void pay(){
            Map<String, Object> map = new HashMap<>(4);
            map.put("orderSn", 1L);
            map.put("message", "已完成支付");
            map.put("success", true);
            rabbitTemplate.convertAndSend(
                EXCHNAGE_DELAY,
                ROUTINGKEY_QUEUE_PAY,
                map);
        }
    }
    

    5. 创建消费者

    package com.qiuming.rabbitmq.consumer;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import static com.qiuming.rabbitmq.config.MQConfig.*;
    
    /**
     * 消费者
     * @author qiuming
     * @date 2021/03/22 14:10
     **/
    @Component
    public class Consumer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //未连接数据库,使用静态变量进行测试
        private static Boolean success = null;
    
        /**
         * 死信队列,延时时间到达进入
         * @param message
         */
        @RabbitListener(queues = QUEUE_DELAY)
        public void listener1(@Payload Map<String,Object> map,Message message){
            //从数据库中查询数据,看订单是否支付完成
            System.out.println("10s到了,死信队列触发");
            if (success!=null && success) {
                System.out.println("支付已成功,死信队列不执行任何操作");
                success = null;
                return ;
            }
            //订单在数据库中处于未完成
            System.out.println(map.get("message"));
            success = null;
        }
    
        /**
         * 生成订单队列
         * @param message
         */
        @RabbitListener(queues = QUEUE_ORDER)
        public void listener2(@Payload String str,Message message) {
            System.out.println(str);
            //设置10s的支付时间,失败支付则计入死信队列
            Map<String, Object> map = new HashMap<>(4);
            success = false;
            map.put("orderSn", 1L);
            map.put("message", "未在规定时间内完成支付,订单取消");
            rabbitTemplate.convertAndSend(EXCHNAGE_DELAY,ROUTINGKEY_QUEUE_UNKNOW,map);
        }
    
        /**
         * 支付订单队列
         */
        @RabbitListener(queues = QUEUE_PAY)
        public void listener3(@Payload Map<String,Object> map, Message message) throws InterruptedException {
            success = (boolean)map.get("success");
            if (success) {
                System.out.println("order sn = "+map.get("orderSn")+",message = "+"支付成功");
            }else {
                System.out.println("支付失败");
            }
        }
    }
    

    6. 启动类

    package com.qiuming.rabbitmq;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    
    /**
     * rabbitmq启动类
     * @author qiuming
     * @date 2021/03/22 14:03
     **/
    @SpringBootApplication
    public class RabbitMQApplication {
        public static void main(String[] args) {
            ConfigurableApplicationContext context = SpringApplication.run(RabbitMQApplication.class);
        }
    }
    

    打完收工(测试)

    下单接口:http://localhost/rabbit-order

    支付接口:http://localhost/rabbit-pay

    1. 调用下单接口,不调用支付接口
    在这里插入图片描述
    2. 先调用下单接口,再调用支付接口
    在这里插入图片描述

    此外:可以根据服务器配置,适当减少50ms-100ms的延时时间,使得业务更加精确

    码云传送门

    展开全文
  • RocketMQ实现延时队列原理

    千次阅读 2021-08-13 10:38:44
    说明:rocketmq实现的延时队列只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时。 具体实现:rocketmq发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息...
  • MQ延迟队列实现延迟消息
  • RabbitMQ延时队列

    2022-03-15 06:08:49
    可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果 4、Dead Letter Exchange (DLX) 5、延时队列的实现 设置队列过期时间实现延时队列 (推荐) 设置消息过期时间来实现延时队列 6、...
  • RabbitMQ延时队列应用

    2021-04-25 10:41:43
    RabbitMQ 延时队列的介绍和应用
  • 该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置...
  • 目录延时队列概念使用场景延时队列实现 延时队列 概念 延时队列是存储延时消息的队列,延时消息就是生产者发送了一条消息,但是不希望该消息不要被立即消费,而是设置一个延时时间,等过了这个时间再消费消息 使用...
  • RabbitMQ 之延时队列

    2021-07-09 11:13:13
    文章目录什么是延时队列延时队列使用场景RabbitMQ中的TTLRabbitMQ实现延时队列RabbitMQ延时队列优化利用RabbitMQ插件实现延时队列总结 什么是延时队列 延时队列,首先,它是一种队列,队列意味着内部的元素是有序的...
  • RabbitMQ的死信队列和延时队列

    千次阅读 2021-12-26 13:51:49
    本文介绍并实战RabbitMQ的死信队列,以及通过RabbitMQ如何实现延时队列
  • 幂等性 本地事务 分布式事务 MQ延时队列-最终一致性
  • rabbitmq 延时队列踩坑记

    千次阅读 2021-10-26 13:59:13
    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 ...往往解决这些场景的技术手段无外乎于两种,1 定时任务调度 2 延时队列。鉴于定时任务实时性不好控制,往往使用延时队列来实现处理。 JDK ...
  • java实现延时队列

    千次阅读 2021-05-14 09:14:25
    延时队列主要应用场景是用户登录后延时推送消息,通知等,一般用mq中间件来弄,下面我来用java实现 一、消息实体类实现Delayed接口 import lombok.Data; import java.util.concurrent.Delayed; import java.util...
  • 1、什么是延时队列延时队列即就是放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费 2、适用场景 (1)商城订单超时未支付,取消订单 (2)使用权限到期前十分钟提醒用户 (3)收益项目,...
  • 基于队列和基于消息的TTLTTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-...
  • 基于队列和基于消息的TTL TTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-...
  • activemq延时队列使用

    万次阅读 2018-07-25 10:32:48
    1、配置mq,activemq.xml配置文件,启用延时投递(注:schedulerSupport="true" 的位置要在最外面,否则不生效) 2、config配置 @Configuration public class QueueConfig { @Bean public Queue ...
  • 前言 延迟任务在工作的业务中有很多的应用场景,比如下订单后监听是否支付...这三种分别是JDK的延时队列、Redis、Rabbitmq,这里解释一下为什么没有介绍另外两种,因为基于数据库的方式在现在的项目中,尤其是大数据
  • RabbitMQ的延时重试队列

    千次阅读 2021-11-04 17:18:06
    如果处理失败 重试 并投入延时队列 如果超过延时时间 重新投入业务队列 如果重试次数大于3 那么进入死信队列 3.代码实现 1.业务队列 这里声明业务队列与绑定关系。 @Configuration public class BusinessConfi
  • 设置消息过期时间 消费者发送一个消息,设置了5分钟过期时间,最后交给了延时队列延时队列说消息死了不要乱放,指定了一个死信路由,用于找到下一个队列的路由键,等到五分钟后服务器会自动检查是否过期,过期的...
  • activemq 延时队列以及不生效问题

    千次阅读 2020-06-09 11:03:14
    最近在做的项目中有一个业务涉及到了订单的有效期的问题(即订单达到一定的时间未支付完成就让该订单失效),于是就想到了延时队列的方式,由于项目采用的是activemq,所以就写了个activemq延时队列代码如下: ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,651
精华内容 4,660
关键字:

mq延时队列