精华内容
下载资源
问答
  • 延时队列

    2020-05-19 10:45:42
    延时队列 一、延时队列的应用 什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。 延时队列在项目中的应用还是比较多的...

    延时队列

    一、延时队列的应用

    什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
    延时队列在项目中的应用还是比较多的,尤其像电商类平台:
    1、订单成功后,在30分钟内没有支付,自动取消订单
    2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
    3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
    4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等
    。。。。
    上边的这些场景都可以应用延时队列解决。

    二、延时队列的实现

    我个人一直秉承的观点:工作上能用JDK自带API实现的功能,就不要轻易自己重复造轮子,或者引入三方中间件。一方面自己封装很容易出问题(大佬除外),再加上调试验证产生许多不必要的工作量;另一方面一旦接入三方的中间件就会让系统复杂度成倍的增加,维护成本也大大的增加。

    1、DelayQueue 延时队列

    JDK中提供了一组实现延迟队列的API,位于Java.util.concurrent包下DelayQueue
    DelayQueue是一个BlockingQueue(无界阻塞)队列,它本质就是封装了一个PriorityQueue(优先队列),PriorityQueue内部使用完全二叉堆(不知道的自行了解哈)来实现队列元素排序,我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。
    先简单实现一下看看效果,添加三个order入队DelayQueue,分别设置订单在当前时间的5秒10秒15秒后取消。
    在这里插入图片描述
    要实现DelayQueue延时队列,队中元素要implements Delayed 接口,这个接口里只有一个getDelay方法,用于设置延期时间。Order类中compareTo方法负责对队列中的元素进行排序。

    public class Order implements Delayed {
        /**
         * 延迟时间
         */
        @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
        private long time;
        String name;
        
        public Order(String name, long time, TimeUnit unit) {
            this.name = name;
            this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return time - System.currentTimeMillis();
        }
        @Override
        public int compareTo(Delayed o) {
            Order Order = (Order) o;
            long diff = this.time - Order.time;
            if (diff <= 0) {
                return -1;
            } else {
                return 1;
            }
        }
    }
    

    DelayQueueput方法是线程安全的,因为put方法内部使用了ReentrantLock锁进行线程同步。DelayQueue还提供了两种出队的方法poll()take()poll()为非阻塞获取,没有到期的元素直接返回null;take() 阻塞方式获取,没有到期的元素线程将会等待。

    public class DelayQueueDemo {
    
        public static void main(String[] args) throws InterruptedException {
            Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
            Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
            Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
            DelayQueue<Order> delayQueue = new DelayQueue<>();
            delayQueue.put(Order1);
            delayQueue.put(Order2);
            delayQueue.put(Order3);
    
            System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            while (delayQueue.size() != 0) {
                /**
                 * 取队列头部元素是否过期
                 */
                Order task = delayQueue.poll();
                if (task != null) {
                    System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                }
                Thread.sleep(1000);
            }
        }
    }
    

    上边只是简单的实现入队与出队的操作,实际开发中会有专门的线程,负责消息的入队与消费。
    执行后看到结果如下,Order1Order2Order3 分别在 5秒10秒15秒后被执行,至此就用DelayQueue实现了延时队列。

    订单延迟队列开始时间:2020-05-06 14:59:09
    订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}
    订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}
    订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}
    

    2、Quartz 定时任务

    Quartz一款非常经典任务调度框架,在RedisRabbitMQ还未广泛应用时,超时未支付取消订单功能都是由定时任务实现的。定时任务它有一定的周期性,可能很多单子已经超时,但还没到达触发执行的时间点,那么就会造成订单处理的不够及时。
    引入quartz框架依赖包

    <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-quartz</artifactId>
    </dependency>
    

    在启动类中使用@EnableScheduling注解开启定时任务功能。

    @EnableScheduling
    @SpringBootApplication
    public class DelayqueueApplication {
    	public static void main(String[] args) {
    		SpringApplication.run(DelayqueueApplication.class, args);
    	}
    }
    

    编写一个定时任务,每个5秒执行一次。

    @Component
    public class QuartzDemo {
    
        //每隔五秒
        @Scheduled(cron = "0/5 * * * * ? ")
        public void process(){
            System.out.println("我是定时任务!");
        }
    }
    

    3、Redis sorted set
    Redis的数据结构Zset,同样可以实现延迟队列的效果,主要利用它的score属性,redis通过score来为集合中的成员进行从小到大的排序。
    在这里插入图片描述
    通过zadd命令向队列delayqueue 中添加元素,并设置score值表示元素过期的时间;向delayqueue 添加三个order1order2order3,分别是10秒20秒30秒后过期。

    zadd delayqueue 3 order3

    消费端轮询队列delayqueue, 将元素排序后取最小时间与当前时间比对,如小于当前时间代表已经过期移除key

     /**
         * 消费消息
         */
        public void pollOrderQueue() {
    
            while (true) {
                Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
    
                String value = ((Tuple) set.toArray()[0]).getElement();
                int score = (int) ((Tuple) set.toArray()[0]).getScore();
                
                Calendar cal = Calendar.getInstance();
                int nowSecond = (int) (cal.getTimeInMillis() / 1000);
                if (nowSecond >= score) {
                    jedis.zrem(DELAY_QUEUE, value);
                    System.out.println(sdf.format(new Date()) + " removed key:" + value);
                }
    
                if (jedis.zcard(DELAY_QUEUE) <= 0) {
                    System.out.println(sdf.format(new Date()) + " zset empty ");
                    return;
                }
                Thread.sleep(1000);
            }
        }
    

    我们看到执行结果符合预期

    2020-05-07 13:24:09 add finished.
    2020-05-07 13:24:19 removed key:order1
    2020-05-07 13:24:29 removed key:order2
    2020-05-07 13:24:39 removed key:order3
    2020-05-07 13:24:39 zset empty

    4、Redis 过期回调

    Rediskey过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback事件。

    修改redis.conf文件开启notify-keyspace-events Ex

    notify-keyspace-events Ex

    Redis监听配置,注入Bean RedisMessageListenerContainer

    @Configuration
    public class RedisListenerConfig {
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
    
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            return container;
        }
    }
    

    编写Redis过期回调监听方法,必须继承KeyExpirationEventMessageListener,有点类似于MQ的消息监听。

    @Component
    public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
     
        public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
            super(listenerContainer);
        }
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String expiredKey = message.toString();
            System.out.println("监听到key:" + expiredKey + "已过期");
        }
    }
    

    到这代码就编写完成,非常的简单,接下来测试一下效果,在redis-cli客户端添加一个key 并给定3s的过期时间。

    set xiaofu 123 ex 3

    在控制台成功监听到了这个过期的key

    监听到过期的key为:xiaofu

    5、RabbitMQ 延时队列

    利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTLDXL这两个属性间接实现的。
    先来认识一下 TTLDXL两个概念:
    Time To Live(TTL)
    TTL顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。
    RabbitMQ可以从两种维度设置消息过期时间,分别是队列和消息本身

    • 设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
    • 设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。

    如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)。

    Dead Letter Exchanges(DLX)

    DLX即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQQueue(队列)可以配置两个参数x-dead-letter-exchange 和 x-dead-letter-routing-key(可选),一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。

    x-dead-letter-exchange:队列中出现Dead Letter后将Dead Letter重新路由转发到指定 exchange(交换机)。

    x-dead-letter-routing-key:指定routing-key发送,一般为要指定转发的队列。

    队列出现Dead Letter的情况有:

    • 消息或者队列的TTL过期
    • 队列达到最大长度
    • 消息被消费端拒绝(basic.reject or basic.nack)

    下边结合一张图看看如何实现超30分钟未支付关单功能,我们将订单消息A0001发送到延迟队列order.delay.queue,并设置x-message-tt消息存活时间为30分钟,当到达30分钟后订单消息A0001成为了Dead Letter(死信),延迟队列检测到有死信,通过配置x-dead-letter-exchange,将死信重新转发到能正常消费的关单队列,直接监听关单队列处理关单逻辑即可。

    在这里插入图片描述
    发送消息时指定消息延迟的时间

    public void send(String delayTimes) {
            amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> {
                // 设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            });
        }
    }
    

    设置延迟队列出现死信后的转发规则

    	/**
         * 延时队列
         */
        @Bean(name = "order.delay.queue")
        public Queue getMessageQueue() {
            return QueueBuilder
                    .durable(RabbitConstant.DEAD_LETTER_QUEUE)
                    // 配置到期后转发的交换
                    .withArgument("x-dead-letter-exchange", "order.close.exchange")
                    // 配置到期后转发的路由键
                    .withArgument("x-dead-letter-routing-key", "order.close.queue")
                    .build();
        }
    

    6、时间轮

    前边几种延时队列的实现方法相对简单,比较容易理解,时间轮算法就稍微有点抽象了。kafkanetty都有基于时间轮算法实现延时队列,下边主要实践Netty的延时队列讲一下时间轮是什么原理。

    先来看一张时间轮的原理图,解读一下时间轮的几个基本概念
    在这里插入图片描述
    wheel :时间轮,图中的圆盘可以看作是钟表的刻度。比如一圈round 长度为24秒,刻度数为8,那么每一个刻度表示 3秒。那么时间精度就是 3秒。时间长度 / 刻度数值越大,精度越大。

    当添加一个定时、延时任务A,假如会延迟25秒后才会执行,可时间轮一圈round 的长度才24秒,那么此时会根据时间轮长度和刻度得到一个圈数 round和对应的指针位置 index,也是就任务A会绕一圈指向0格子上,此时时间轮会记录该任务的roundindex信息。当round=0,index=0 ,指针指向0格子 任务A并不会执行,因为 round=0不满足要求。

    所以每一个格子代表的是一些时间,比如1秒25秒 都会指向0格子上,而任务则放在每个格子对应的链表中,这点和HashMap的数据有些类似。

    Netty构建延时队列主要用HashedWheelTimerHashedWheelTimer底层数据结构依然是使用DelayedQueue,只是采用时间轮的算法来实现。

    下面我们用Netty 简单实现延时队列,HashedWheelTimer构造函数比较多,解释一下各参数的含义。

    • ThreadFactory :表示用于生成工作线程,一般采用线程池;
    • tickDurationunit:每格的时间间隔,默认100ms;
    • ticksPerWheel:一圈下来有几格,默认512,而如果传入数值的不是2的N次方,则会调整为大于等于该参数的一个2的N次方数值,有利于优化hash值的计算。
    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
            this(threadFactory, tickDuration, unit, ticksPerWheel, true);
        }
    
    • TimerTask:一个定时任务的实现接口,其中run方法包装了定时任务的逻辑。
    • Timeout:一个定时任务提交到Timer之后返回的句柄,通过这个句柄外部可以取消这个定时任务,并对定时任务的状态进行一些基本的判断。
    • Timer:是HashedWheelTimer实现的父接口,仅定义了如何提交定时任务和如何停止整个定时机制。
    public class NettyDelayQueue {
    
        public static void main(String[] args) {
    
            final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);
    
            //定时任务
            TimerTask task1 = new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    System.out.println("order1  5s 后执行 ");
                    timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册
                }
            };
            timer.newTimeout(task1, 5, TimeUnit.SECONDS);
            TimerTask task2 = new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    System.out.println("order2  10s 后执行");
                    timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册
                }
            };
    
            timer.newTimeout(task2, 10, TimeUnit.SECONDS);
    
            //延迟任务
            timer.newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    System.out.println("order3  15s 后执行一次");
                }
            }, 15, TimeUnit.SECONDS);
    
        }
    }
    

    从执行的结果看,order3order3延时任务只执行了一次,而order2order1为定时任务,按照不同的周期重复执行。

    order1 5s 后执行
    order2 10s 后执行
    order3 15s 后执行一次
    order1 5s 后执行
    order2 10s 后执行

    最后

    可能写的有不够完善的地方,如哪里有错误或者不明了的,欢迎大家踊跃指正!!!

    展开全文
  • 1、什么是延时队列延时队列即就是放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费 2、适用场景 (1)商城订单超时未支付,取消订单 (2)使用权限到期前十分钟提醒用户 (3)收益项目,...

    一、介绍

    • 1、什么是延时队列?
      延时队列即就是放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费
    • 2、适用场景
      (1)商城订单超时未支付,取消订单
      (2)使用权限到期前十分钟提醒用户
      (3)收益项目,投入后一段时间后产生收益

    二、实现方式

    从以上场景中,我们可以看出,延时队列的主要功能就是在指定的时间之后做指定的事情,那么,我们思考有哪些工具我们可以使用?

    • 1、Redis 监听过期 Key

    可以参考我的博客【SpringBoot】三十五、SpringBoot整合Redis监听Key过期事件

    https://lizhou.blog.csdn.net/article/details/109238083
    
    • 2、RabbitMQ等实现延时队列

    这也是本片文章中要讲的知识点,使用 RabbitMQ 实现延时队列有两种方式

    (1)利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)
    (2)利用 RabbitMQ 中的插件 x-delay-message

    本文主要讲解第二种方式,使用插件的方式

    三、下载插件

    RabbitMQ 实现了一个插件 x-delay-message 来实现延时队列,我们可以从 这里 下载到它

    https://www.rabbitmq.com/community-plugins.html
    

    选择 rabbitmq_delayed_message_exchange 插件,如图所示
    下载插件
    下载插件
    选择 .ez 格式的文件下载,下载后放置 RabbitMQ 的安装目录下的 plugins 目录下,如我的路径为

    D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.16\plugins
    

    执行命令

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    

    安装插件完成

    四、在SpringBoot整合RabbitMQ

    1、引入 RabbitMQ 依赖

    <!-- rabbitmq消息队列 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    2、配置 RabbitMQ 信息

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        listener:
          simple:
            # 手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none
            acknowledge-mode: manual
    

    3、RabbitMQ 常量类

    package com.asurplus.common.rabbitmq;
    
    /**
     * rabbit常量类
     *
     * @Author Lizhou
     */
    public final class RabbitConst {
    
        /**
         * 交换机
         */
        public static final String DELAY_EXCHANGE = "delay_exchange";
    
        /**
         * 队列
         */
        public static final String DELAY_QUEUE = "delay_queue";
    
        /**
         * 路由
         */
        public static final String DELAY_KEY = "delay_key";
    
    }
    
    

    4、RabbitMQ 配置类

    package com.asurplus.common.rabbitmq;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * rabbitmq配置类
     *
     * @Author Lizhou
     */
    @Configuration
    public class RabbitConfig {
    
        /**
         * 延时队列交换机
         *
         * @return
         */
        @Bean
        public CustomExchange delayExchange() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(RabbitConst.DELAY_EXCHANGE, "x-delayed-message", true, false, args);
        }
    
        /**
         * 延时队列
         *
         * @return
         */
        @Bean
        public Queue delayQueue() {
            return new Queue(RabbitConst.DELAY_QUEUE, true);
        }
    
        /**
         * 给延时队列绑定交换机
         *
         * @return
         */
        @Bean
        public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
            return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConst.DELAY_KEY).noargs();
        }
    }
    
    

    5、RabbitMQ 生产者

    package com.asurplus.common.rabbitmq;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * rabbitMq生产者
     *
     * @Author Lizhou
     */
    @Component
    @Slf4j
    public class RabbitProducer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 发送消息
         *
         * @param object      发送对象
         * @param millisecond 延时(毫秒)
         */
        public void sendDelayMessage(Object object, long millisecond) {
            this.rabbitTemplate.convertAndSend(
                    RabbitConst.DELAY_EXCHANGE,
                    RabbitConst.DELAY_KEY,
                    object.toString(),
                    message -> {
                        message.getMessageProperties().setHeader("x-delay", millisecond);
                        return message;
                    }
            );
        }
    }
    
    

    6、RabbitMQ 消费者

    package com.asurplus.common.rabbitmq;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    /**
     * activeMq消费者
     *
     * @Author Lizhou
     */
    @Component
    @Slf4j
    public class RabbitConsumer {
    
        /**
         * 接收消息
         *
         * @param object 监听的内容
         */
        @RabbitListener(queues = RabbitConst.DELAY_QUEUE)
        public void cfgUserReceiveDealy(Object object, Message message, Channel channel) throws IOException {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            try {
                log.info("接受消息:{}", object.toString());
            } catch (Exception e) {
                log.error(e.getMessage());
                /**
                 * basicRecover方法是进行补发操作,
                 * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
                 * 设置为false是只补发给当前的consumer
                 */
                channel.basicRecover(false);
            }
        }
    }
    
    

    五、测试

    package com.asurplus;
    
    import com.asurplus.common.rabbitmq.RabbitProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @SpringBootApplication
    @RestController
    public class RabbitmqApplication {
    
        @Autowired
        private RabbitProducer product;
    
        @GetMapping("init")
        public void init() {
            String message1 = "这是第一条消息";
            String message2 = "这是第二条消息";
            product.sendDelayMessage(message1, 5000);
            product.sendDelayMessage(message2, 10000);
        }
    
        public static void main(String[] args) {
            SpringApplication.run(RabbitmqApplication.class, args);
        }
    
    }
    
    

    通过测试,第一条消息在 5s后接收到,第二条消息在 10s后接收到,说明我们的延时队列已经成功

    如您在阅读中发现不足,欢迎留言!!!

    展开全文
  • 前言某个产品 或者订单,有个有效期 过了有效期要取消方法...:用rabbitmq延时队列 一开始将其丢入mq 死信队列,设置有效期,过时转发到其他队列,再启动一个消费者 消费 更改表状态php 安装mq扩展搭建mq服务创建生...

    前言

    某个产品 或者订单,有个有效期 过了有效期要取消

    方法一 : 写个脚本,用crontab 定时扫描 改变状态 但是最低只能一分钟 ,不适合

    方法二 : 用swoole得毫秒定时器,每秒钟去扫描表 明显占用资源 mysql受不了

    方法三 :用rabbitmq延时队列 一开始将其丢入mq 死信队列,设置有效期,过时转发到其他队列,再启动一个消费者 消费  更改表状态

    php 安装mq扩展

    搭建mq服务

    创建生产者和消费者

    生产者  publish.php

    header(‘Content-Type:text/html;charset=utf8;‘);

    $time = 30;

    $params = array(

    ‘exchangeName‘ => ‘test_cache_exchange‘."_".$time,

    ‘queueName‘ => ‘test_cache_queue‘."_".$time,

    ‘routeKey‘ => ‘test_cache_route‘."_".$time,

    );

    $connectConfig = array(

    ‘host‘ => ‘127.0.0.1‘,

    ‘port‘ => 5672,

    ‘login‘ => ‘admin‘,

    ‘password‘ => ‘password‘,

    ‘vhost‘ => ‘/‘

    );

    //var_dump(extension_loaded(‘amqp‘));

    //

    //exit();

    try {

    $conn = new AMQPConnection($connectConfig);

    $conn->connect();

    if (!$conn->isConnected()) {

    //die(‘Conexiune esuata‘);

    //TODO 记录日志

    echo ‘rabbit-mq 连接错误:‘, json_encode($connectConfig);

    exit();

    }

    $channel = new AMQPChannel($conn);

    if (!$channel->isConnected()) {

    // die(‘Connection through channel failed‘);

    //TODO 记录日志

    echo ‘rabbit-mq Connection through channel failed:‘, json_encode($connectConfig);

    exit();

    }

    $exchange = new AMQPExchange($channel);

    $exchange->setFlags(AMQP_DURABLE);//持久化

    $exchange->setName($params[‘exchangeName‘]?:‘‘);

    $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型

    $exchange->declareExchange();

    //$channel->startTransaction();

    $queue = new AMQPQueue($channel);

    $queue->setName($params[‘queueName‘]?:‘‘);

    $queue->setFlags(AMQP_DURABLE);

    // 和普通生产者区别 在这 下面是过期时间和转发到的路由

    $queue->setArguments(array(

    ‘x-dead-letter-exchange‘ => ‘delay_exchange‘,

    ‘x-dead-letter-routing-key‘ => ‘delay_route‘,

    ‘x-message-ttl‘ => $time*1000,

    ));

    $queue->declareQueue();

    //绑定

    $queue->bind($params[‘exchangeName‘], $params[‘routeKey‘]);

    } catch(Exception $e) {

    }

    //$num = mt_rand(100, 500);

    $num = 1;

    //生成消息

    $exchange->publish(date("Y-m-d H:i:s"), $params[‘routeKey‘], AMQP_MANDATORY, array(‘delivery_mode‘=>2));

    消费者 consumer.php

    header(‘Content-Type:text/html;charset=utf8;‘);

    $params = array(

    ‘exchangeName‘ => ‘delay_exchange‘,

    ‘queueName‘ => ‘delay_queue‘,

    ‘routeKey‘ => ‘delay_route‘,

    );

    $connectConfig = array(

    ‘host‘ => ‘localhost‘,

    ‘port‘ => 5672,

    ‘login‘ => ‘admin‘,

    ‘password‘ => ‘password‘,

    ‘vhost‘ => ‘/‘

    );

    //var_dump(extension_loaded(‘amqp‘));

    //exit();

    try {

    $conn = new AMQPConnection($connectConfig);

    $conn->connect();

    if (!$conn->isConnected()) {

    //die(‘Conexiuneesuata‘);

    //TODO记录日志

    echo ‘rabbit-mq连接错误:‘, json_encode($connectConfig);

    exit();

    }

    $channel = new AMQPChannel($conn);

    if (!$channel->isConnected()) {

    //die(‘Connectionthroughchannelfailed‘);

    //TODO记录日志

    echo ‘rabbit-mqConnectionthroughchannelfailed:‘, json_encode($connectConfig);

    exit();

    }

    $exchange = new AMQPExchange($channel);

    $exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端

    $exchange->setName($params[‘exchangeName‘] ?: ‘‘);

    $exchange->setType(AMQP_EX_TYPE_DIRECT);//direct类型

    $exchange->declareExchange();

    //$channel->startTransaction();

    $queue = new AMQPQueue($channel);

    $queue->setName($params[‘queueName‘] ?: ‘‘);

    $queue->setFlags(AMQP_DURABLE);

    $queue->declareQueue();

    //绑定

    $queue->bind($params[‘exchangeName‘], $params[‘routeKey‘]);

    } catch (Exception$e) {

    echo $e->getMessage();

    exit();

    }

    function callback(AMQPEnvelope $message){

    global $queue;

    if ($message) {

    $body = $message->getBody();

    echo $body . PHP_EOL;

    $queue->ack($message->getDeliveryTag());

    } else {

    echo ‘nomessage‘ . PHP_EOL;

    }

    }

    //$queue->consume(‘callback‘);第一种消费方式,但是会阻塞,程序一直会卡在此处

    //第二种消费方式,非阻塞

    $start = time();

    while (true) {

    $message = $queue->get();

    if (!empty($message)) {

    echo $message->getBody()."--失效时间 ".date("Y-m-d H:i:s"). PHP_EOL;

    $queue->ack($message->getDeliveryTag());//应答,代表该消息已经消费

    //$end = time();

    //echo ‘
    ‘ . ($end - $start);

    } else {

    //echo‘messagenotfound‘.PHP_EOL;

    }

    }

    执行推送 我改了不同时间推送,会生成不同的交换机 路由 队列,因为我用得是direct类型  要一一匹配

    20200718154048977676.png

    消费者开启

    [root@localhost mq]# php consumer.php

    2020-07-18 11:07:22--失效时间 2020-07-18 11:07:42

    2020-07-18 11:07:22--失效时间 2020-07-18 11:07:42

    2020-07-18 11:07:23--失效时间 2020-07-18 11:07:43

    2020-07-18 11:07:23--失效时间 2020-07-18 11:07:43

    2020-07-18 11:13:04--失效时间 2020-07-18 11:13:24

    2020-07-18 11:21:00--失效时间 2020-07-18 11:21:10

    2020-07-18 11:21:32--失效时间 2020-07-18 11:22:02

    2020-07-18 11:21:32--失效时间 2020-07-18 11:22:02

    2020-07-18 11:21:22--失效时间 2020-07-18 11:22:12

    2020-07-18 11:21:23--失效时间 2020-07-18 11:22:13

    2020-07-18 11:21:23--失效时间 2020-07-18 11:22:13

    发现正常,都是我设置的事件过期后就到处理队列,在这里消费,处理逻辑即可

    参考 https://www.cnblogs.com/Zhangcsc/p/11739754.html

    https://blog.csdn.net/weixin_34310369/article/details/92262465?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-2

    展开全文
  • 什么是延时队列 延时队列:顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特性,相比之下,延时的特性就是它最大的特点。所谓的延时就是将我们需要的消息,延迟多久之后...

    什么是延时队列

    延时队列:顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特性,相比之下,延时的特性就是它最大的特点。所谓的延时就是将我们需要的消息,延迟多久之后被消费。普通队列是即时消费的,延时队列是根据延时时间,多久之后才能消费的。

    在这里插入图片描述

    延时队列使用场景

    • 订单在十分钟之内未支付则自动取消。
    • 会员续费的定时推送
    • 用户注册成功后,如果三天内没有登陆则进行短信提醒。
    • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
    • 优惠券过期提醒

    核心的应用内容基本都是基于需要设定过期时间的

    RabbitMQ如何实现延时队列

    • 方式1、通过RabbitMQ的高级特性TTL和配合死信队列
    • 方式2、安装rabbitmq_delayed_message_exchange插件

    RabbitMQ中的高级特性TTL

    TTL是什么呢?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒,为什么延时队列要介绍它?TTL就是一种消息过期策略。给我们的消息做过期处理,当消息在队列中存活了指定时候之后,改队列就会将这个消息直接丢弃。在RabbitMQ中并没有直接实现好的延时队列,我们可以使用TTL这种高级特性,然后配合死信队列,即可实现延时队列的功能。

    那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:
    方式一:

    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-message-ttl", 6000);
    channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
    

    使用这种方式,消息被设定TTL,一旦消息过期,就会被队列丢弃

    方式二:

    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    builder.expiration("6000");
    AMQP.BasicProperties properties = builder.build();
    channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
    

    使用这种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

    另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

    RabbitMQ到底怎么实现延时队列

    • 步骤一:创建一个正常的队列,指定消息过期时间,并且指定消息过期后需要投递的死信交换器和死信交换队列。
    • 步骤二:创建死信队列和死信交换器

    RabbitMQ实现延时队列实例

    package com.example.demo;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author echo
     * @date 2021-01-14 14:35
     */
    public class TopicDealProductTest {
    
        /**
         * 延时队列交换机
         */
        private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay";
        /**
         * 死信队列交换机
         */
        private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead";
        /**
         * 延时队列
         */
        private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay";
        /**
         * 死信队列
         */
        private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead";
        /**
         * 延时队列ROUTING_KEY
         */
        private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey";
        /**
         * 延时队列ROUTING_KEY
         */
        private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey";
        private static final String IP_ADDRESS = "192.168.230.131";
        private static final int PORT = 5672;
    
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            Connection connection = createConnection();
            // 创建一个频道
            Channel channel = connection.createChannel();
            sendMsg(channel);
            Thread.sleep(10000);
            closeConnection(connection, channel);
        }
    
        private static void sendMsg(Channel channel) throws IOException {
    
            // 创建延时队列和延时交换器
            channel.exchangeDeclare(DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT);
            Map<String, Object> map = new HashMap<>(16);
            // 在延时交换器上指定死信交换器
            map.put("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD);
            // 在延时交换器上指定死信队列的routing-key
            map.put("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY);
            // 设定延时队列的延长时长 10s
            map.put("x-message-ttl", 10000);
            // 创建延时队列
            channel.queueDeclare(DIRECT_QUEUE_DELAY, true, false, false, map);
            // 在延时交换器上绑定延时队列
            channel.queueBind(DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY);
    
            // 创建死信队列和死信交换器
            channel.exchangeDeclare(DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true, false, null);
            // 创建死信队列
            channel.queueDeclare(DIRECT_QUEUE_DEAD, true, false, false, null);
            // 在死信交换器上绑定死信队列
            channel.queueBind(DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY);
    
            channel.basicPublish(DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, null, "hello world".getBytes());
    
        }
    
        private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
            // 关闭资源
            channel.close();
            connection.close();
        }
    
        private static Connection createConnection() throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 设置RabbitMQ的链接参数
            factory.setHost(IP_ADDRESS);
            factory.setPort(PORT);
            factory.setUsername("echo");
            factory.setPassword("123456");
            // 和RabbitMQ建立一个链接
            return factory.newConnection();
        }
    
    }
    

    到这里,其实我们不难发现,我们无非是利用了TTL这个特性,让消息在过期的时候丢弃到指定队列,死信队列其实也是一个普通队列。

    执行之后,我们来看看结果,在Exchange里面,我们创建了两个交换器和两个队列,但是两个队列和交换器还是有区别的,我们来看图片

    在这里插入图片描述

    我们可以看到两个队列的Features标志是不一样的

    • TTL: 消息在队列中的过期时间
    • DLX: 该队列绑定了死信交换器
    • DLK: 该队列绑定的死信队列的ROUTING_KEY

    在我们执行完成只有,我们可以看到,消息先被投递到了delay,该队列里面的消息,到达过期时间之后就被投递到了dead队列中去了。

    那么我们上面介绍了TTL和设置AMQP.BasicProperties,这两种有一定的区别,前一个是设置队列消息过期时间,后一个是设定每条消息的过期时间。那他们的区别在哪里?

    设置每条消息和设置TTL的区别

    其实这两种方式的区别就在于怎么判断该消息是否要被丢弃。TTL设定的队列,只要消息到达过期时间,立马就会将消息丢弃。如果是后者,可能我们队列里面有很多的消息,然后每条消息的过期时间又不一致,这个时候,如果队列出口处堵了很多没有设定过期时间的消息又不被消费的时候,队列后面的消息及时设定了过期时间也不会被丢弃,只有在设定了过期时间的消息到了队列该消费的位置,才会判定

    怎么使用AMQP.BasicProperties?

    package com.example.demo;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author echo
     * @date 2021-01-14 14:35
     */
    public class TopicDealProductTest {
    
        /**
         * 延时队列交换机
         */
        private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay";
        /**
         * 死信队列交换机
         */
        private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead";
        /**
         * 延时队列
         */
        private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay";
        /**
         * 死信队列
         */
        private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead";
        /**
         * 延时队列ROUTING_KEY
         */
        private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey";
        /**
         * 延时队列ROUTING_KEY
         */
        private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey";
        private static final String IP_ADDRESS = "192.168.230.131";
        private static final int PORT = 5672;
    
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            Connection connection = createConnection();
            // 创建一个频道
            Channel channel = connection.createChannel();
            sendMsg(channel);
            Thread.sleep(10000);
            closeConnection(connection, channel);
        }
    
        private static void sendMsg(Channel channel) throws IOException {
    
            // 创建延时队列和延时交换器
            channel.exchangeDeclare(DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT);
            Map<String, Object> map = new HashMap<>(16);
            // 在延时交换器上指定死信交换器
            map.put("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD);
            map.put("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY);
            // 设定延时队列的延长时长 10s
    //        map.put("x-message-ttl", 10000);
            // 创建延时队列
            channel.queueDeclare(DIRECT_QUEUE_DELAY, true, false, false, map);
            // 在延时交换器上绑定延时队列
            channel.queueBind(DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY);
    
            // 创建死信队列和死信交换器
            channel.exchangeDeclare(DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true, false, null);
            // 创建死信队列
            channel.queueDeclare(DIRECT_QUEUE_DEAD, true, false, false, null);
            // 在死信交换器上绑定死信队列
            channel.queueBind(DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY);
    
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            builder.expiration("10000");
            AMQP.BasicProperties properties = builder.build();
            channel.basicPublish(DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, false, properties,  "hello world".getBytes());
    
        }
    
        private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
            // 关闭资源
            channel.close();
            connection.close();
        }
    
        private static Connection createConnection() throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 设置RabbitMQ的链接参数
            factory.setHost(IP_ADDRESS);
            factory.setPort(PORT);
            factory.setUsername("echo");
            factory.setPassword("123456");
            // 和RabbitMQ建立一个链接
            return factory.newConnection();
        }
    
    }
    

    我们运行完成成之后,可以看到和我们之前那一种方式的效果是一样的

    在这里插入图片描述

    两种设定过期时间的方式其实区别就在于一个统一设定了过期时间,一个指定每条过期时间。但是并不影响我们延时队列的实现,那我们怎么选择呢?

    怎么选择TTL设定方式?

    根据两种方式的特性来选定使用场景才是最合理的。我们如果用来做延时队列的,想将延时队列的特性应用到实际场景的,并且对时时性要求比较高的,建议选择第一种方式。

    总结

    延时队列的实现并不难,关键是我们要知道他的一个原理,了解RabbitMQ他的TTL和死信对了。掌握了它的这些特性之后,我们就可以很好的应用延时队列。延时队列在工作中对我们的帮组也非常大,不过RabbiTMQ没有原生延时队列,我们用这种方式实现了它并不意味着我们一定要选择它。其实还有很多的方式,比如Java中的DelayQueu、kafka的时间轮等。

    展开全文
  • 参考书籍:《redis深度历险:核心原理与应用实践》这节延时队列就是乘着上节的锁冲突处理的延时队列来的。但是在此之前我要讲一下我们如何编辑python文件,因为总是写在命令行不能保存代码,也不能修改。我们可以先...
  • Redis实现延时队列有两种实现方式: key失效监听回调 zset分数存时间戳 三、方案选择 key失效监听存在两个问题: Redis的pubsub不会被持久化,服务器宕机就会被丢弃 没有高级特性,没有ack机制,可靠性不高 zset...
  • redis延时队列.zip

    2021-01-22 11:30:59
    redis延时队列
  • 任务延时队列

    2020-03-17 16:45:14
    文章目录1 任务延时队列1.1 现有问题1.2 延时队列设计1.3 设计实现 1 任务延时队列 1.1 现有问题 现有问题: 每次时钟节拍中断都需要扫描所有任务,比较耗时。 不易支持多个任务具有相同优先级。 我们需要更加快速...
  • 1. 什么是延时队列? 2. 如何实现一个高效的延时队列? 3. DelayQueue的实现原理 4. RabbitMQ实现延时队列的基本原理 5. Redis实现延时队列的基本原理 6. 时间轮(Time Wheel) 7. 几种方案的对比

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,587
精华内容 1,434
关键字:

延时队列