精华内容
下载资源
问答
  • 主要介绍了SpringBoot使用RabbitMQ延时队列(小白必备),详细的介绍延迟队列的使用场景及其如何使用,需要的小伙伴可以一起来了解一下
  • springboot整合RabbitMQ实现延时队列的两种方式 教程及源码。参考博客:https://blog.csdn.net/qq_29914837/article/details/94070677
  • 该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置...
  • springboot+rabbitmq实现延时队列,包括消息发送和消费确认,消费者端使用策略模式处理业务
  • rabbitmq延时队列和四种交换机模式下队列的简单实现,需要自己配置一下属性文件。
  • redis 延时队列

    2018-08-10 17:13:11
    将整个Redis当做消息池,以kv形式存储消息 使用ZSET做优先队列,按照score维持优先级
  • 由redis支持的优先级队列队列,为eggjs构建。
  • 延时队列

    千次阅读 2019-06-13 16:58:24
    延时队列,顾名思义是带有延时功能的消息队列,列举几个使用场景: 定时发公告 用户下单30分钟后未付款自动关闭订单 用户下单后延时短信提醒 延时关闭空闲客户端连接 ...... 延时队列实现方案: Java中的...

    延时队列,顾名思义是带有延时功能的消息队列,列举几个使用场景:

    1. 定时发公告
    2. 用户下单30分钟后未付款自动关闭订单
    3. 用户下单后延时短信提醒
    4. 延时关闭空闲客户端连接
    5. ......

    延时队列实现方案:

    • Java中的DelayQueue

    DelayQueue是一个无界阻塞队列,只有消息到期才能从中获取到消息。话不多说,实现一个Demo。

    消息实体:

    public class Message implements Delayed {
    
        private String body;
    
        private long delayTime;
    
        private long putTime;
    
        /**
         * 用于返回剩余时间
         * 消息是否到期则是通过此方法判断
         * 返回小于等于0则到期
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(putTime + delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
        /**
         * 因为Delayed继承于Comparable
         * 所以需要实现compareTo方法,用于排序
         * 该对象(this)小于、等于或大于指定对象(o),则分别返回负整数、零或正整数。
         */
        @Override
        public int compareTo(Delayed o) {
            long result = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
            if (result < 0) {
                return -1;
            } else if (result > 0) {
                return 1;
            } else {
                return 0;
            }
        }
    
        // getter/setter
    
        public Message(String body, long delayTime) {
            this.body = body;
            this.delayTime = delayTime;
            this.putTime = new Date().getTime();
        }
    }
    

    消费者线程:

    public class Consumer implements Runnable {
    
        // 延时队列
        private DelayQueue<Message> queue;
    
        public Consumer(DelayQueue<Message> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    Message message = queue.take();
                    System.out.println("接收到消息内容:" + message.getBody());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        // getter/setter
    }
    

    生产者:

    public class Producer {
    
        public static void main(String[] args) {
            DelayQueue<Message> queue = new DelayQueue<>();
    
            // 延时5秒
            Message m1 = new Message("兄dei吃饭了吗?", 5000);
    
            // 延时15秒
            Message m2 = new Message("开黑吗???我玩亚索!", 15000);
    
            // 将延时消息放到延时队列
            queue.add(m1);
            queue.offer(m2);
    
            new Thread(new Consumer(queue)).start();
    
        }
    }
    

    成功运行,等待5秒,15秒,输出消息。

     

    要点:

    1.add方法跟offer方法其实是一样的

    2.需要始终保证最快到期的消息在队首(关键在于消息实体的compareTo方法),否则会导致到期消息无法及时消费

    查看DelayQueue.take()源码可以发现,它会获取队首消息并判断是否到期,队首消息到期才返回结果;
    队首消息没到期:
    • 如果有其他线程在等待(leader != null),则阻塞当前线程
    • 如果当前没有其他线程等待(leader == null),则阻塞当前线程直到延迟时间

     

    • Redis实现的DelayQueue

    使用redis提供的有序数据结构zset,把过期时间戳作为score。

    当然已经有现成的轮子了------Redisson的RDelayedQueue + RBlockingDeque

    原理就是使用redis的zset + list,先来个Demo。

    消息实体:

    public class Order {
    
        private String createdTime;
    
        public Order() {
            this.createdTime = new SimpleDateFormat("hh:mm:ss").format(new Date());
        }
    
        public String getCreatedTime() {
            return createdTime;
        }
    }
    

    生产者:

    public class Producer {
    
        public static void main(String[] args) {
            Config config = new Config();
            config.useSingleServer().setAddress("redis://192.168.3.183:6379");
            RedissonClient redissonClient = Redisson.create(config);
            RBlockingDeque<Order> blockingDeque = redissonClient.getBlockingDeque("delay_queue");
            RDelayedQueue<Order> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
    
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Order order = new Order();
                delayedQueue.offer(order, 1, TimeUnit.SECONDS);
                System.out.println("成功发送延时队列");
            }
    
            delayedQueue.destroy();
        }
    }
    

    消费者:

    public class Consumer {
    
        public static void main(String[] args) {
            Config config = new Config();
            config.useSingleServer().setAddress("redis://192.168.3.183:6379");
            RedissonClient redissonClient = Redisson.create(config);
            RBlockingDeque<Order> blockingDeque = redissonClient.getBlockingDeque("delay_queue");
    
            while (true) {
                Order order = null;
                try {
                    // 如果没有到期消息,返回null
                    order = blockingDeque.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                if (order != null)
                    System.out.println("订单取消时间:" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "==订单生成时间" + order.getCreatedTime());
                else
                    continue;
            }
    
        }
    }
    

    分别执行生产者/消费者。

    Redisson底层其实就是在执行Lua脚本,源码解析参考

    https://www.jianshu.com/p/8fa478da3b00

     

    以上代码仓库

    https://gitee.com/Deep_feel/study_notes/tree/master/test/src/main/java/delayQueue

    展开全文
  • 主要介绍了基于golang的简单分布式延时队列服务的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • RabbitMQ实现延时队列

    2021-03-10 16:55:54
    RabbitMQ实现延时队列 前言 我们在设计大型的购物类的网站的时候,有这样的一个场景就是我们下订单后如果长时间不去支付的话需要超时将订单取消,并且存库要恢复,这就是我们经常说的订单过期会库存。 分布式秒杀...

    RabbitMQ实现延时队列

    前言

    我们在设计大型的购物类的网站的时候,有这样的一个场景就是我们下订单后如果长时间不去支付的话需要超时将订单取消,并且存库要恢复,这就是我们经常说的订单过期会库存。

    分布式秒杀系统的设计可参考我的博客:分布式秒杀系统的设计

    SpringBoot项目中使用RabbitMQ可参考我的博客:SpringBoot项目中使用RabbitMQ

    正文

    延时队列

    订单过期库存回库的场景:

    • 淘宝七天自动确认收货:在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家。
    • 12306 购票支付确认页面:我们在选好票再完成支付之前内部都是有倒计时的,如果 30 分钟内订单不确认支付的话将会自动取消订单。

    解决订单过期库存回库的潜在方案:

    • 使用后台线程不断扫描数据库,性能极低,弃用
    • 将订单数据存入Redis中并设置失效时间,考虑到要保留订单信息,弃用
    • 使用delayedQueue延时队列,设置到期时间,其中的对象只能在到期时才能从队列中取走,进行过期操作,但是不支持分布式,弃用
    • 使用分布式定时任务去处理相关订单,但是时间间隔难以设置,并且数据量巨大,慎用
    • 使用RabbitMQ延迟队列来实现,并使用定时任务处理可能的消息丢失导致的库存无法释放,采用

    RabbitMQ延时队列

    RabbitMQ延时队列的实现

    • RabbitMQ 3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列。
    • RabbitMQ 3.6.x 开始,RabbitMQ官方提供了延迟队列的插件,可以下载放置到RabbitMQ根目录下的plugins下。

    以RabbitMQ 3.6.6为例安装延时插件

    第一步:下载RabbitMQ延时队列的插件

    第二步:将放置对应的plugins的目录下

    cd /rabbitmq/lib/rabbitmq_server-3.6.6/plugins
    

    第三步:启动插件

    whereis rabbitmq
    cd /usr/local/rabbitmq/bin
    sh rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    

    在这里插入图片描述

    第四步:重起RabbitMQ服务器使插件生效

    service rabbitmq-server restart
    
    或者
    /usr/local/rabbitmq/bin/rabbitmqctl stop
    /usr/local/rabbitmq/bin/rabbitmq-server -detached
    

    第五步:查看插件是否正常安装

    sh rabbitmq-plugins list
    

    在这里插入图片描述

    SpringBoot使用RabbitMQ实现延时队列

    依赖

    <!--rabbitmq消息队列-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    application.properties:springboot配置类

    ##配置rabbitmq
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    #自定义参数:并发消费者的初始化值
    spring.rabbitmq.listener.concurrency=10
    #自定义参数:并发消费者的最大值
    spring.rabbitmq.listener.max-concurrency=20
    #自定义参数:每个消费者每次监听时可拉取处理的消息数量
    spring.rabbitmq.listener.prefetch=5
    
    
    #字符串型的消息
    string.queue.name=string.queue.name2
    string.exchange.name =string.exchange.name2
    string.routing.key.name=string.routing.key.name2
    

    RabbitmqConfig.class:rabbitmq配置类

    • directExchange.setDelayed(true):交换器开启支持延迟
    /**
    * rabbitmq配置类
    */
    @Configuration
    public class RabbitmqConfig {
    
        @Autowired
        private Environment env;
    
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
    
        @Autowired
        private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    
    
        /**
         * 单一消费者,即为队列消息
         *
         * @return
         */
        @Bean(name = "singleListenerContainer")
        public SimpleRabbitListenerContainerFactory listenerContainer() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            factory.setConcurrentConsumers(1);
            factory.setMaxConcurrentConsumers(1);
            factory.setPrefetchCount(1);
            factory.setTxSize(1);
            factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
            return factory;
        }
    
        /**
         * RabbitTemplate工具类
         * RabbitTemplate中会使用通道
         * 通道是多路复用的双向数据通道,可以减少TCP连接
         *
         * @return
         */
        @Bean
        public RabbitTemplate rabbitTemplate() {
            connectionFactory.setPublisherConfirms(true);
            connectionFactory.setPublisherReturns(true);
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
            return rabbitTemplate;
        }
    
    
        /**生成rabbitmq组件
         * 队列:用来存储消息的数据结构,位于硬盘或内存中。
         * 交换机:接收发送到RabbitMQ中的消息并决定把他们投递到那个队列的组件;
         * 绑定:一套规则,用于告诉交换器消息应该被存储到哪个队列。
         *
         * **/
    
    
        /**
         * 字符型消息
         **/
        //定义队列
        @Bean(name = "stringQueue")
        public Queue stringQueue() {
            return new Queue(env.getProperty("string.queue.name"), true);
        }
    
    
        //定义交换机 DirectExchange为队列消息交换器
        @Bean
        public DirectExchange stringExchange() {
            DirectExchange directExchange =new DirectExchange(env.getProperty("string.exchange.name"), true, false);
            directExchange.setDelayed(true);
            return directExchange;
        }
    
    
        //定义绑定
        @Bean
        public Binding stringBinging() {
            return BindingBuilder.bind(stringQueue()).to(stringExchange()).with(env.getProperty("string.routing.key.name"));
        }
    }
    

    CommonMqListener.class:RabbitMQ消费者

    /**
    * 这里是消息队列的消费者
    */
    @Component
    @Slf4j
    public class CommonMqListener {
    
        @Autowired
        RedisTemplate redisTemplate;
    
        /**
         * 监听消费消息
         *
         * @param message
         */
        @RabbitListener(queues = "${string.queue.name}", containerFactory = "singleListenerContainer")
        public void consumeStringQueue(@Payload byte[] message) {
            try {
                log.info("监听消费,当前时间"+ DateTimeUtils.getSystemTime() +" 监听到消息: {} ", new String(message, "UTF-8"));
            } catch (Exception e) {
                log.error("监听消费订单消息消息异常{},{}",e.getStackTrace(), e);
            }
        }
    
    }
    

    AppForTest.class:测试类

    • message.getMessageProperties().setDelay(60000):设置1分钟的延迟,默认时间单位为毫秒
    @RunWith(SpringJUnit4ClassRunner.class)
    //启动Spring
    @SpringBootTest(classes = App.class)
    public class AppForTest {
    
       @Autowired
       private RabbitTemplate rabbitTemplate;
    
       @Autowired
       private Environment env;
    
       @Test
        public void ceshi(){
          rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
          rabbitTemplate.setExchange(env.getProperty("string.exchange.name"));
          rabbitTemplate.setRoutingKey(env.getProperty("string.routing.key.name"));
          try {
             String msg="发送信息:当前时间"+ DateTimeUtils.getSystemTime();
             Message message = MessageBuilder.withBody(msg.getBytes("UTF-8")).build();
             //设置请求编码格式
             message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON);
            //设置60秒种的延时
             message.getMessageProperties().setDelay(60000);
             rabbitTemplate.convertAndSend(message);
          } catch (Exception e) {
             throw new BusinessException(CouponTypeEnum.OPERATE_ERROR,"字符串型消息生产者发送消息失败:" + e.getMessage());
          }
       }
    
    }
    

    最终演示结果:

    • 生产者发送消息后,一分钟后被消费者接收并消费

    在这里插入图片描述

    在这里插入图片描述

    展开全文
  • 延时队列的几种实现方式

    千次阅读 2020-07-10 16:57:56
    延时队列的几种实现方式 何为延迟队列? 顾名思义,首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。 延时队列能做什么? 延时队列多用于需要延时...

    延时队列的几种实现方式

    何为延迟队列?

    顾名思义,首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

    延时队列能做什么?

    延时队列多用于需要延时工作的场景。最常见的是以下场景:

    延迟消费,比如:

    1 ,订单成功后,在 30 分钟内没有支付,自动取消订单

    2 ,如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存

    3 ,支付成功后, 2 秒后查询支付结果

    4 , ……

    如何实现?

    实现延时队列的方式有很多种,本文主要介绍以下几种常见的方式:

    1. 基于 DelayQueue 实现的本地延时队列。
    2. 基于 RabbitMQ 死信队列实现的延时队列。
    3. 基于 RabbitMQ 插件实现的延时队列。
    

    1. 基于 DelayQueue 实现的本地延时队列

    一个最简单的解决方案就是使用 JDK Java.util.concurrent 包下 DelayQueue。(实际上,如无必要,我们应该尽可能使用 jdk 自带的一些类库,而非重复造轮子,或者过度设计)。

    DelayQueue 是一个 BlockingQueue (无界阻塞)队列,它本质就是封装了一个 PriorityQueue (优先级队列),并加上了延时功能。可以这么说,DelayQueue 就是一个使用优先队列(PriorityQueue)实现的 BlockingQueue,优先队列的比较基准值是时间。即:

    DelayQueue = BlockingQueue + PriorityQueue + Delayed

    从继承层次上看:

    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> 
    

    DelayQueue 是 一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay (TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。

    要实现 DelayQueue 延时队列,队中元素要 implements Delayed 接口,这个接口里只有一个 getDelay 方法,用于设置延期时间。Delayed 由实现了 Comparable 接口, compareTo 方法负责对队列中的元素进行排序。

    下面看一个demo:

    首先定义一个 我们自定义的 Delayed 实现:

    public class DelayMessage<T extends Runnable> implements Delayed {
    
        private static final int MINUS_ONE = -1;
    
        private final long time;
        private final T task;
    
        /**
         * @param timeout 毫秒
         * @param t       T extends Runnable
         */
        public DelayMessage(long timeout, T t) {
            this.time = System.nanoTime() + timeout;
            this.task = t;
        }
    
        /**
         * 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
    
        @Override
        public int compareTo(Delayed delayed) {
            // 过期时间长的放置在队列尾部
            if (this.getDelay(TimeUnit.MICROSECONDS) > getDelay(TimeUnit.MICROSECONDS)) {
                return 1;
            }
            // 过期时间短的放置在队列头
            if (this.getDelay(TimeUnit.MICROSECONDS) < getDelay(TimeUnit.MICROSECONDS)) {
                return -1;
            }
            return 0;
    
        }
    
        public T getTask() {
            return this.task;
        }
    
        @Override
        public int hashCode() {
            return task.hashCode();
        }
    
        @Override
        public boolean equals(Object obj) {
            return task.equals(obj);
        }
    
    }
    

    可以看到,我们自定义的 DelayMessage 里面的元素是一个 Runnable 对象,这是为了我们方便把延时队列中的对象丢到线程池里面去执行。

    接下来,再使用 DelayQueue + 线程池 实现一个工具类,用来从 DelayQueue 中取出 Runnable 对象, 放到线程池去执行:

    @Component
    public class DealyQueueManager {
    
        /**
         * 可缓冲的线程池
         */
        private ExecutorService executor;
        /**
         * 延时队列
         */
        private DelayQueue<DelayMessage<?>> delayQueue;
    
        /**
         * 初始化
         */
        @PostConstruct
        @SuppressWarnings({"PMD.AvoidManuallyCreateThreadRule", "PMD.ThreadPoolCreationRule"})
        public void init() {
            executor = newCachedThreadPool();
            delayQueue = new DelayQueue<>();
    
            //后台线程,监听延时队列
            Thread daemonThread = new Thread(this::execute);
            daemonThread.setName("本地延时队列-DelayQueueMonitor");
            daemonThread.start();
        }
    
        private void execute() {
            while (true) {
                try {
                    // 从延时队列中获取任务,如果队列为空, take 方法将会阻塞在这里
                    DelayMessage<? extends Runnable> delayMessage = delayQueue.take();
                    Runnable task = delayMessage.getTask();
                    if (null == task) {
                        continue;
                    }
                    // 提交到线程池执行 task
                    executor.execute(task);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 添加任务
         *
         * @param task 待延迟执行的任务
         * @param time 延时时间
         * @param unit 时间单位
         */
        public void put(Runnable task, long time, TimeUnit unit) {
            // 获取延时时间
            long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
            // 将任务封装成实现 Delayed 接口的消息体
            DelayMessage<? extends Runnable> delayMessage = new DelayMessage<>(timeout, task);
            // 将消息体放到延时队列中
            delayQueue.put(delayMessage);
        }
    
        /**
         * 删除任务
         */
        public boolean removeTask(Runnable task) {
            return delayQueue.remove(task);
        }
    
    }
    
    

    这样,我们就基于 DelayQueue 实现了一个高效的本地延时队列, 但是缺点就是 在多节点实例部署时,不能同步消息,同步消费,也不能持久化。因此我们可以考虑使用 RabbitMQ 实现的延时队列解决这些问题。

    2. 基于 RabbitMQ 死信队列实现的延时队列

    使用 RabbitMQ 实现延时队列主要用到了它的两个特性:一个是 Time-To-Live Extensions(TTL),另一个是 Dead Letter Exchanges(DLX)。

    Time-To-Live Extensions

    RabbitMQ 允许我们为消息或者队列设置 TTL(time to live),也就是过期时间。TTL 表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了 TTL 或者当某条消息进入了设置了 TTL 的队列时,这条消息会在经过 TTL 秒后 “死亡”,成为 Dead Letter。如果既配置了消息的 TTL,又配置了队列的 TTL,那么较小的那个值会被取用。

    Dead Letter Exchange

    刚才提到了,被设置了 TTL 的消息在过期后会成为 Dead Letter。其实在 RabbitMQ 中,一共有三种消息的 “死亡” 形式:

    1. 消息被拒绝。通过调用 basic.reject 或者 basic.nack 并且设置的 requeue 参数为 false。
    2. 消息因为设置了 TTL 而过期。
    3. 消息进入了一条已经达到最大长度的队列。

    如果队列设置了 Dead Letter Exchange(DLX),那么这些 Dead Letter 就会被重新 publish 到 Dead Letter Exchange,通过 Dead Letter Exchange 路由到其他队列。

    聪明的你肯定已经想到了,如何将 RabbitMQ 的 TTL 和 DLX 特性结合在一起,实现一个延迟队列。

    如上图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过 RabbitMQ 提供的 TTL 扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的 DLX 转发到实际消费队列(图中蓝色队列),以此达到延时消费的效果。

    Demo 示例:

    首先我们需要先准备好交换机和队列:

    交换机(Exchange)队列(Queue)绑定 key队列属性
    my.dead.exchangemy.dead.queuemy.dead.keyx-dead-letter-exchange: my.msg.exchange
    x-dead-letter-routing-key: my.msg.key
    my…msg.exchangemy.msg.queuemy.msg.key

    我们先通过上面的表格定义添加好交换机和队列,首先 定义两个交换机,两个队列, 并分别绑定,注意我们在创建 队列: my.dead.queue 时需要添加两个属性:x-dead-letter-exchange:my.msg.exchangex-dead-letter-routing-key: my.msg.key, 这样,我们就只需要往队列 my.dead.queue 发送消息并设置过期时间, 等到 队列my.dead.queue 中的消息过期时,就会被转发到和交换机 my..msg.exchange 绑定的 key 为 my.msg.key 的队列 my.msg.queue 中,因此,我们只需要监听: my.msg.queue 队列就能收到 队列 my.dead.queue 中的延迟消息了。

    下面代码以 SpringBoot + rabbitmq 为例:

    发布消息:

    @Component
    @Slf4j
    public class DeadDelayMessagePublisher<T> {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * @param payload 消息体
         * @param delay   延迟时间:单位 毫秒(ms)
         */
        public void sendDelay(T payload, long delay) {
            MQMessage message = new MQMessage<T>(payload);
            rabbitTemplate.convertAndSend("my.dead.exchange", "my.dead.key", message, process -> {
                // 设置消息过期时间 单位: ms
                process.getMessageProperties().setExpiration(String.valueOf(delay));
                return process;
            }, new CorrelationData(message.getId()));
    
        }
    }
    

    监听消息:

    @Component
    @Slf4j
    public class DelayedListener {
    
        @RabbitListener(queues = "my.msg.queue")
        public void receiveMessage(MQMessage<PosMessage> message{
          // 在此处理收到延时消息的逻辑
        }
        
    }
    

    这种实现方式其实是有一个弊端的,加入我们有两个消息一前一后进入 队列 my.dead.queue,前面的消息过期时间为 1 分钟, 后面的消息过期时间为 30 秒, 那以这种方式实现的延时队列, 是必须要等到 1分钟的消息消费完后才能轮到 30 秒那个消息。

    为解决这个问题,我们可以使用下面这种方式:

    3. 基于 RabbitMQ 插件实现的延时队列

    这里使用的是一个 RabbitMQ 延迟消息插件 rabbitmq-delayed-message-exchange,目前维护在 RabbitMQ 插件社区,我们可以声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。

    实现原理:

    上面使用 DLX + TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia

    这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =<ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。

    插件安装

    根据你的 RabbitMQ 版本来安装相应插件版本,RabbitMQ community-plugins 上面有版本对应信息可参考。

    注意:需要 RabbitMQ 3.5.3 和更高版本。

    启用插件

    使用 rabbitmq-plugins enable 命令启用插件,启动成功会看到如下提示:

    $ rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    The following plugins have been enabled:
      rabbitmq_delayed_message_exchange
    
    Applying plugin configuration to rabbit@xxxxxxxx... started 1 plugin.
    

    管理控制台声明 x-delayed-message 交换机

    在开始代码之前先打开 RabbitMQ 的管理 UI 界面,声明一个 x-delayed-message 类型的交换机,否则你会遇到下面的错误:

    Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange type"
    

    详情可见 Github Issues rabbitmq-delayed-message-exchange/issues/19,正确操作如下图所示:

    按照如下表格在 rabbitmq-admin 建好插件形式的交换机和队列,并填写正确的属性

    交换机type队列key交换机属性
    my-delayed-exchangex-delayed-meassagemy-dealyed-queuemy-delayed-keyx-delayed-type:direct

    代码示例和普通的交换机队列使用基本一致。

    @Component
    public class DelayMessagePublisher<T> {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * @param exchange mq exahange
         * @param key      bindKey
         * @param payload  消息体
         * @param delay    延迟时间 单位: ms
         */
        public void sendDelay(T payload, long delay) {
            MQMessage message = new MQMessage<T>(payload);
            rabbitTemplate.convertAndSend("my-delayed-exchange", "my-delayed-key", message, process -> {
                // 插件形式设置消息延迟发送时间
                process.getMessageProperties().setDelay((int) delay);
                return process;
            }, new CorrelationData(message.getId()));
        }
    }
    

    监听消息:

    @Component
    @Slf4j
    public class MQDelayedListener {
    
        @RabbitListener(queues = "my-dealyed-queue")
        public void receiveMessage(MQMessage<T> message{
          // 在此处理收到延时消息的逻辑
        }
        
    }
    

    基于这种插件的形式,我们可以实现,过期消息立刻处理,以弥补死信队列的不足之处。

    还有一些其它方法也可以实现延时队列,比如使用 redis 的 sortedset ,还有一些比较复杂的延时队列的算法实现的,比如: 时间轮 。Kafka、Netty 都有基于时间轮算法实现延时队列。这些就不再介绍,感兴趣的可以去网上了解一下,有很多文章讲解的很不错。

    对比 :

    使用难度多实例过期消息能都立刻处理
    DelayQueueJDK 自带,集成方便不支持
    RabbitMQ 死信队列依赖 RabbitMQ 中间件,集成略微复杂支持否(FIFO)
    RabbitMQ 插件依赖 RabbitMQ 中间件并且需要安装插件,集成复杂支持

    总结:

    之所以写这篇文章,是因为项目中有个需求,支付完成后,需要延时获取支付结果,因此需要用到延时队列,由于一些列原因,进行了一些技术变动,开始项目使用的是:JDK 自带的 DelayQueue 实现,后来为了支持多实例,又采用了 RabbitMQ 延时插件 实现,再后来测试环境延时插件收不到消息,也不是很稳定,又改为了 RabbitMQ 死信队列 的方式实现延时队列。

    我从中总结到的经验就是:

    1. 前期的技术选型要考虑充分,频繁变更技术细节的事情应当避免或减少发生。
    2. 用好依赖倒置原则,也就是用接口隔离底层实现,屏蔽掉底层细节的变更。在这次项目中,一开始就使用依赖倒置原则屏蔽了 延时队列的具体实现, 所以虽然变更了三次技术细节,但是改动起来还是很顺利的,变更的代码不影响上层业务逻辑。
    展开全文
  • 延时队列的设计与实现

    千次阅读 2020-09-12 14:51:12
    君生我未生,我生君已老,延时该如何实现,本文介绍和总结了几种延时队列的实现原理,并加入了实现demo,希望大家留言探讨

    延时队列在业务系统中很常用,常见的付费逻辑中,订单生成后的计时,当订单超时后关闭订单,库存归位等,再比如定时逻辑,在一段时间后后者某个时间点触发某些动作,本文将详细讲述几种延时队列的实现逻辑。

    • JUC中java原生的延时队列DelayQueue
    • 时间轮TimeWheel算法
    • redis实现延时队列
    • 消息队列pulsar的延时应用
    • RabbitMQ延迟消息

    1.1 DelayQueue

    说到延时队列DelayQueue,首先应该说到PriorityQueue,在DelayQueue中,包含的元素实际存储在PriorityQueue中,PriorityQueue是一个误解阻塞队列,元素存储于数组中,但实际存储逻辑是一个平衡二叉树,数组的第一个元素是根据比较器得到的最小的元素。在DelayQueue中,要求每个元素必须实现Delayed接口,该接口返回延时时长,同时继承了Comparable接口,需要实现比较逻辑,通过该比较逻辑进行比较排序,使得每次取得的元素为最早过期的元素或者空。

    public interface Delayed extends Comparable<Delayed> {
    
        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        long getDelay(TimeUnit unit);
    }
    
    

    1.2 DelayQueue实践

    首先创建延时元素,实现Delayed接口,在该实现中,定义一个任务的创建序号和延时时长

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class DelayTask implements Delayed {
        long dealAt;
        int index;
        public DelayTask(long time, int ix) {
            dealAt = time;
            index = ix;
        }
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dealAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
        @Override
        public int compareTo(Delayed o) {
            if (getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else {
                return -1;
            }
        }
    }
    
    

    在主方法中,创建4个任务,第一个任务延时15s,第二个任务延时10s,第三个任务延时5s,第四个任务延时20s,最后查看执行顺序。

    public class Main {
        public static void main(String[] args) throws InterruptedException {
            DelayQueue<DelayTask> tasks = new DelayQueue<>();
            long cur = System.currentTimeMillis();
            long[] delay = {15000L, 10000L, 5000L, 20000L};
            for (int i = 0; i < delay.length; i++) {
                DelayTask task = new DelayTask(delay[i] + cur, i);
                tasks.add(task);
            }
            while (!tasks.isEmpty()) {
                DelayTask peek = tasks.take();
                if (peek != null) {
                    tasks.poll();
                    System.out.println("时间间隔:"+ (System.currentTimeMillis() - cur) + "ms");
                    System.out.println("当前任务序号:" + peek.index);
                    cur = System.currentTimeMillis();
                }
                Thread.sleep(1000);
            }
        }
    }
    
    

    结果如下

    时间间隔:5000ms
    当前任务序号:2
    时间间隔:5000ms
    当前任务序号:1
    时间间隔:5002ms
    当前任务序号:0
    时间间隔:4997ms
    当前任务序号:3
    
    Process finished with exit code 0
    

    2.1 时间轮TimeWheel算法

    想象这样一种情况,业务中要求提交定时任务,一种实现是提交任务后启动一个定时线程,轮训检测该任务,当任务量变的庞大的时候,这种开销是可怕的,令一种实现是将所有任务有些组织,只用一个线程就可以控制所有的定时任务,时间轮算法就是这种实现,netty、kafka、zookeeper中都使用了该算法实现延时处理。
    时间轮
    时间轮如图所示,图中0~8代表时间周期,每个时间节点代表一个小的时间间隔,假设时间间隔为1分钟,则图中每个时间节点中将包含在这一分钟内的所有定时任务,存储为一个双向的linkedList,且图中的时钟周期为9分钟。
    如图,当当前时间到达时间节点2时,时间节点1中的任务已经全部过期且处理完成,时间节点2对应的定时任务开始过期,开始处理节点2中对应的任务列表。
    时间轮算法有其对应的优缺点,优点是我们可以使用一个线程监控很多的定时任务,缺点是时间粒度由节点间隔确定,所有的任务的时间间隔需要以同样的粒度定义,比如时间间隔是1小时,则我们定义定时任务的单位就为小时,无法精确到分钟和秒。
    时间轮所能容纳的定时任务的时间是有限制的,由时间轮的周期决定,当超过了时间轮的时间周期,我们的定时任务该如何处理,有以下处理方式:

    1. 时间轮复用,比如当前时间为2,时间节点0和1的任务已过期,在此时来了一个任务,要求延时为9,此时2+9=11,对应的位置载1处,因此可以直接把任务放在1的链表,等下个时间周期继续处理。
    2. 将任务按照时间周期分组存放,一个时间周期结束后取出下一个时间周期的任务放入时间轮进行处理。

    2.2 时间轮算法实践

    我们需要如下接口(我们默认时间粒度为毫秒)
    TimerTask定义具体的任务逻辑

    public interface TimerTask {
        void run() throws Exception;
    }
    

    Timeout是对任务的管理逻辑

    public interface Timeout {
        Timer timer();
        TimerTask task();
        boolean isExpired();
    }
    

    Timer管理所有的定时任务,也就是时间轮的要实现的主要方法

    public interface Timer {
        Timeout newTimeOut(TimerTask task, long delay, String argv);
    }
    

    接下来进行对应的实现

    任务实体,简单的输出任务序号

    import java.time.LocalDateTime;
    
    public class MyTask implements TimerTask {
        int index;
        MyTask(int ix) {
            index = ix;
        }
        @Override
        public void run() throws Exception {
            System.out.println(LocalDateTime.now());
            System.out.println("当前任务序号是:" + index);
        }
    }
    

    对任务进行管理的Timeout实体

    
    public class MyTimeout implements Timeout {
        Timer timer;
        TimerTask timerTask;
        String argv;
        long delay;
        int state;
        long round;
    
        public MyTimeout(Timer timer, TimerTask task, long delay, String argv) {
            this.timer = timer;
            this.timerTask = task;
            this.delay = delay;
            this.argv = argv;
            state = 0;
        }
        @Override
        public Timer timer() {
            return timer;
        }
    
        @Override
        public TimerTask task() {
            return timerTask;
        }
    
        @Override
        public boolean isExpired() {
            return state != 0;
        }
    }
    
    

    时间轮的实现,保留对新加任务的添加,时间轮的关闭,以及定时任务线程worker的实现逻辑

    package com.example.test.simpleTimeWheel;
    
    import org.springframework.util.CollectionUtils;
    
    import java.time.Instant;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class TimeWheel implements Timer {
        long basicTime;
        List<Timeout> timeoutList;
        CountDownLatch countDownLatch;
        Worker woker = new Worker();
        int wheelState = 1; // 0 运行 1 停止
        int curIndex;
        int size;
        int mask;
        long durationOfSlot;
        LinkedList<Timeout>[] slots;
        ArrayList<TimerTask> res = new ArrayList<>();
    
        TimeWheel(int size, long durationOfSlot) throws Exception{
            if (size < 0 || size > Integer.MAX_VALUE) {
                throw new Exception("size out of range");
            }
            this.size = size;
            slots = new LinkedList[size];
            this.durationOfSlot = durationOfSlot;
            mask = size - 1;
            curIndex = 0;
            basicTime = 0;
            timeoutList = new ArrayList<>();
            countDownLatch = new CountDownLatch(1);
        }
    
        @Override
        public Timeout newTimeOut(TimerTask task, long delay, String argv) {
            if(delay <= 0) {
                try {
                    task.run();
                } catch (Exception e) {
                    System.out.println("delay is less than zero, just run");
                }
            }
            if (wheelState == 1) {
                wheelState = 0;
                Thread thread = new Thread(woker);
                thread.start();
    
            }
            startTimeWheel();
            Timeout timeout = new MyTimeout(this, task, delay, argv);
            timeoutList.add(timeout);
            return timeout;
        }
    
        private void startTimeWheel() {
            while (basicTime == 0) {
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    System.out.println("countdown await exception");
                }
            }
        }
    
        private class Worker implements Runnable {
            @Override
            public void run() {
                if (basicTime == 0) {
                    System.out.println("时间轮启动......");
                    basicTime = Instant.now().toEpochMilli();
                    countDownLatch.countDown();
                }
                do {
                    long deadline = durationOfSlot * (curIndex + 1);
                    for (;;) {
                        //System.out.println(basicTime+"basc");
                        long duration = Instant.now().toEpochMilli() - basicTime;
                        //System.out.println(deadline +"   " + duration);
                        if (duration > deadline) {
                            //时间到
                            process(timeoutList);
                            LinkedList<Timeout> tasks = slots[curIndex];
                            process(tasks);
                            break;
                        }
                    }
                    if (curIndex == mask) {
                        basicTime = System.currentTimeMillis();
                    }
                    curIndex = (++curIndex) % size;
                    checkStop();
                } while (wheelState == 0);
            }
    
            private void checkStop() {
                if (!CollectionUtils.isEmpty(timeoutList)) {
                    return;
                }
                for (LinkedList l : slots) {
                    if (!CollectionUtils.isEmpty(l)) {
                        return;
                    }
                }
                System.out.println("没有定时任务,结束");
                wheelState = 1;
            }
    
            private void process(List<Timeout> timeoutList) {
                //System.out.println("处理新加入的任务");
                for (Timeout out : timeoutList) {
                    MyTimeout mo = (MyTimeout) out;
                    if (mo.isExpired()) {
                        continue;
                    }
                    long needskipslots = mo.delay / durationOfSlot;
                    mo.round = (needskipslots - curIndex) / size;
                    int index = (int)(needskipslots) % size;
                    int i = (index + curIndex) % size;
                    LinkedList<Timeout> slot = slots[i];
                    if (slot == null) {
                        slot = new LinkedList<>();
                    }
                    slot.add(mo);
                    slots[(index + curIndex) % size] = slot;
                }
                timeoutList.clear();
            }
    
            void process(LinkedList<Timeout> outs) {
                //System.out.println("处理到期任务");
                if (CollectionUtils.isEmpty(outs)) {
                    return;
                }
                Iterator<Timeout> iterator = outs.iterator();
                while (iterator.hasNext()) {
                    Timeout next = iterator.next();
                    try {
                        MyTimeout mo = (MyTimeout) next;
                        if (mo.round > 0) {
                            mo.round --;
                            continue;
                        }
                        mo.task().run();
                        res.add(mo.task());
                        iterator.remove();
                        System.out.println("-************************************");
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    }
                }
    
            }
        }
    
    
    }
    
    

    Main()方法

    package com.example.test.simpleTimeWheel;
    
    public class TimeMain {
        public static void main(String[] args) throws Exception {
            TimeWheel timeWheel = new TimeWheel(10, 1000);
            long delayTime[] = {9000, 3000, 5000, 1000, 25000, 12000};
            for (int i = 0; i < delayTime.length; i++) {
                MyTask task = new MyTask(i);
                timeWheel.newTimeOut(task, delayTime[i], "");
            }
        }
    }
    
    

    生成6个任务,根据延时时长(单位:毫秒),根据代码可知,我们的执行顺序将会是:3-1-2-0-5-4

    具体看结果,验证了我们的想法。

    Connected to the target VM, address: '127.0.0.1:5120', transport: 'socket'
    时间轮启动......
    2020-09-12T11:18:22.440
    当前任务序号是:3
    -************************************
    2020-09-12T11:18:24.386
    当前任务序号是:1
    -************************************
    2020-09-12T11:18:26.386
    当前任务序号是:2
    -************************************
    2020-09-12T11:18:30.386
    当前任务序号是:0
    -************************************
    2020-09-12T11:18:33.387
    当前任务序号是:5
    -************************************
    2020-09-12T11:18:46.388
    当前任务序号是:4
    -************************************
    没有定时任务,结束
    Disconnected from the target VM, address: '127.0.0.1:5120', transport: 'socket'
    

    3.1 Redis实现延时队列

    为了保证每次消费的都是最早到期的数据,我们选择redis的zset数据结构进行存储,zset的每个值包含value和score,我们可以用当前时间+延时时长获取任务需要执行的时间点的时间戳作为score,保证每次拿到最早到期的数据。

    加入数据使用命令如下:

    ZADD KEY_NAME SCORE1 VALUE1.. SCOREN VALUEN
    

    读取使用命令如下:

    ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
    

    min可以设置为0,max为当前时间戳,这样每次获取到的结果为到目前为止过期的所有任务。

    3.2 Redis延时列Demo

    引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
    

    加入配置

    # Redis服务器地址
    spring.redis.host=127.0.0.1
    # Redis服务器连接端口
    spring.redis.port=6379  
    # 链接超时时间 单位 ms(毫秒)
    spring.redis.timeout=3000
    ################ Redis 线程池设置 ##############
    # 连接池最大连接数(使用负值表示没有限制) 默认 8
    spring.redis.lettuce.pool.max-active=8
    # 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
    spring.redis.lettuce.pool.max-wait=-1
    # 连接池中的最大空闲连接 默认 8
    spring.redis.lettuce.pool.max-idle=8
    # 连接池中的最小空闲连接 默认 0
    spring.redis.lettuce.pool.min-idle=0
    

    配置客户端

    @Bean
    public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory) {
        RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setConnectionFactory(connectionFactory);
        return redisTemplate;
    }
    

    给redis加入任务

    @Service
    public class RedisDelay {
        @Autowired
        RedisTemplate<String, Serializable> redisTemplate;
    
        public static final String KEY = "delaymq";
    
        @PostConstruct
        public void init() {
            RedisDelayInfoData r1 = new RedisDelayInfoData(1, "第1个任务序号");
            redisTemplate.opsForZSet().add(KEY, r1, System.currentTimeMillis() + 5000);
            RedisDelayInfoData r2 = new RedisDelayInfoData(2, "第2个任务序号");
            redisTemplate.opsForZSet().add(KEY, r2, System.currentTimeMillis() + 2000);
            RedisDelayInfoData r3 = new RedisDelayInfoData(3, "第3个任务序号");
            redisTemplate.opsForZSet().add(KEY, r3, System.currentTimeMillis() + 3000);
            RedisDelayInfoData r4 = new RedisDelayInfoData(4, "第4个任务序号");
            redisTemplate.opsForZSet().add(KEY, r4, System.currentTimeMillis() + 5000);
            RedisDelayInfoData r5 = new RedisDelayInfoData(5, "第5个任务序号");
            redisTemplate.opsForZSet().add(KEY, r5, System.currentTimeMillis() + 10000);
            RedisDelayInfoData r6 = new RedisDelayInfoData(6, "第6个任务序号");
            redisTemplate.opsForZSet().add(KEY, r6, System.currentTimeMillis() + 7000);
        }
    
    }
    

    如上,如果我们设置定时1秒查询redis,消费顺序应该为2-3-1/4-6-5

    设置消费线程,用定时任务实现

    @Scheduled(cron = "0/1 * * * * *")
        public void consumer() {
            long cur = System.currentTimeMillis();
            Set<Serializable> range = redisTemplate.opsForZSet().rangeByScore(RedisDelay.KEY, 0, cur);
            for (Serializable re : range) {
                RedisDelayInfoData delayInfoData = (RedisDelayInfoData)re;
                System.out.println(String.format("消费任务: %s, %s", delayInfoData.getMessage(), LocalDateTime.now()));
                redisTemplate.opsForZSet().remove(RedisDelay.KEY, re);
            }
        }
    

    最后结果验证如下:

    消费任务: 第2个任务序号, 2020-09-12T12:55:05.028
    消费任务: 第3个任务序号, 2020-09-12T12:55:06.002
    消费任务: 第1个任务序号, 2020-09-12T12:55:08.002
    消费任务: 第4个任务序号, 2020-09-12T12:55:08.003
    消费任务: 第6个任务序号, 2020-09-12T12:55:10.002
    消费任务: 第5个任务序号, 2020-09-12T12:55:13.005
    

    顺序符合预期,时间间隔也正确。

    4.1 消息队列pulsar的延时

    pulsar消息队列在v2.4.0开始支持延迟消息,但只有在消费者模式为shared的方式下才可生效,具体模式的区别详见官网,此处不做详解。
    为说明实现原理,需要列出一个重要的类

    org.apache.pulsar.broker.delayed.DelayedDeliveryTracker
    

    该类在内存维持一个包含延迟消息的优先级队列,在延时时间到期后将消息放入指定队列,在使用时需要注意内存使用情况。(https://streamnative.io/blog/release/2019-07-09-apache-pulsar-240)
    具体实现源码如下所示:

    当收到需要持久化的消息后,如果为延迟消息,进行如下处理,将消息加入优先级队列

    public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
        if (!topic.isDelayedDeliveryEnabled()) {
            // If broker has the feature disabled, always deliver messages immediately
            return false;
        }
    
        synchronized (this) {
            if (!delayedDeliveryTracker.isPresent()) {
                // Initialize the tracker the first time we need to use it
                delayedDeliveryTracker = Optional
                        .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
            }
    
            delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
            return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
        }
    }
    

    addMessage实现如下

    public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
        long now = clock.millis();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
                    deliveryAt - now);
        }
        if (deliveryAt < (now + tickTimeMillis)) {
            // It's already about time to deliver this message. We add the buffer of
            // `tickTimeMillis` because messages can be extracted from the tracker
            // slightly before the expiration time. We don't want the messages to
            // go back into the delay tracker (for a brief amount of time) when we're
            // trying to dispatch to the consumer.
            return false;
        }
    
        priorityQueue.add(deliveryAt, ledgerId, entryId);
        updateTimer();
        return true;
    }
    

    取出消息,从优先级队列拿出

    public Set<PositionImpl> getScheduledMessages(int maxMessages) {
        int n = maxMessages;
        Set<PositionImpl> positions = new TreeSet<>();
        long now = clock.millis();
        // Pick all the messages that will be ready within the tick time period.
        // This is to avoid keeping rescheduling the timer for each message at
        // very short delay
        long cutoffTime = now + tickTimeMillis;
    
        while (n > 0 && !priorityQueue.isEmpty()) {
            long timestamp = priorityQueue.peekN1();
            if (timestamp > cutoffTime) {
                break;
            }
    
            long ledgerId = priorityQueue.peekN2();
            long entryId = priorityQueue.peekN3();
            positions.add(new PositionImpl(ledgerId, entryId));
    
            priorityQueue.pop();
            --n;
        }
    
        if (log.isDebugEnabled()) {
            log.debug("[{}] Get scheduled messages - found {}", dispatcher.getName(), positions.size());
        }
        updateTimer();
        return positions;
    }
    

    4.2 pulsar优先级队列使用

    可以指定延时时长,也可以指定一个具体的消费时间。

    producer.newMessage()
        .deliverAfter(3L, TimeUnit.Minute)
        .value(“Hello Pulsar after 3 minutes!”)
        .send();
    producer.newMessage()
        .deliverAt(new Date(2019, 06, 27, 23, 00, 00).getTime())
        .value(“Hello Pulsar at 11pm on 06/27/2019!”)
        .send();
    

    5 RabbitMQ消息队列实现延迟消息原理

    RabbitMQ是大多数人都知道的一款支持延迟消息的中间件,这里简要说明一下该消息中间件实现延迟消息的原理。

    • TTL:RabbitMQ允许用户对消息或者队列设置TTL(time to live),当设置队列的过期时间后,队列中所有的消息会有相同的过期时间,如果设置了消息的过期时间,只对单一消息有效,如果一个消息设置了过期时间,并发送到了一个设置了过期时间的队列,则对应的过期时间取两者的较小值。
    • DLX: 全称Dead Letter Exchanges,死信交换机,何为死信,有如下几种情况:1.消费者否定了该消息 2. 消息由于设置TTL到期 3.消息队列超过了长度限制被丢弃。 当消息成为死信之后,可以通过死信交换机转发到一个死信队列。
      在这里插入图片描述

    在使用RabbitMQ延迟队列功能时,我们给消息设置TTL,当消息过期成为死信之后,转发至死信队列,消费者消费死信队列的消息。

    各个方案的优缺点

    1. 使用JUC中的延迟队列
      • 优点:简单,无需引入任何的中间件即可完成
      • 缺点:可靠性低,需要自己实现持久化逻辑,内存占用问题
    2. redis实现延迟队列
      • 优点:轻量级,可以持久化,可以使用分布式部署的redis
      • 缺点:消费延迟由轮训速度决定,当消息过多会影响其他功能对redis的使用
    3. 时间轮算法实现延迟队列
      • 优点:相对于每个任务开启定时,时间轮创造性的用一个线程解决了所有问题
      • 缺点:时间轮只是一种算法,其他的问题,比如三高(高可用、高性能、高并发)中的高可用和高性能需要自己实现
    4. 消息中间件延时队列
      • 优点:使用pulsar保证了消息的安全,可以比较容易的实现精确一次消费、至少一次消费、最多一次消费的需求,使用简单
      • 缺点:可能有内存问题,而且比较重,RabbitMQ的需要进行额外配置,添加死信消息队列和DLX。但如果有中间件的基础设施,推荐中间件。
    展开全文
  • Redis实现延时队列有两种实现方式: key失效监听回调 zset分数存时间戳 三、方案选择 key失效监听存在两个问题: Redis的pubsub不会被持久化,服务器宕机就会被丢弃 没有高级特性,没有ack机制,可靠性不高 zset...
  • 所以,为了实现这个功能,决定采用延时队列。那么,如何实现一个延时队列呢?我去各大博客进行了技术调研,整理了一下几种方法。供大家参考,也希望大家评论区提出更多的方法 技术方案 基于redis的延时队列 基于...
  • 这里写目录标题基于kafka的延时队列实现延时队列什么是延时队列使用场景相关消息中间件的延时队列实现rabbitmq基于kafka的简单延时队列消费实现为什么要基于kafka来做延时队列demo效果 延时队列 延时队列顾名思义...
  • 延时队列在许多业务场景中都有着广泛的运用。但可惜的是在RabbitMQ中并未提供延迟队列功能。这里小七结合工作所用,列出2种实现方式。 (1)使用TTL+死信队列组合实现延迟队列的效果。 (2)使用RabbitMQ官方延迟...
  • rabbitmq本身是没有实现延时队列的,但是我们可以通过死信队列或延时插件来实现延时队列。 死信队列 什么是死信? 如果在队列中的消息没有消费者消费,那么该消息就会成为一个死信。如果这个消息被发送到另外一个...
  • 说明:rocketmq实现的延时队列只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时。 具体实现:rocketmq发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息...
  • Rabbitmq延时队列与死信队列的使用以及使用场景创建项目引入依赖声明交换机和队列生产者发送消息消费者监听死信队列-----------------↓↓↓↓↓↓↓-----------------使用场景 创建项目引入依赖 1. 创建maven项目 ...
  • 什么是延时队列 延时队列:顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特性,相比之下,延时的特性就是它最大的特点。所谓的延时就是将我们需要的消息,延迟多久之后...
  • Redisson 延时队列 原理 详解

    千次阅读 2021-03-12 09:25:30
    花了一天研究了下Redisson 的延时队列,RBlockingQueue ,RDelayedQueue 。 网上没一个说清楚的,而且都是说轮询redis的zset,都是错误的! 让我来纠正,如果我有错的也可指出。 Demo用法 public static void ...
  • 6种延时队列的实现方案

    千次阅读 2020-06-22 16:36:48
    延时队列的应用 什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。 延时队列在项目中的应用还是比较多的,尤其像电商...
  • 五一期间原计划是写两篇文章,看一本技术类书籍,结果这五天由于自律性过于差,禁不住各种诱惑,我连电脑都没打开过,计划完美宣告失败。所以在这能看出和大佬之间的差距,人家没白没夜...下边会介绍多种实现延时队列
  • java实现延时队列

    2021-05-14 09:14:25
    延时队列主要应用场景是用户登录后延时推送消息,通知等,一般用mq中间件来弄,下面我来用java实现 一、消息实体类实现Delayed接口 import lombok.Data; import java.util.concurrent.Delayed; import java.util...
  • 一、什么是延时队列 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。 二、延时队列应用于什么场景 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行...
  • 在开发中,往往会遇到一些关于延时任务的需求。例如 生成订单30分钟未支付,则自动取消 生成订单60秒后,给用户发短信 对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个...
  • 延时队列概述 通过给一个普通的队列设置一些参数,死信交换机,死信路由,以及队列过期时间,注意,这个队列不能让任何消费者监听,然后我们将消息统一发往这个队列,当这个队列中的消息过期后,就会根据我们指定的...
  • 点击上方“朱小厮的博客”,选择“设为星标”后台回复"书",获取来源:r6d.cn/qE5f在开发中,往往会遇到一些关于延时任务的需求。例如生成订单30分钟未支付,则自动取...
  • } /** * 往该延时队列中放入数据,达到指定时间时处理 * * @param value 数据 * @param deadLine 截止时间戳,单位是毫秒 * @return 是否执行成功 */ public boolean putForDeadLine(T value, long deadLine) { if ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 80,196
精华内容 32,078
关键字:

延时队列