精华内容
下载资源
问答
  • 延迟队列

    2020-12-31 10:25:47
    延迟队列 延迟,也就是等待一定的时间在执行的。目前支持延迟的消息队列有 RabbitMQ,RocketMQ。但是RocketMQ支持的延迟时间并不灵活,延迟时间并不能自定义。在项目中,延迟使用的比较多的。 例如 订单成功后,在...

    延迟队列

    延迟,也就是等待一定的时间在执行的。目前支持延迟的消息队列有 RabbitMQRocketMQ。但是RocketMQ支持的延迟时间并不灵活,延迟时间并不能自定义。在项目中,延迟使用的比较多的。

    • 例如
      • 订单成功后,在30分钟内没有支付,自动取消订单
      • 外卖平台发送订餐通知,下单成功后60s给用户推送短信。
      • 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存

    源码:https://github.com/gl-stars/small-study-case/tree/master/nm-demo/classicsCase-demo/src/main/java/com/classics/delay

    一、DelayQueue 延时队列

    • 定义延迟队列
    package com.classics.delay;
    
    import com.fasterxml.jackson.annotation.JsonFormat;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class Order implements Delayed {
        /**
         * 延迟时间,单位由 {@link Order#Order(java.lang.String, long, java.util.concurrent.TimeUnit)}定义
         */
        @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
        private long time;
        String name;
    
        /***
         * 添加延迟任务
         * @param name 延迟内容
         * @param time 延迟时间
         * @param unit 单位
         */
        public Order(String name, long time, TimeUnit unit) {
            this.name = name;
            this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return time - System.currentTimeMillis();
        }
        @Override
        public int compareTo(Delayed o) {
            Order Order = (Order) o;
            long diff = this.time - Order.time;
            if (diff <= 0) {
                return -1;
            } else {
                return 1;
            }
        }
    }
    
    • 测试
    package com.classics.delay;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.TimeUnit;
    
    public class DelayQueueDemo {
    
        public static void main(String[] args) throws InterruptedException {
            // 添加三个定时任务,TimeUnit.SECONDS 表示单位为秒
            Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
            Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
            Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
            DelayQueue<Order> delayQueue = new DelayQueue<>();
            delayQueue.put(Order1);
            delayQueue.put(Order2);
            delayQueue.put(Order3);
    
            /**
             * 这里很重要,这里不断的循环,不断的监测是否有到期的队列
             */
            while (delayQueue.size() != 0) {
                /**
                 * 取队列头部元素是否过期
                 */
                Order task = delayQueue.poll();
                if (task != null) {
                    System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                }
                Thread.sleep(1000);
            }
        }
    }
    

    在这里插入图片描述

    二、Redis实现延迟

    主要是过期时间回退的方式,也就是我们添加一个消息,时间到期了调用一下程序,正好可以实现延迟的效果。

    docker安装Redis参考:https://blog.csdn.net/qq_41853447/article/details/112003274

    docker安装Redis参考:https://blog.csdn.net/qq_41853447/article/details/103201684

    • 配置 redis.conf

    redis.conf配置文件中添加 notify-keyspace-events Ex配置。

    notify-keyspace-events Ex
    

    在这里插入图片描述

    • 引入依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    

    我使用的是springboot工程,这个依赖springboot版本控制器里面已经定义好了,所以这里不需要添加版本号。

    • 定义配置类
    package com.classics.delay;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    
    @Configuration
    public class RedisListenerConfig {
    
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
    
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            return container;
        }
    }
    
    package com.classics.delay;
    
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
     
        public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
            super(listenerContainer);
        }
    
    
        /**
         * 通过Redis处理接收到的对象的回调
         * @param message key值
         * @param pattern
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String expiredKey = message.toString();
            System.out.println("监听到key:" + expiredKey + "已过期");
        }
    }
    
    • 测试

    使用 Another Redis Desktop Manage工具连接Redis

    Another Redis Desktop Manage使用参考:https://blog.csdn.net/qq_41853447/article/details/112003227

    在这里插入图片描述

    在这里插入图片描述

    • 语法
    set key值 数据 ex 过期时间
    
    set key_admin 这是数据 ex 5
    

    在这里插入图片描述

    注意:如果Redis过期回调时出现了网络问题或者其他问题没有调用成功的,Redis是不会再次调用的。

    这样实现延迟队列以后,就没有必要在写一个定时任务,10分钟或者5分钟去检索一下数据库,有没有该处理的数据。那样我觉得不是访问量压垮服务器,就凭这些定时任务对服务器都是一个不可忽略的负担。有了延迟任务之后,预防延迟队列数据的丢失或者其他情况,定时任务可以有,但是这个定时的时间可以选择大一些。

    展开全文
  • 在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列。我们本文的梗概如下,同学们可以选择性阅读。1. 实现一个简单的延迟队列。2...

    在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列。我们本文的梗概如下,同学们可以选择性阅读。

    1. 实现一个简单的延迟队列。

    2.使用Redis的list实现分布式延迟队列。

    3.使用Redis的zSet实现分布式延迟队列。

    4. 总结一下,另外还有哪些可以延迟队列。

    1.  实现一个简单的延迟队列。

    我们知道目前JAVA可以有DelayedQueue,我们首先开一个DelayQueue的结构类图。DelayQueue实现了Delay、BlockingQueue接口。也就是DelayQueue是一种阻塞队列。

    cc8fe7db51866ade30cbe11d6daadfc4.png

    我们在看一下Delay的类图。Delayed接口也实现了Comparable接口,也就是我们使用Delayed的时候需要实现CompareTo方法。因为队列中的数据需要排一下先后,根据我们自己的实现。Delayed接口里边有一个方法就是getDelay方法,用于获取延迟时间,判断是否时间已经到了延迟的时间,如果到了延迟的时间就可以从队列里边获取了。

    904131617c31b643b5303e3c534aacac.png

    我们创建一个Message类,实现了Delayed接口,我们主要把getDelay和compareTo进行实现。在Message的构造方法的地方传入延迟的时间,单位是毫秒,计算好触发时间fireTime。同时按照延迟时间的升序进行排序。我重写了里边的toString方法,用于将Message按照我写的方法进行输出。

    packagecom.hqs.delayQueue.bean;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.DelayQueue;importjava.util.concurrent.Delayed;importjava.util.concurrent.TimeUnit;/***@authorhuangqingshi

    * @Date 2020-04-18*/

    public class Message implementsDelayed {privateString body;private longfireTime;publicString getBody() {returnbody;

    }public longgetFireTime() {returnfireTime;

    }public Message(String body, longdelayTime) {this.body =body;this.fireTime = delayTime +System.currentTimeMillis();

    }public longgetDelay(TimeUnit unit) {return unit.convert(this.fireTime -System.currentTimeMillis(), TimeUnit.MILLISECONDS);

    }public intcompareTo(Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));

    }

    @OverridepublicString toString() {return System.currentTimeMillis() + ":" +body;

    }public static void main(String[] args) throwsInterruptedException {

    System.out.println(System.currentTimeMillis()+ ":start");

    BlockingQueue queue = new DelayQueue<>();

    Message message1= new Message("hello", 1000 * 5L);

    Message message2= new Message("world", 1000 * 7L);

    queue.put(message1);

    queue.put(message2);while (queue.size() > 0) {

    System.out.println(queue.take());

    }

    }

    }

    里边的main方法里边声明了两个Message,一个延迟5秒,一个延迟7秒,时间到了之后会将接取出并且打印。输出的结果如下,正是我们所期望的。

    1587218430786:start1587218435789:hello1587218437793:world

    这个方法实现起来真的非常简单。但是缺点也是很明显的,就是数据在内存里边,数据比较容易丢失。那么我们需要采用Redis实现分布式的任务处理。

    2. 使用Redis的list实现分布式延迟队列。

    本地需要安装一个Redis,我自己是使用Docker构建一个Redis,非常快速,命令也没多少。我们直接启动Redis并且暴露6379端口。进入之后直接使用客户端命令即可查看和调试数据。

    docker pull redis

    docker run-itd --name redisLocal -p 6379:6379redis

    docker exec-it redisLocal /bin/bash

    redis-cli

    我本地采用spring-boot的方式连接redis,pom文件列一下,供大家参考。

    4.0.0

    org.springframework.boot

    spring-boot-starter-parent

    2.2.6.RELEASE

    com.hqs

    delayQueue

    0.0.1-SNAPSHOT

    delayQueue

    Demo project for Spring Boot

    1.8

    org.springframework.boot

    spring-boot-starter

    org.springframework.boot

    spring-boot-starter-test

    test

    org.junit.vintage

    junit-vintage-engine

    org.springframework.boot

    spring-boot-starter-data-redis

    org.springframework.boot

    spring-boot-starter-web

    redis.clients

    jedis

    2.9.0

    org.springframework.boot

    spring-boot-devtools

    runtime

    org.projectlombok

    lombok

    true

    org.springframework.boot

    spring-boot-maven-plugin

    加上Redis的配置放到application.properties里边即可实现Redis连接,非常的方便。

    # redis

    redis.host=127.0.0.1

    redis.port=6379

    redis.password=

    redis.maxIdle=100

    redis.maxTotal=300

    redis.maxWait=10000

    redis.testOnBorrow=true

    redis.timeout=100000

    接下来实现一个基于Redis的list数据类型进行实现的一个类。我们使用RedisTemplate操作Redis,这个里边封装好我们所需要的Redis的一些方法,用起来非常方便。这个类允许延迟任务做多有10W个,也是避免数据量过大对Redis造成影响。如果在线上使用的时候也需要考虑延迟任务的多少。太多几百万几千万的时候可能数据量非常大,我们需要计算Redis的空间是否够。这个代码也是非常的简单,一个用于存放需要延迟的消息,采用offer的方法。另外一个是启动一个线程, 如果消息时间到了,那么就将数据lpush到Redis里边。

    packagecom.hqs.delayQueue.cache;importcom.hqs.delayQueue.bean.Message;importlombok.extern.slf4j.Slf4j;importorg.springframework.data.redis.core.RedisTemplate;importjava.util.concurrent.BlockingQueue;/***@authorhuangqingshi

    * @Date 2020-04-18*/@Slf4jpublic classRedisListDelayedQueue{private static final int MAX_SIZE_OF_QUEUE = 100000;private RedisTemplateredisTemplate;privateString queueName;private BlockingQueuedelayedQueue;public RedisListDelayedQueue(RedisTemplate redisTemplate, String queueName, BlockingQueuedelayedQueue) {this.redisTemplate =redisTemplate;this.queueName =queueName;this.delayedQueue =delayedQueue;

    init();

    }public voidofferMessage(Message message) {if(delayedQueue.size() >MAX_SIZE_OF_QUEUE) {throw new IllegalStateException("超过队列要求最大值,请检查");

    }try{

    log.info("offerMessage:" +message);

    delayedQueue.offer(message);

    }catch(Exception e) {

    log.error("offMessage异常", e);

    }

    }public voidinit() {new Thread(() ->{while(true) {try{

    Message message=delayedQueue.take();

    redisTemplate.opsForList().leftPush(queueName, message.toString());

    }catch(InterruptedException e) {

    log.error("取消息错误", e);

    }

    }

    }).start();

    }

    }

    接下来我们看一下,我们写一个测试的controller。大家看一下这个请求/redis/listDelayedQueue的代码位置。我们也是生成了两个消息,然后把消息放到队列里边,另外我们在启动一个线程任务,用于将数据从Redis的list中获取。方法也非常简单。

    packagecom.hqs.delayQueue.controller;importcom.hqs.delayQueue.bean.Message;importcom.hqs.delayQueue.cache.RedisListDelayedQueue;importcom.hqs.delayQueue.cache.RedisZSetDelayedQueue;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.stereotype.Controller;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.ResponseBody;importjava.util.Set;import java.util.concurrent.*;/***@authorhuangqingshi

    * @Date 2020-04-18*/@Slf4j

    @Controllerpublic classDelayQueueController {private static final int CORE_SIZE =Runtime.getRuntime().availableProcessors();//注意RedisTemplate用的String,String,后续所有用到的key和value都是String的

    @Autowired

    RedisTemplateredisTemplate;private static ThreadPoolExecutor taskExecPool = new ThreadPoolExecutor(CORE_SIZE, CORE_SIZE, 0, TimeUnit.SECONDS,new LinkedBlockingDeque<>());

    @GetMapping("/redisTest")

    @ResponseBodypublicString redisTest() {

    redisTemplate.opsForValue().set("a","b",60L, TimeUnit.SECONDS);

    System.out.println(redisTemplate.opsForValue().get("a"));return "s";

    }

    @GetMapping("/redis/listDelayedQueue")

    @ResponseBodypublicString listDelayedQueue() {

    Message message1= new Message("hello", 1000 * 5L);

    Message message2= new Message("world", 1000 * 7L);

    String queueName= "list_queue";

    BlockingQueue delayedQueue = new DelayQueue<>();

    RedisListDelayedQueue redisListDelayedQueue= newRedisListDelayedQueue(redisTemplate, queueName, delayedQueue);

    redisListDelayedQueue.offerMessage(message1);

    redisListDelayedQueue.offerMessage(message2);

    asyncListTask(queueName);return "success";

    }

    @GetMapping("/redis/zSetDelayedQueue")

    @ResponseBodypublicString zSetDelayedQueue() {

    Message message1= new Message("hello", 1000 * 5L);

    Message message2= new Message("world", 1000 * 7L);

    String queueName= "zset_queue";

    BlockingQueue delayedQueue = new DelayQueue<>();

    RedisZSetDelayedQueue redisZSetDelayedQueue= newRedisZSetDelayedQueue(redisTemplate, queueName, delayedQueue);

    redisZSetDelayedQueue.offerMessage(message1);

    redisZSetDelayedQueue.offerMessage(message2);

    asyncZSetTask(queueName);return "success";

    }public voidasyncListTask(String queueName) {

    taskExecPool.execute(()->{for(;;) {

    String message=redisTemplate.opsForList().rightPop(queueName);if(message != null) {

    log.info(message);

    }

    }

    });

    }public voidasyncZSetTask(String queueName) {

    taskExecPool.execute(()->{for(;;) {

    Long nowTimeInMs=System.currentTimeMillis();

    System.out.println("nowTimeInMs:" +nowTimeInMs);

    Set messages = redisTemplate.opsForZSet().rangeByScore(queueName, 0, nowTimeInMs);if(messages != null && messages.size() != 0) {

    redisTemplate.opsForZSet().removeRangeByScore(queueName,0, nowTimeInMs);for(String message : messages) {

    log.info("asyncZSetTask:" + message + " " +nowTimeInMs);

    }

    log.info(redisTemplate.opsForZSet().zCard(queueName).toString());

    }try{

    TimeUnit.SECONDS.sleep(1);

    }catch(InterruptedException e) {

    e.printStackTrace();

    }

    }

    });

    }

    }

    我就不把运行结果写出来了,感兴趣的同学自己自行试验。当然这个方法也是从内存中拿出数据,到时间之后放到Redis里边,还是会存在程序启动的时候,任务进行丢失。我们继续看另外一种方法更好的进行这个问题的处理。

    3. 使用Redis的zSet实现分布式延迟队列。

    我们需要再写一个ZSet的队列处理。下边的offerMessage主要是把消息直接放入缓存中。采用Redis的ZSET的zadd方法。zadd(key, value, score) 即将key=value的数据赋予一个score, 放入缓存中。score就是计算出来延迟的毫秒数。

    packagecom.hqs.delayQueue.cache;importcom.hqs.delayQueue.bean.Message;importlombok.extern.slf4j.Slf4j;importorg.springframework.data.redis.core.RedisTemplate;importjava.util.concurrent.BlockingQueue;/***@authorhuangqingshi

    * @Date 2020-04-18*/@Slf4jpublic classRedisZSetDelayedQueue {private static final int MAX_SIZE_OF_QUEUE = 100000;private RedisTemplateredisTemplate;privateString queueName;private BlockingQueuedelayedQueue;public RedisZSetDelayedQueue(RedisTemplate redisTemplate, String queueName, BlockingQueuedelayedQueue) {this.redisTemplate =redisTemplate;this.queueName =queueName;this.delayedQueue =delayedQueue;

    }public voidofferMessage(Message message) {if(delayedQueue.size() >MAX_SIZE_OF_QUEUE) {throw new IllegalStateException("超过队列要求最大值,请检查");

    }long delayTime = message.getFireTime() -System.currentTimeMillis();

    log.info("zset offerMessage" + message +delayTime);

    redisTemplate.opsForZSet().add(queueName, message.toString(), message.getFireTime());

    }

    }

    上边的Controller方法已经写好了测试的方法。/redis/zSetDelayedQueue,里边主要使用ZSet的zRangeByScore(key, min, max)。主要是从score从0,当前时间的毫秒数获取。取出数据后再采用removeRangeByScore,将数据删除。这样数据可以直接写到Redis里边,然后取出数据后直接处理。这种方法比前边的方法稍微好一些,但是实际上还存在一些问题,因为依赖Redis,如果Redis内存不足或者连不上的时候,系统将变得不可用。

    4. 总结一下,另外还有哪些可以延迟队列。

    上面的方法其实还是存在问题的,比如系统重启的时候还是会造成任务的丢失。所以我们在生产上使用的时候,我们还需要将任务保存起来,比如放到数据库和文件存储系统将数据存储起来,这样做到double-check,双重检查,最终达到任务的99.999%能够处理。

    其实还有很多东西可以实现延迟队列。

    1) RabbitMQ就可以实现此功能。这个消息队列可以把数据保存起来并且进行处理。

    2)Kafka也可以实现这个功能。

    3)Netty的HashedWheelTimer也可以实现这个功能。

    有兴趣的同学可以进一步研究这些内容的实现。

    展开全文
  • 参考文章https://www.cnblogs.com/mfrank/p/11260355.htmlhttps://blog.csdn.net/u014308482/article/details/53036770https://blog.csdn.net/u012988901/article/details/88958654本文大纲什么是延迟队列延迟队列也...

    参考文章

    • https://www.cnblogs.com/mfrank/p/11260355.html

    • https://blog.csdn.net/u014308482/article/details/53036770

    • https://blog.csdn.net/u012988901/article/details/88958654

    本文大纲

    49bc5d4401bd37db0048a390982da6b0.png

    什么是延迟队列

    延迟队列也是队列,队列就意味着元素是有序的。元素出队和入队是有方向的,从一端进,从另一端出。延迟队列体现在延迟上面,普通的队列是希望元素能够被更快的消费,而延迟队列是希望元素在指定的时间被消费。所以延迟队列里面的元素是带有时间属性的。

    延迟队列的使用场景

    • 用户订单10分钟内未支付自动取消

    • 预定会议后,开会前10分钟提醒

    • 优惠券到期前提醒

    基础知识补充

    TTL(Time To Live)

    什么是TTL

    TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中所有消息的最大存活时间,单位是毫秒。如果一条设置了TTL属性的消息或者一条消息进入设置了TTL属性的队列后,那么这条消息在设置的时间内没有被消费,则会成为“死信”,如果消息配置了TTL后被投递到设置了TTL属性的队列中,则按照较小的那个值设置。

    如何设置TTL

    如果不设置TTL,则消息永远不会过期。如果TTL=0,则表示除非此时可以直接投递到该消息的消费者,否则这条消息就会被丢弃。

    创建队列时,设置队列的TTL

    Map<String, Object> args = new HashMap<String, Object>();args.put("x-message-ttl", 6000);channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

    设置每条消息的TTL

    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.expiration("6000");AMQP.BasicProperties properties = builder.build();channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

    两种TTL的特性

    两种设置方式有很大的区别

    • 如果设置了队列的TTL,那么一旦消息过期,就会被队列丢弃。

    • 针对每条消息设置TTL,即使消息过期,也不会马上丢弃,因为消息是否过期是在即将投递到消费者之前确定的,如果当前队列有严重的消息积压,则已经过期的消息也许还能存活很长时间。

    上述第2点,RabbitMQ只会检查第一个消息是否过期,消息过期还存活的原因是因为队列是有序消费的,而如果需要判断每条消息是否过期则需要遍历整个队列,性能损耗太大,选择在有序消费到该消息时准备投递前进行消息的判断,空间换时间的方案。

    DLX(Dead Letter Exchanges)

    什么是DLX

    DLX的作用就是用来接收死信消息,当一个消息在队列中变成了死信消息后,可以发送到另一个exchange(交换机),这个交换机就是DLX,绑定DLX的队列成为死信队列,当这个队列存在死信消息时,RabbitMQ就会立即将这个消息发布到设置的DLX上去,进而被路由到绑定该DLX的死信队列上。

    什么是死信

    • 消息被拒绝 (Basic.Reject/Basic.Nack),并且设置requeue=false

    • 消息过期(TTL)

    • 队列达到最大长度

    如何设置DLX

    RabbitMQ的Queue可以配置 x-dead-letter-exchangex-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

    • x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange

    • x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

    基础知识总结

    结合TTL和DLX两个特性,将消息设置了TTL规则之后当消息在队列中变为dead letter时,利用DLX特性将它转发到另一个Exchange或者Routing Key,这个时候绑定这个死信队列的消费者开始消费消息即可实现延时消费的效果。

    生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingKey将消息路由到不同的延时队列①,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingKey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。

    ①:不同消息绑定在不同的队列中很重要,此处使用的是TTL的第一种,为队列设置时长。可以确保队列中消息的过期时间是有序的。因为如果队列中有不同过期时间的消息,会出现消息错乱的情况。比如第一条是10分钟过期,第二条是20秒过期,则必须要等第一条消息有序被消费后(结合TTL过期特性,空间换时间),才能在10分钟20秒后消费到第二条消息。

    实现延迟消息队列

    源码

    配置部分

    /** * 代码: https://www.cnblogs.com/mfrank/p/11260355.html * @author imyzt * @date 2020/08/29 * @description 配置文件 */@Configurationpublic class RabbitMQConfig {    /**     * 延迟交换机     */    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";    /**     * 延迟队列名称     */    private static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea";    private static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb";    /**     * 延迟队列Routing Key     */    public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey";    public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey";    /**     * 死信交换机     */    private static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";    /**     * 死信队列Routing Key     */    private static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey";    private static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey";    /**     * 死信队列名称     */    public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea";    public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb";    /**     * 首先声明延迟队列, 生产者通过交换机和Routing Key将消息发送到延迟队列上     * 然后消息变为死信时, 死信交换机将消息转发到死信队列上, 消费者对死信队列进行监听     */    /**     * 声明延迟队列交换机     */    @Bean    public DirectExchange delayExchange() {        return new DirectExchange(DELAY_EXCHANGE_NAME);    }    /**     * 声明死信队列交换机     */    @Bean    public DirectExchange deadLetterExchange() {        return new DirectExchange(DEAD_LETTER_EXCHANGE);    }    /**     * 声明延迟队列A     * 延迟10秒     * 并绑定到对应的死信交换机     */    @Bean    public Queue delayQueueA() {        Map<String, Object> args = new HashMap<>(3);        // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);        // x-dead-letter-routing-key  这里声明当前队列的死信路由key        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);        // x-message-ttl  声明队列的TTL        args.put("x-message-ttl", 10000);        return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();    }    /**     * 声明延迟队列B     * 延迟60秒     * 并绑定到对应的死信交换机     */    @Bean    public Queue delayQueueB() {        Map<String, Object> args = new HashMap<>(3);        // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);        // x-dead-letter-routing-key  这里声明当前队列的死信路由key        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);        // x-message-ttl  声明队列的TTL        args.put("x-message-ttl", 60000);        return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();    }    /**     * 声明死信队列A, 用于接收延迟10s的消息     */    @Bean    public Queue deadLetterQueueA() {        return new Queue(DEAD_LETTER_QUEUEA_NAME);    }    /**     * 声明死信队列B, 用于接收延迟60s的消息     */    @Bean    public Queue deadLetterQueueB() {        return new Queue(DEAD_LETTER_QUEUEB_NAME);    }    /**     * 声明延迟队列A与延迟队列交换机绑定关系     * Routing Key     */    @Bean    public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,                                                                   @Qualifier("delayExchange") Exchange exchange) {        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY).noargs();    }    /**     * 声明延迟队列B与延迟队列交换机绑定关系     * Routing Key     */    @Bean    public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,                                                                   @Qualifier("delayExchange") Exchange exchange) {        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY).noargs();    }    /**     * 声明死信队列A与死信队列交换机绑定关系     * Routing Key     */    @Bean    public Binding deadBindingA(@Qualifier("deadLetterQueueA") Queue queue,                                                                   @Qualifier("deadLetterExchange") Exchange exchange) {        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY).noargs();    }    /**     * 声明死信队列B与死信队列交换机绑定关系     * Routing Key     */    @Bean    public Binding deadBindingB(@Qualifier("deadLetterQueueB") Queue queue,                                                                  @Qualifier("deadLetterExchange") Exchange exchange) {        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY).noargs();    }}

    生产者部分

    /** * @author imyzt * @date 2020/08/29 * @description 消息生产者 */@Slf4j@RestController@RequestMapping("sender")public class MessageSenderController {    @Autowired    private RabbitTemplate rabbitTemplate;    @PostMapping    public void sender(String msg, String type) {        log.info("当前时间:{},收到请求,msg:{},delayType:{}", LocalDateTime.now().toString(), msg, type);        switch (type) {            case "DELAY_10S":                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg);                break;            case "DELAY_60S":                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg);                break;            default:                break;        }    }}

    消费者部分

    /** * @author imyzt * @date 2020/08/29 * @description 死信队列消费者 */@Component@Slf4jpublic class DeadLetterQueueConsumer {    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)    public void receiveA(Message message, Channel channel) throws IOException {        String msg = new String(message.getBody());        log.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now().toString(), msg);        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);    }    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)    public void receiveB(Message message, Channel channel) throws IOException {        String msg = new String(message.getBody());        log.info("当前时间:{},死信队列B收到消息:{}", LocalDateTime.now().toString(), msg);        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);    }}

    两个交换机

    a8c9bfce6cc66fe282044271ac50d0cd.png

    四个队列

    f6461735f954d03491deafc96f7d05d1.png

    延迟效果

    67701db4856ebe4724e1a689abff6d69.png

    缺陷

    从上面的效果来看,第一条消息在10秒后变成了死信消息,然后被消费掉。第二条消息在60秒后变成了死信队列,然后被消费掉。目前来看基本功能的延迟队列就算完成了。
    但是有一个问题就是,队列的消息都是有序的失效,如果增加一个新的时间需求,那么有需要增加一个队列处理上面的逻辑,实在是不够优雅。

    延迟队列优化

    通过上面实现的内容,使用RabbitMQ自带的DLX和TTL,实现的结果是无法对灵活过期时间的支持。针对这个问题的最终解决方案是使用rabbit提供的一个延迟插件实现。https://www.rabbitmq.com/community-plugins.html,下载rabbitmqdelayedmessage_exchange插件。

    插件的安装

    进入到RabbitMQ安装目录的bin目录下,执行指令安装插件即可

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    重新实现延迟功能

    配置部分

    /** * @author imyzt * @date 2020/08/29 * @description 延迟插件实现消息延迟 */@Configurationpublic class DelayedRabbitMQConfig {    /**     * 延迟队列     */    public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue";    /**     * 延迟交换机     */    public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange";    /**     * 延迟队列 Routing Key     */    public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey";    /**     * 创建延迟队列     */    @Bean    public Queue immediateQueue() {        return new Queue(DELAYED_QUEUE_NAME);    }    /**     * 创建一个自定义的交换机(插件实现)     */    @Bean    public CustomExchange customExchange() {        Map<String, Object> args = new HashMap<>(1);        args.put("x-delayed-type", "direct");        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);    }    /**     * 绑定交换机和队列     */    @Bean    public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,                                 @Qualifier("customExchange") CustomExchange customExchange) {        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();    }}

    生产者部分

    /** * @author imyzt * @date 2020/08/29 * @description 消息生产者 */@Slf4j@RestController@RequestMapping("sender")public class MessageSenderController {    @Autowired    private RabbitTemplate rabbitTemplate;    /**     * 通过插件实现的延迟消息     * @param msg 消息内容     * @param delayTime 延迟时间, 毫秒     */    @PostMapping("v2")    public void sender(String msg, Integer delayTime) {        log.info("当前时间:{},收到请求,msg:{},delayTime:{}", LocalDateTime.now().toString(), msg, delayTime);        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, messagePostProcessor ->{            messagePostProcessor.getMessageProperties().setDelay(delayTime);            return messagePostProcessor;        });    }}

    消费者部分

    /** * @author imyzt * @date 2020/08/29 * @description 死信队列消费者 */@Component@Slf4jpublic class DeadLetterQueueConsumer {    /**     * 插件延迟队列     * 消费者     */    @RabbitListener(queues = DELAYED_QUEUE_NAME)    public void receiveD(Message message, Channel channel) throws IOException {        String msg = new String(message.getBody());        log.info("当前时间:{},延时队列收到消息:{}", LocalDateTime.now().toString(), msg);        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);    }}

    交换机

    74536ef7dc06010f7cdbb6cb0667e42f.png

    队列

    cc40682f35bb1aa5c11a1b3ba2f3601f.png

    效果图

    486a660f6b2562bac87be4cc94a781bd.png

    总结

    1. TTL对消息设置过期时间,过期时间是无序的。所以不能用于延迟队列不同过期时间的处理。只能针对队列设置时间,一个队列处理一个时间的需求。

    2. 如果需要可靠性延迟队列,推荐使用插件。

    3. rabbitmqdelayedmessage_exchange插件在RAM节点会有一些问题,这个博主描述了这部分https://blog.csdn.net/wangming520liwei/article/details/103352440

    4. 本文大量参考了“参考文章”中的内容,只是对自己学习延迟队列的内容记载。感谢原作者们。

    5. 本文出现的源代码均在https://github.com/imyzt/learning-technology-code/tree/master/mq-series/RabbitMQ/delay-queue-simple

    展开全文
  • 在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列。我们本文的梗概如下,同学们可以选择性阅读。1. 实现一个简单的延迟队列。...

    在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列。我们本文的梗概如下,同学们可以选择性阅读。

    1. 实现一个简单的延迟队列。

    我们知道目前JAVA可以有DelayedQueue,我们首先开一个DelayQueue的结构类图。DelayQueue实现了Delay、BlockingQueue接口。也就是DelayQueue是一种阻塞队列。

    22194527292a7c8cde6f6bd419e2be9b.png

    我们在看一下Delay的类图。Delayed接口也实现了Comparable接口,也就是我们使用Delayed的时候需要实现CompareTo方法。因为队列中的数据需要排一下先后,根据我们自己的实现。Delayed接口里边有一个方法就是getDelay方法,用于获取延迟时间,判断是否时间已经到了延迟的时间,如果到了延迟的时间就可以从队列里边获取了。

    ad6d2d0d9d56f8f8c7ae2131f3db601f.png

    我们创建一个Message类,实现了Delayed接口,我们主要把getDelay和compareTo进行实现。在Message的构造方法的地方传入延迟的时间,单位是毫秒,计算好触发时间fireTime。同时按照延迟时间的升序进行排序。我重写了里边的toString方法,用于将Message按照我写的方法进行输出。

    d589e8f16b9b49911e0dcbae98a4d7fb.png

    5c51ce7f746550e5d89e0a81c012ca2e.png

    里边的main方法里边声明了两个Message,一个延迟5秒,一个延迟7秒,时间到了之后会将接取出并且打印。输出的结果如下,正是我们所期望的。

    cce2b512d0b25a33679ba064ebfec9a2.png

    这个方法实现起来真的非常简单。但是缺点也是很明显的,就是数据在内存里边,数据比较容易丢失。那么我们需要采用Redis实现分布式的任务处理。

    2. 使用Redis的list实现分布式延迟队列。

    本地需要安装一个Redis,我自己是使用Docker构建一个Redis,非常快速,命令也没多少。我们直接启动Redis并且暴露6379端口。进入之后直接使用客户端命令即可查看和调试数据。

    1be52456f9bbd8907fc871522c8a1601.png

    我本地采用spring-boot的方式连接redis,pom文件列一下,供大家参考。

    16e7e48fbdcfbaf22bc47d156862ed3d.png

    fd9734b6b0dc2cbc99ca9297b7bf07b1.png

    455c47cb15dd3b4880ff36827af51e35.png

    加上Redis的配置放到application.properties里边即可实现Redis连接,非常的方便。

    5b4de0a25f6ff6e7578432fb1f07b208.png

    接下来实现一个基于Redis的list数据类型进行实现的一个类。我们使用RedisTemplate操作Redis,这个里边封装好我们所需要的Redis的一些方法,用起来非常方便。这个类允许延迟任务做多有10W个,也是避免数据量过大对Redis造成影响。如果在线上使用的时候也需要考虑延迟任务的多少。太多几百万几千万的时候可能数据量非常大,我们需要计算Redis的空间是否够。这个代码也是非常的简单,一个用于存放需要延迟的消息,采用offer的方法。另外一个是启动一个线程, 如果消息时间到了,那么就将数据lpush到Redis里边。

    59d9b153b524b2be70eca8e601244439.png

    bce265b0f72d907a3af78a30987f1818.png

    接下来我们看一下,我们写一个测试的controller。大家看一下这个请求/redis/listDelayedQueue的代码位置。我们也是生成了两个消息,然后把消息放到队列里边,另外我们在启动一个线程任务,用于将数据从Redis的list中获取。方法也非常简单。

    1340ee35a72c3089f7fde957715ceecf.png

    e0ac5ec7c1d20ed948a09c9f6fba7ec3.png

    3289f6ddb162e970e12fc7aa089aef84.png

    4d270f0740ba3dc22f7ed76f1d7a3447.png

    853c752c1e724fdd6e29efbe3cc6a262.png

    我就不把运行结果写出来了,感兴趣的同学自己自行试验。当然这个方法也是从内存中拿出数据,到时间之后放到Redis里边,还是会存在程序启动的时候,任务进行丢失。我们继续看另外一种方法更好地进行这个问题的处理。

    3. 使用Redis的zSet实现分布式延迟队列。

    我们需要再写一个ZSet的队列处理。下边的offerMessage主要是把消息直接放入缓存中。采用Redis的ZSET的zadd方法。zadd(key, value, score) 即将key=value的数据赋予一个score, 放入缓存中。score就是计算出来延迟的毫秒数。

    d6ee2a99a236438236887e5187985d50.png

    2073cba5ec39cd3ff7fcb86c53dec27f.png

    上边的Controller方法已经写好了测试的方法。/redis/zSetDelayedQueue,里边主要使用ZSet的zRangeByScore(key, min, max)。主要是从score从0,当前时间的毫秒数获取。取出数据后再采用removeRangeByScore,将数据删除。这样数据可以直接写到Redis里边,然后取出数据后直接处理。这种方法比前边的方法稍微好一些,但是实际上还存在一些问题,因为依赖Redis,如果Redis内存不足或者连不上的时候,系统将变得不可用。

    4. 总结一下,另外还有哪些可以延迟队列。

    上面的方法其实还是存在问题的,比如系统重启的时候还是会造成任务的丢失。所以我们在生产上使用的时候,我们还需要将任务保存起来,比如放到数据库和文件存储系统将数据存储起来,这样做到double-check,双重检查,最终达到任务的99.999%能够处理。

    其实还有很多东西可以实现延迟队列。

    1) RabbitMQ就可以实现此功能。这个消息队列可以把数据保存起来并且进行处理。

    2)Kafka也可以实现这个功能。

    3)Netty的HashedWheelTimer也可以实现这个功能。

    有兴趣的同学可以进一步研究这些内容的实现。

    欢迎大家留言评论,做进一步探讨!

    展开全文
  • 1.使用死信队列abbitmq 本身不支持延迟队列,但提供了实现延迟队列的必备条件。原理queue可以通过 x-message-ttl 参数设置过期时间,到了过期时间的消息就会被标记为 dead letter 状态。过期的消息可以通过 x-dead-...
  • Java延迟队列DelayQueue 实现 BlockingQueue 接口。 DelayQueue 中的元素必须保留一定的时间。DelayQueue 使用一个名为 Delayed 的接口来获取延迟时间。该接口在java.util.concurrent包中。 其声明如下:public ...
  • rabbitMQ延迟队列

    2019-03-15 15:55:37
    rabbitmq 延迟队列
  • 这里我们就可以使用延迟队列,我们写好转发方法或者退回方法,用户A分配任务时将时间记录放入延迟队列。当30天后用户B没有处理,我们获取从延迟队列里面获取这个记录,能获取得到,就执行转发方法或退回方法。如果30...
  • RabbitMQ延迟队列

    2019-06-18 15:54:39
    延迟队列
  • 延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行。DLX + TTL 方式存在的时序问题对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有介绍过如何使用 RabbitMQ 死信...
  • 死信队列和延迟队列 通常,在某些情况下,当您有某种工作或作业队列时,有必要不立即处理每个工作项或作业,而是要延迟一些时间。 例如,如果用户单击一个按钮来触发要完成的某项工作,而一秒钟后,用户意识到他/她...
  • 延迟队列,顾名思义它是一种带有延迟功能的消息队列。那么,是在什么场景下我才需要这样的队列呢?1. 背景我们先看看以下业务场景:当订单一直处于未支付状态时,如何及时的关闭订单如何定期检查处于退款状态的订单...
  • 一、延迟队列延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费。比如1分钟之后发送短信,发送邮件,检测数据状态等。二、Redisson Delayed Queue如果你项目中使用了redisson,那么恭喜你,...
  • RabbitMQ 延迟队列

    2021-02-05 14:01:32
    RabbitMQ 延迟队列什么是延迟队列Time To Live(TTL)Dead Letter Exchanges(DLX)延迟队列实现方式一(推荐)原理代码实现延迟队列实现方式二(不推荐)原理缺点 什么是延迟队列 延迟队列存储的对象肯定是对应的延时消息...
  • 延迟队列就是个带延迟功能的消息队列,相对于普通队列,它可以在指定时间消费掉消息。延迟队列的应用场景1、新用户注册,10分钟后发送邮件或站内信。2、用户下单后,30分钟未支付,订单自动作废。我们通过redis的...
  • 延迟队列就是个带延迟功能的消息队列,相对于普通队列,它可以在指定时间消费掉消息。延迟队列的应用场景:1、新用户注册,10分钟后发送邮件或站内信。2、用户下单后,30分钟未支付,订单自动作废。我们通过redis的...
  • 本文将使用redis实现异步队列以及延迟队列,虽然我们在实际开发中经常会有专业的消息队列中间件,如:rabbitmq等,但是如果系统中没有mq中间件,又懒得维护mq中间件,那么我们可以通过redis来实现因为redis并不是专业...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,861
精华内容 2,344
关键字:

延迟队列