精华内容
下载资源
问答
  • setdelay
    2021-03-14 15:10:29

    import javax.swing.Timer; //导入方法依赖的package包/类

    @Override

    protected void initGUI() {

    // textView = new JEditorPane();

    // textView.setContentType("text/plain");

    // textView.setEditorKit(new RawEditorKit());

    textView = new JTextArea();

    textView.setAutoscrolls(false);

    textView.setLineWrap(true);

    textView.setWrapStyleWord(true);

    // the selection is hidden when the focus is lost for some system

    // like Linux, so we make sure it stays

    // it is needed when doing a selection in the search textfield

    textView.setCaret(new PermanentSelectionCaret());

    scroller = new JScrollPane(textView);

    textView.setText(document.getContent().toString());

    textView.getDocument().addDocumentListener(swingDocListener);

    // display and put the caret at the beginning of the file

    SwingUtilities.invokeLater(new Runnable() {

    @Override

    public void run() {

    try {

    if(textView.modelToView(0) != null) {

    textView.scrollRectToVisible(textView.modelToView(0));

    }

    textView.select(0, 0);

    textView.requestFocus();

    } catch(BadLocationException e) {

    e.printStackTrace();

    }

    }

    });

    // contentPane = new JPanel(new BorderLayout());

    // contentPane.add(scroller, BorderLayout.CENTER);

    // //get a pointer to the annotation list view used to display

    // //the highlighted annotations

    // Iterator horizViewsIter = owner.getHorizontalViews().iterator();

    // while(annotationListView == null && horizViewsIter.hasNext()){

    // DocumentView aView = (DocumentView)horizViewsIter.next();

    // if(aView instanceof AnnotationListView)

    // annotationListView = (AnnotationListView)aView;

    // }

    highlightsMinder = new Timer(BLINK_DELAY, new UpdateHighlightsAction());

    highlightsMinder.setInitialDelay(HIGHLIGHT_DELAY);

    highlightsMinder.setDelay(BLINK_DELAY);

    highlightsMinder.setRepeats(true);

    highlightsMinder.setCoalesce(true);

    highlightsMinder.start();

    // blinker = new Timer(this.getClass().getCanonicalName() +

    // "_blink_timer",

    // true);

    // final BlinkAction blinkAction = new BlinkAction();

    // blinker.scheduleAtFixedRate(new TimerTask(){

    // public void run() {

    // blinkAction.actionPerformed(null);

    // }

    // }, 0, BLINK_DELAY);

    initListeners();

    }

    更多相关内容
  • sv.setDelay(1000./sv.getFrameRate());//设置延迟参数,设置为"0"表示每帧均需要用户按键, //1000./sv.getFrameRate()表示此时的播放速率和原始视频的频率相同//设置延迟参数,设置为"0"表示每帧均需要用户按键, ...
  • rabbitTemplate.convertAndSend( XDelayedMessageConfig.DELAYED_EXCHANGE, XDelayedMessageConfig.ROUTING_KEY, msg, message->{ message.getMessageProperties().setDelay(delayTime); return message; }); } } ...

    消息队列:RabbitMQ。尽管它本身并没有提供延时队列的功能,但是我们可以利用它的存活时间和死信交换机的特性来间接实现。

    首先我们先来简单介绍下什么是存活时间?什么是死信交换机?

    存活时间

    存活时间的全拼是Time To Live,简称 TTL。它既支持对消息本身进行设置(延迟队列的关键),又支持对队列进行设置(该队列中所有消息存在相同的过期时间)。

    • 对消息本身进行设置:即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的;
    • 对队列进行设置:一旦消息过期,就会从队列中抹去;

    如果同时使用这两种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么该消息就“死了”,我们把它称为 死信 消息。

    消息变为死信的条件:

    • 消息被拒绝(basic.reject/basic.nack),并且requeue=false;
    • 消息的过期时间到期了;
    • 队列达到最大长度;

    队列设置注意事项

    1. 队列中该属性的设置要在第一次声明队列的时候设置才有效,如果队列一开始已存在且没有这个属性,则要删掉队列再重新声明才可以;
    2. 队列的 ttl 只能被设置为某个固定的值,一旦设置后则不能更改,否则会抛出异常;

    死信交换机

    死信交换机全拼Dead-Letter-Exchange,简称DLX。

    当消息在一个队列中变成死信之后,如果这个消息所在的队列设置了x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换机上,这个交换机就称之为死信交换机,与这个死信交换器绑定的队列就是死信队列。

    • x-dead-letter-exchange:出现死信之后将死信重新发送到指定交换机;
    • x-dead-letter-routing-key:出现死信之后将死信重新按照指定的routing-key发送,如果不设置默认使用消息本身的routing-key

    死信队列与普通队列的区别就是它的RoutingKey和Exchange需要作为参数,绑定到正常的队列上。

    实战教学:

    先来张图感受下我们的整体思路

    1. 生产者发送带有 ttl 的消息放入交换机路由到延时队列中;
    2. 在延时队列中绑定死信交换机与死信转发的routing-key;
    3. 等延时队列中的消息达到延时时间之后变成死信转发到死信交换机并路由到死信队列中;
    4. 最后供消费者消费。

    我们在上文的基础上进行代码实现:

    配置类

    @Configuration
    public class DelayQueueRabbitConfig {
    
        public static final String DLX_QUEUE = "queue.dlx";//死信队列
        public static final String DLX_EXCHANGE = "exchange.dlx";//死信交换机
        public static final String DLX_ROUTING_KEY = "routingkey.dlx";//死信队列与死信交换机绑定的routing-key
    
        public static final String ORDER_QUEUE = "queue.order";//订单的延时队列
        public static final String ORDER_EXCHANGE = "exchange.order";//订单交换机
        public static final String ORDER_ROUTING_KEY = "routingkey.order";//延时队列与订单交换机绑定的routing-key
    
     /**
         * 定义死信队列
         **/
        @Bean
        public Queue dlxQueue(){
            return new Queue(DLX_QUEUE,true);
        }
    
        /**
         * 定义死信交换机
         **/
        @Bean
        public DirectExchange dlxExchange(){
            return new DirectExchange(DLX_EXCHANGE, true, false);
        }
    
    
        /**
         * 死信队列和死信交换机绑定
         * 设置路由键:routingkey.dlx
         **/
        @Bean
        Binding bindingDLX(){
            return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
        }
    
    
        /**
         * 订单延时队列
         * 设置队列里的死信转发到的DLX名称
         * 设置死信在转发时携带的 routing-key 名称
         **/
        @Bean
        public Queue orderQueue() {
            Map<String, Object> params = new HashMap<>();
            params.put("x-dead-letter-exchange", DLX_EXCHANGE);
            params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
            return new Queue(ORDER_QUEUE, true, false, false, params);
        }
    
        /**
         * 订单交换机
         **/
        @Bean
        public DirectExchange orderExchange() {
            return new DirectExchange(ORDER_EXCHANGE, true, false);
        }
    
        /**
         * 把订单队列和订单交换机绑定在一起
         **/
        @Bean
        public Binding orderBinding() {
            return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
        }
    }
    

    发送消息

    @RequestMapping("/order")
    public class OrderSendMessageController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/sendMessage")
        public String sendMessage(){
    
            String delayTime = "10000";
            //将消息携带路由键值
            rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                    "发送消息!",message->{
                message.getMessageProperties().setExpiration(delayTime);
                return message;
            });
            return "ok";
        }
    
    }
    

    消费消息

    @Component
    @RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)//监听队列名称
    public class OrderMQReciever {
    
        @RabbitHandler
        public void process(String message){
            System.out.println("OrderMQReciever接收到的消息是:"+ message);
        }
    }

    测试

    通过调用接口,发现10秒之后才会消费消息

    问题升级

    由于开发环境和测试环境使用的是同一个交换机和队列,所以发送的延时时间都是30分钟。但是为了在测试环境让测试同学方便测试,故手动将测试环境的时间改为了1分钟。

    问题复现

    接着问题就来了:延时时间为1分钟的消息并没有立即被消费,而是等30分钟的消息被消费完之后才被消费了。至于原因,我们下边再分析,先用代码来给大家复现下该问题。

    @GetMapping("/sendManyMessage")
    public String sendManyMessage(){
        send("延迟消息睡10秒",10000+"");
        send("延迟消息睡2秒",2000+"");
        send("延迟消息睡5秒",5000+"");
        return "ok";
    }
    
    private void send(String msg, String delayTime){
     rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, 
                                      DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                                      msg,message->{
                                          message.getMessageProperties().setExpiration(delayTime);
                                          return message;
                                      });
    }
    

    执行结果如下:

    OrderMQReciever接收到的消息是:延迟消息睡10秒

    OrderMQReciever接收到的消息是:延迟消息睡2秒

    OrderMQReciever接收到的消息是:延迟消息睡5秒

    原因就是延时队列也满足队列先进先出的特征,当10秒的消息未出队列时,后边的消息不能顺利出队,造成后边的消息阻塞了,未能达到精准延时。

    问题解决

    我们可以利用x-delay-message插件来解决该问题

    消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒)

    1. 生产者发送消息到交换机时,并不会立即进入,而是先将消息持久化到 Mnesia(一个分布式数据库管理系统);
    2. 插件将会尝试确认消息是否过期;
    3. 如果消息过期,消息会通过 x-delayed-type 类型标记的交换机投递至目标队列,供消费者消费;

    实践

    官网下载:Community Plugins — RabbitMQ

    我这边使用的是v3.8.0.ez,将文件下载下来放到服务器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路径下,执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。

    出现如图所示,代表安装成功。

    配置类

    @Configuration
    public class XDelayedMessageConfig {
    
        public static final String DIRECT_QUEUE = "queue.direct";//队列
        public static final String DELAYED_EXCHANGE = "exchange.delayed";//延迟交换机
        public static final String ROUTING_KEY = "routingkey.bind";//绑定的routing-key
    
        /**
         * 定义队列
         **/
        @Bean
        public Queue directQueue(){
            return new Queue(DIRECT_QUEUE,true);
        }
    
        /**
         * 定义延迟交换机
         * args:根据该参数进行灵活路由,设置为“direct”,意味着该插件具有与直连交换机具有相同的路由行为,
         * 如果想要不同的路由行为,可以更换现有的交换类型如:“topic”
         * 交换机类型为 x-delayed-message
         **/
        @Bean
        public CustomExchange delayedExchange(){
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
        }
    
        /**
         * 队列和延迟交换机绑定
         **/
        @Bean
        public Binding orderBinding() {
            return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
        }
    
    }

    发送消息

    @RestController
    @RequestMapping("/delayed")
    public class DelayedSendMessageController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/sendManyMessage")
        public String sendManyMessage(){
    
            send("延迟消息睡10秒",10000);
            send("延迟消息睡2秒",2000);
            send("延迟消息睡5秒",5000);
            return "ok";
        }
    
        private void send(String msg, Integer delayTime){
            //将消息携带路由键值
            rabbitTemplate.convertAndSend(
                    XDelayedMessageConfig.DELAYED_EXCHANGE,
                    XDelayedMessageConfig.ROUTING_KEY,
                    msg,
                    message->{
                        message.getMessageProperties().setDelay(delayTime);
                        return message;
                    });
        }
    }

    消费消息

    @Component
    @RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)//监听队列名称
    public class DelayedMQReciever {
    
    
        @RabbitHandler
        public void process(String message){
            System.out.println("DelayedMQReciever接收到的消息是:"+ message);
        }
    }

    测试

    DelayedMQReciever接收到的消息是:延迟消息睡2秒

    DelayedMQReciever接收到的消息是:延迟消息睡5秒

    DelayedMQReciever接收到的消息是:延迟消息睡10秒

    这样我们的问题就顺利解决了。

    局限性

    延迟的消息存储在一个Mnesia表中,当前节点上只有一个磁盘副本,它们将在节点重启后存活。

    虽然触发计划交付的计时器不会持久化,但它将在节点启动时的插件激活期间重新初始化。显然,集群中只有一个预定消息的副本意味着丢失该节点或禁用其上的插件将丢失驻留在该节点上的消息。

    该插件的当前设计并不适合延迟消息数量较多的场景(如数万条或数百万条),另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积。

    项目源码地址:rabbit-mq: rabbit-mq延迟队列代码

    展开全文
  • 1 概述 常用的延迟消息实现方式有: 利用 队列TTL + 死信队列 方式实现 利用消息延迟插件实现 消息变成死信的原因有: ...消费者调用了 channel.basicNack 或 channel.basicReject ,并且设置 requeue=false ...

    1 概述

    常用的延迟消息实现方式有:

    • 利用 队列TTL + 死信队列 方式实现

    • 利用消息延迟插件实现

    消息变成死信的原因有:​​​​

    • 消息过期。消息TTL或队列TTL

    • 消息被拒绝。消费者调用了 channel.basicNackchannel.basicReject ,并且设置 requeue=false

    • 队列满。

      当设置了最大队列长度或大小并达到最大值时,RabbitMQ 的默认行为是从队列前面丢弃或 dead-letter 消息(即队列中最早的消息)。要修改这种行为,请使用下面描述的 overflow 设置

      overflow

      常见参数说明

    2 队列TTL + 死信队列方式

    这里直接贴出 rabbitConfig 代码,其他的代码参考该文章:RabbitMQ (三)消息重试

    1 RabbitConfig

    主要操作:

    1. 创建死信队列和交换器,并绑定

    2. 创建队列,同时设置队列的TTL、绑定死信队列;创建交换器,并绑定,

    package com.fmi110.rabbitmq.config;
    ​
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    ​
    import java.util.HashMap;
    ​
    ​
    /**
     * @author fmi110
     * @description rabbitMQ 配置类
     * @date 2021/7/1 15:08
     */
    @Configuration
    @Slf4j
    public class RabbitConfig {
    ​
        String dlQueueName  = "my-queue-dl"; // 普通队列名称
        String dlExchangeName = "my-exchange-dl"; // 死信交换器名称
        String dlRoutingKey   = "rabbit.test";
    ​
        String queueName = "retry-queue";
        String exchangeName = "my-exchange"; // 普通交换器名称
    ​
        /**
         * 创建死信队列
         *
         * @return
         */
        @Bean
        public Queue queueDL() {
    ​
            return QueueBuilder
                    .durable(dlQueueName) // 持久化队列
                    .build();
        }
    ​
        /**
         * 创建死信交换机
         *
         * @return
         */
        @Bean
        public TopicExchange exchangeDL() {
            return new TopicExchange(dlExchangeName, true, false);
        }
    ​
        /**
         * 绑定操作
         */
        @Bean
        public Binding bindQueueDL2ExchangeDL(Queue queueDL, TopicExchange exchangeDL) {
            log.info(">>>> 队列与交换器绑定");
            return BindingBuilder.bind(queueDL).to(exchangeDL).with(dlRoutingKey);
        }
    ​
        /**
         * 创建持久化队列,同时绑定死信交换器
         *
         * @return
         */
        @Bean
        public Queue queue() {
            log.info(">>>> 创建队列 retry-queue");
            HashMap<String, Object> params = new HashMap<>();
            params.put("x-dead-letter-exchange", dlExchangeName);
            params.put("x-dead-letter-routing-key", dlRoutingKey);
    ​
            params.put("x-message-ttl", 10 * 1000); // 队列过期时间 10s
    ​
            return QueueBuilder
                    .durable(queueName) // 持久化队列
                    .withArguments(params) // 关联死信交换器
                    .build();
        }
    ​
    ​
        /**
         * 创建交换机
         *
         * @return
         */
        @Bean
        public TopicExchange exchange() {
            log.info(">>>> 创建交换器 my-exchange");
            boolean durable    = true; // 持久化
            boolean autoDelete = false; // 消费者全部解绑时不自动删除
            return new TopicExchange(exchangeName, durable, autoDelete);
        }
    ​
        /**
         * 绑定队列到交换机
         *
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        public Binding bindQueue2Exchange(Queue queue, TopicExchange exchange) {
            log.info(">>>> 队列与交换器绑定");
            return BindingBuilder.bind(queue).to(exchange).with("rabbit.test");
        }
    ​
    }

    2 RabbitConsumer 消费者

    延迟消息通过队列的TTL产生,所以这里不应该设置普通队列的消费者,让消息过期然后自动转入死信队列,此时再进行消费以此实现延迟消息

    package com.fmi110.rabbitmq;
    ​
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    ​
    import java.util.concurrent.atomic.AtomicInteger;
    ​
    ​
    /**
     * @author fmi110
     * @description 消息消费者
     * @date 2021/7/1 16:08
     */
    @Component
    @Slf4j
    public class RabbitConsumer {
        /**
         * 死信队列消费者
         * @param data
         * @param channel
         * @param tag
         * @throws Exception
         */
        @RabbitListener(queues="my-queue-dl")
        public void consumeDL(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            log.info(">>>> {} 死信队列消费 tag = {},消息内容 : {}", dateFormat.format(new Date()), tag, data);
        }
    }

    3 弊端

    如上图所示实现了延迟10s的消息,但是如果需要实现延迟5s的消息,则需要新建一个TTL为5s的队列,所以如果延迟时间需要很多的话,就需要创建很多队列,实现起来比较麻烦。

    再贴一段对消息设置TTL的代码:

       AtomicInteger aint = new AtomicInteger();
        public void send(String msg) {
            String exchangeName = "my-exchange";
            String routingKey   = "rabbit.test";
            // rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    ​
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setCorrelationId(UUID.randomUUID().toString().replace("-", ""));
            // TTL 为5s
            int i = 9 * 1000;
    ​
            if (aint.incrementAndGet() % 2 == 0) {
                i = 5 * 1000;
            }
            msg = "message send at " + dateFormat.format(new Date()) +", expired at "+dateFormat.format(new Date().getTime()+i);
            messageProperties.setExpiration(String.valueOf(i)); // 设置过期时间
            Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
            rabbitTemplate.send(exchangeName, routingKey, message);
        }

    可以看到消息的过期时间与期望的不一致。因为队列是先进后出的,只有在头部的消息,系统才对其进行过期检测。所以如果消息不再队列头部,即使时间已经过期,也不会导致消息进入死信队列!!!

    当同时设置了消息的TTL和队列的TTL时,过期时间谁小谁生效(队列头部的消息才进行TTL检测)。

    3 使用延迟插件实现

    插件的安装参考 docker安装rabbitMQ

    1 RabbitConfig

    使用延迟插件实现,需要创建延迟交换器,使用 CustomExchange 类创建,同时指定交换器类型为 x-delayed-message ,此外还需要设置属性 x-delayed-type ,创建的交换器如下图所示

    package com.fmi110.rabbitmq.config;
    ​
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    ​
    import java.util.HashMap;
    ​
    /**
     * @author fmi110
     * @description 配置交换器、队列
     * @date 2021/7/3 9:58
     */
    @Slf4j
    @Configuration
    public class RabbitConfig2 {
    ​
        String exchangeName = "delay-exchange";
        String queueName    = "delay-queue";
        String exchangeType = "x-delayed-message";
    ​
        @Bean
        public CustomExchange exchange() {
    ​
            HashMap<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "topic");
            return new CustomExchange(exchangeName, exchangeType, true, false, args);
        }
    ​
        @Bean
        public Queue queue() {
            return new Queue(queueName, true, false, false);
        }
    ​
        @Bean
        public Binding binding(CustomExchange exchange, Queue queue) {
            return BindingBuilder.bind(queue)
                                 .to(exchange)
                                 .with("rabbit.delay")
                                 .noargs();
    ​
        }
    }

    2 RabbitProducer

    这里开启了消息投递失败回调。测试中发现,使用延迟插件,虽然消息正常投递了,但是始终会报 “NO_ROUTER” 提示路由失败。虽然不影响功能。运行截图见后文。目前不确定是我设置问题还是框架的问题...

    package com.fmi110.rabbitmq;
    ​
    import com.rabbitmq.client.AMQP;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    ​
    import javax.annotation.PostConstruct;
    import java.nio.charset.StandardCharsets;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicInteger;
    ​
    /**
     * @author fmi110
     * @description 消息生产者
     * @date 2021/7/1 15:08
     */
    @Component
    @Slf4j
    public class RabbitProducer {
        @Autowired
        RabbitTemplate rabbitTemplate;
    ​
        /**
         * 1 设置 confirm 回调,消息发送到 exchange 时回调
         * 2 设置 return callback ,当路由规则无法匹配到消息队列时,回调
         * <p>
         * correlationData:消息发送时,传递的参数,里边只有一个id属性,标识消息用
         */
        @PostConstruct
        public void enableConfirmCallback() {
            // #1
            /**
             * 连接不上 exchange或exchange不存在时回调
             */
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (!ack) {
                    log.error("消息发送失败");
                    // TODO 记录日志,发送通知等逻辑
                }
            });
    ​
            // #2
            /**
             * 消息投递到队列失败时,才会回调该方法
             * message:发送的消息
             * exchange:消息发往的交换器的名称
             * routingKey:消息携带的路由关键字信息
             */
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.error("{},exchange={},routingKey={}",replyText,exchange,routingKey);
                // TODO 路由失败后续处理逻辑
            });
        }
    ​
        public void sendDelayMsg(String delay) {
            int               delayInt          = StringUtils.isEmpty(delay) ? 0 : Integer.valueOf(delay);
            String            exchangeName      = "delay-exchange";
            String            routingKey        = "rabbit.delay";
            SimpleDateFormat  dateFormat        = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String            msg               = "message send at " + dateFormat.format(new Date()) + ", expired at " + dateFormat.format(new Date().getTime() + delayInt * 1000);
    //        MessageProperties messageProperties = new MessageProperties();
    //        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化消息
    //        messageProperties.setDelay(delayInt * 1000);
    //        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
    //        rabbitTemplate.send(exchangeName,routingKey,message);
    ​
            rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, message ->{
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);  //消息持久化
                message.getMessageProperties().setDelay(delayInt * 1000);   // 单位为毫秒
                return message;
            });
    ​
        }
    }

    3 RabbitConsumer

    消费者,指定监听对应的消息队列即可。

    package com.fmi110.rabbitmq;
    ​
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    ​
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.atomic.AtomicInteger;
    ​
    ​
    /**
     * @author fmi110
     * @description 消息消费者
     * @date 2021/7/1 16:08
     */
    @Component
    @Slf4j
    public class RabbitConsumer {
    ​
        @RabbitListener(queues="delay-queue")
        public void consumeDelay(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
            log.info(">>>> {} 延迟队列消费 tag = {},消息内容 : {}", dateFormat.format(new Date()), tag, data);
        }
    }

    4 controller

    package com.fmi110.rabbitmq.controller;
    ​
    ​
    import com.fmi110.rabbitmq.RabbitProducer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;
    import sun.rmi.runtime.Log;
    ​
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    @Slf4j
    @RestController
    public class TestController {
        @Autowired
        RabbitProducer rabbitProducer;
    ​
        @GetMapping("/delay")
        public Object delay(String delay) {
            rabbitProducer.sendDelayMsg(delay); // 发送消息
            HashMap<String, Object> result = new HashMap<>();
            result.put("code", 0);
            result.put("msg", "success");
            return result;
        }
    }

    5 依赖

        
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    ​
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    ​
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <scope>compile</scope>
            </dependency>

    6 运行截图

    展开全文
  • 一、概念 延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列 ...

    目录

    一、概念

    二、延迟队列使用场景

    三、RabbitMQ 中的 TTL

    消息设置 TTL

    队列设置 TTL

     区别

    RabbitMQ实现延时队列方法

    四、代码实现

    原理图

    1.pom依赖

    2.yml配置文件

    3.配置文件类代码 

    4.生产者代码

    5.消费者代码

     五、延时队列优化

    1.原理图

    2.配置文件代码 

     3.生产者代码

    4.消费者代码

    5.发起请求

    六、Rabbitmq 插件实现延迟队列,解决延时队列优化遇到的问题

    1.安装延时队列插件

    2.原理图 

    3.配置类代码

    4.生产者代码

    5.消费者代码

    6.发起请求

    七、总结 


    一、概念

    延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望
    在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

    二、延迟队列使用场景

    1.订单在十分钟之内未支付则自动取消
    2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
    3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
    4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
    5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

    三、RabbitMQ 中的 TTL

    TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。

    一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

    消息设置 TTL

    队列设置 TTL

     区别

    如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中)。

    而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

    RabbitMQ实现延时队列方法

    TTL 则能让消息在延迟设定时间之后成为死信,,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

    四、代码实现

    原理图

    1.pom依赖

    <!--RabbitMQ 依赖 -->
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--RabbitMQ 测试依赖 -->
    <dependency>
       <groupId>org.springframework.amqp</groupId>
       <artifactId>spring-rabbit-test</artifactId>
       <scope>test</scope>
    </dependency>

    2.yml配置文件

    spring:
      rabbitmq:
        host: 192.168.16.106
        port: 5672
        username: guest
        password: guest

    3.配置文件类代码 

    @Configuration
    public class TtlQueueConfig {
        /**
         * 普通交换机
         */
        public static final String X_EXCHANGE = "X";
        /**
         * 普通队列
         */
        public static final String QUEUE_A = "QA";
        public static final String QUEUE_B = "QB";
        /**
         * 死信交换机
         */
        public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
        /**
         * 死信队列
         */
        public static final String DEAD_LETTER_QUEUE = "QD";
    
        @Bean("xExchange")
        public DirectExchange xExchange(){
            return new DirectExchange(X_EXCHANGE);
        }
    
        @Bean("yExchange")
        public DirectExchange yExchange(){
            return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
        }
    
        @Bean("queueA")
        public Queue queueA(){
            HashMap<String, Object> arguments = new HashMap<>();
            //设置死信交换机
            arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //设置死信RoutingKey
            arguments.put("x-dead-letter-routing-key","YD");
            //设置TTL 单位是ms
            arguments.put("x-message-ttl",10000);
            return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
        }
    
        @Bean("queueB")
        public Queue queueB(){
            HashMap<String, Object> arguments = new HashMap<>();
            //设置死信交换机
            arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //设置死信RoutingKey
            arguments.put("x-dead-letter-routing-key","YD");
            //设置TTL 单位是ms
            arguments.put("x-message-ttl",40000);
            return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
        }
    
        @Bean("queueD")
        public Queue queueD(){
            return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
        }
    
        //绑定不需要调用,bean就不用起名字
        //@Qualifier:按名称注入
        @Bean
        public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueA).to(xExchange).with("XA");
        }
        @Bean
        public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueB).to(xExchange).with("XB");
        }
    
        @Bean
        public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                     @Qualifier("yExchange") DirectExchange yExchange){
            return BindingBuilder.bind(queueD).to(yExchange).with("YD");
        }
    }
    

    4.生产者代码

    @Slf4j
    @RestController
    @RequestMapping("/ttl")
    public class SendMsgController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //开始发消息
        @GetMapping("/sendMsg/{message}")
        public void sendMsg(@PathVariable String message){
            log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);
            rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
            rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
        }
    }

    5.消费者代码

    @Slf4j
    @Component
    public class DeadLetterQueueConsumer {
    
        //接收消息
        @RabbitListener(queues = "QD")
        public void receiveD(Message message, Channel channel) throws Exception{
            String msg = new String(message.getBody());
            log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
        }
    }
    

    发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
    第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。


    不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

     五、延时队列优化

    1.原理图

    在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间

    2.配置文件代码 

    @Configuration
    public class TtlQueueConfig {
        /**
         * 普通交换机
         */
        public static final String X_EXCHANGE = "X";
        /**
         * 普通队列
         */
        public static final String QUEUE_C = "QC";
        /**
         * 死信交换机
         */
        public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
        /**
         * 死信队列
         */
        public static final String DEAD_LETTER_QUEUE = "QD";
    
        @Bean("xExchange")
        public DirectExchange xExchange(){
            return new DirectExchange(X_EXCHANGE);
        }
    
        @Bean("yExchange")
        public DirectExchange yExchange(){
            return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
        }
    
        @Bean("queueC")
        public Queue queueC(){
            HashMap<String, Object> arguments = new HashMap<>();
            //设置死信交换机
            arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //设置死信RoutingKey
            arguments.put("x-dead-letter-routing-key","YD");
            return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
        }
    
    
        @Bean("queueD")
        public Queue queueD(){
            return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
        }
    
        //绑定不需要调用,bean就不用起名字
        //@Qualifier:按名称注入
        @Bean
        public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                      @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueC).to(xExchange).with("XC");
        }
    
        @Bean
        public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                     @Qualifier("yExchange") DirectExchange yExchange){
            return BindingBuilder.bind(queueD).to(yExchange).with("YD");
        }
    }
    

     3.生产者代码

    @Slf4j
    @RestController
    @RequestMapping("/ttl")
    public class SendMsgController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //开始发消息 消息TTL
        @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
        public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
            log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",new Date().toString(),ttlTime,message);
            rabbitTemplate.convertAndSend("X","XC",message,msg ->{
                //设置过期时间
                msg.getMessageProperties().setExpiration(ttlTime);
                return msg;
            });
        }
    }

    4.消费者代码

    @Slf4j
    @Component
    public class DeadLetterQueueConsumer {
    
        //接收消息
        @RabbitListener(queues = "QD")
        public void receiveD(Message message, Channel channel) throws Exception{
            String msg = new String(message.getBody());
            log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
        }
    }
    

    5.发起请求

    http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
    http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000

    如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。 

    六、Rabbitmq 插件实现延迟队列,解决延时队列优化遇到的问题

    1.安装延时队列插件

    在官网上下载 https://www.rabbitmq.com/community-plugins.html,下载
    rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
    进入 RabbitMQ 的安装目录下的 plgins 目录,

    执行以下指令:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    重启RabbitMQ生效

     

    2.原理图 

    在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

    在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并
    不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。 

    3.配置类代码

     正常我们申明一个Exchange只需要指定其类型(direct,fanout,topic等)即可,而声明延迟Exchange需要指定type为x-delayed-message,并通过参数x-delay-type指定其Exchange的类型(direct,fanout,topic等)

    @Configuration
    public class DelayedQueueConfig {
    
        public static final String DELAYED_QUEUE_NAME = "delayed.queue";
        public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
        public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
    
        @Bean
        public Queue delayedQueue(){
            return new Queue(DELAYED_QUEUE_NAME);
        }
        @Bean
        public CustomExchange delayedExchange(){
    
            HashMap<String, Object> arguments = new HashMap<>();
            // 自定义交换机的类型
            arguments.put("x-delayed-type","direct");
            /**
             * 1.交换机名称
             * 2.交换机类型
             * 3.是否需要持久化
             * 4.是否需要自动删除
             * 5.其他的参数
             */
            return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",
                    true,false,arguments);
        }
    
        @Bean
        public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                          @Qualifier("delayedExchange") CustomExchange delayedExchange
        ){
            return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    }
    

    4.生产者代码

    @Slf4j
    @RestController
    @RequestMapping("/ttl")
    public class SendMsgController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //开始发消息 基于插件
        @GetMapping("/sendDelayMsg/{message}/{delayTime}")
        public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){
            log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列:{}",new Date().toString(),delayTime,message);
            rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
                    DelayedQueueConfig.DELAYED_ROUTING_KEY,message, messagePostProcessor ->{
                //设置延迟时间  单位:
                    messagePostProcessor.getMessageProperties().setDelay(delayTime);
                return messagePostProcessor;
            });
        }
    }

    5.消费者代码

    @Slf4j
    @Component
    public class DelayQueueConsumer {
    
        @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
        public void receiveDelayQueue(Message message){
            String msg = new String(message.getBody());
            log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
        }
    }

    6.发起请求

    http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
    http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000

    第二个消息被先消费掉了,符合预期

    七、总结 

    延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用
    RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

    当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。

    展开全文
  • RabbitMQ 之 延迟队列篇

    2022-01-08 13:45:59
    1 延迟队列概念 延时队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。...
  • //队列发送 邮件 延迟 delayTime amqpTemplate.convertAndSend("delayed.exchange.user.register", "delayed.key.user.register",uname,msg->{ msg.getMessageProperties().setDelay(delayTime); return msg; }...
  • rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE_NAME , DelayQueueConfig.DELAYED_ROUTING_KEY, msg, message -> { //设置发送消息的延迟时间 message.getMessageProperties().setDelay...
  • rabbitmq事务和重试机制

    千次阅读 2021-12-13 17:29:25
    = delay) { properties.setDelay(delay); //设置延迟的时间 } if (!StringUtils.isEmpty(contentType)) { properties.setContentType(contentType); } if (!StringUtils.isEmpty(correlationId)) { properties....
  • RabbitMQ的延时队列

    2021-03-16 13:38:19
    } } 我们可以观察 setDelay(Integer i)底层代码,也是在 header 中设置 x-delay。 等同于我们手动设置 header: message.getMessageProperties().setHeader("x-delay", "6000"); /** * Set the x-delay header. * @...
  • 不是过期时间 message -> { /* 设置消息的属性,设置延迟*/ message.getMessageProperties().setDelay(delay); return message; }); /* 执行到这说明没问题,返回添加数据库返回值 */ return result; } } config下的...
  • /* 重新投递消息 */ rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId()));...
  • //进行消息推送 延迟时长,单位ms rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> { msg.getMessageProperties().setDelay...
  • Unity插件-DoTween

    2021-02-21 13:00:15
    可加入入参 LoopType 设定循环方式 LoopType类型 描述 Yoyo 正播之后倒播 Incremental 重播时不重置位置 Restart 重播时重置位置 SetDelay 设置等待时间 SetAutoKill 是否自动回收 SetId 设置ID DOTween.Pause(ID);...
  • RabbitMQ-延迟队列

    2020-05-13 18:43:36
    Integer delay) { rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_EXCHANGE_ROUTING_KEY,message,msgProcessor->{ msgProcessor.getMessageProperties().setDelay(delay); return msgProcessor; }); } /...
  • 可知setDelay的参数类型为Integer */ @Value("${rabbitmq.timeOutDelayTime}") private Integer timeoutDelayTime; private final RabbitTemplate rabbitTemplate; private final OrderService orderService; @...
  • i ++) { encoder.setDelay(decoder.getDelay(i)); BufferedImage childImage = decoder.getFrame(i); BufferedImage image = childImage.getSubimage(x, y, width, height); encoder.addFrame(image); } encoder....
  • [Unity插件]DOTween基础

    万次阅读 2016-03-14 16:38:29
    transform.DOMoveX(45, 1).SetDelay(2).SetEase(Ease.OutQuad).OnComplete(MyCallback); 好了,正式开始! 一些名词: Tweener :补间动画 Sequence :相当于一个Tweener的链表,可以通过执行一个Sequence来执行一串...
  • RabbitMQ延时队列

    2021-08-23 16:23:50
    // 设置延时时长,单位为毫秒 messageProperties.setDelay(1 * 60 * 1000); Message message = new Message(msg.getBytes(),messageProperties); System.out.println("topicSendMsg2{}"+ new ...
  • gloablConfig.setDelay(10000); gloablConfig.start(); while (true) { try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public ...
  • rabbitmq delay 延时消息实现两种方式

    千次阅读 2021-01-05 14:10:20
    rabbitmq 延时消息实现 TTL(Time To Live) 发送TLL消息到死信队列,死信队列不配置消费者,死信队列将消息转发到另一个立即消费的队列上,配置好立即消费的消费者处理延时消息。 安装rabbitmq 插件 rabbitmq_...
  • rabbitmq 延时队列踩坑记

    千次阅读 2021-10-26 13:59:13
    message.getMessageProperties().setDelay(next.intValue()); // 毫秒为单位,指定此消息的延时时长 return message; }) 插件方式是设置一个int类型的时间毫秒数,经测试当时间过长例如一个月后。计算出来的时间差...
  • 浅谈延迟队列任务

    2020-07-25 11:35:22
    this.rabbitTemplate.convertAndSend(XdelayConfig.DELAYED_EXCHANGE_XDELAY, XdelayConfig.DELAY_ROUTING_KEY_XDELAY, msg, message -> { message.getMessageProperties().setDelay(delayTime); System.out....
  • message.getMessageProperties().setDelay(30 * (60*1000)); // 毫秒为单位,指定此消息的延时时长 return message; }); 消费端监听延时队列 package cn.rongyuan.mq.consumer; import java.io.IOException; import...
  • //msg.getMessageProperties().setDelay(120); return msg; } }); return "success"; } } 接收消息 @Component public class ReceiveMessage { @RabbitListener(queues = "MQ.LazyQueue") @RabbitHandler public ...
  • 转载出处:...Sensor 整体框架: Sensor app开发一般会包含五步:【后面有详细流程介绍】 【1】. 获取sensor manager对象; mSensorManager = (SensorManager)getSyst...
  • setDelay(sensor, sampling period) flush(sensor) poll() 调用顺序 sensors_module_t sensors_poll_device_t/sensors_poll_device_1_t sensor_t sensors_event_t   **元数据刷写完成事件** 说明 :Sensor 系列...
  • delay 参数将转换为带符号的32位整数,这有效地将延迟限制为 2147483647 ms(约 24.8 天) 2147483647 === Math.pow(2, 31) - 1 === parseInt('01111111111111111111111111111111', 2) 在nodejs和浏览器中执行的...
  • // 发送消息时,设置消息为延迟消息 delayMessage.getMessageProperties().setDelay(10000); log.info("发送延迟消息{},时间为{}", new String(delayMessage.getBody()), LocalDateTime.now().toString()); ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,446
精华内容 1,378
关键字:

setdelay

友情链接: 案例15.rar