精华内容
下载资源
问答
  • DelayQueue延迟队列

    2021-02-23 17:18:14
    * 延迟队列 时间越长最迟从队列中输出 */ public class DelayQueueController implements Delayed { //名称 private String name; //时长 private long time; public DelayQueueController(String name, long...

    特征:
    DelayQueue延迟队列,基于优先级队列来实现
    存储元素必须实现Delayed接口(Delayed接口继承了Comparable接口

    分析:
    由于是基于优先级队列实现,但是它比较的是时间,我们可以根据需要去倒叙或者正序排列(一般都是倒叙,用于倒计时)

    使用场景:
    订单超时取消功能
    用户下订单未支付开始倒计时,超时则释放订单中的资源,如果取消或者完成支付,我们再讲队列中的数据移除掉。

    /**
     * 延迟队列  时间越长最迟从队列中输出
     */
    public class DelayQueueController implements Delayed {
        //名称
        private String name;
        //时长
        private long time;
        public DelayQueueController(String name, long time, TimeUnit unit) {
            this.name = name;
            this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
        }
        //时间
        @Override
        public long getDelay(TimeUnit unit) {
            return time - System.currentTimeMillis();
        }
        @Override
        public int compareTo(Delayed o) {
            DelayQueueController work = (DelayQueueController) o;
            long diff = this.time - work.time;
            if (diff <= 0) {// 改成>=会造成问题
                return -1;
            } else {
                return 1;
            }
        }
    
        public static void main(String[] args) {
            BlockingQueue<DelayQueueController> queue3 = new DelayQueue<>();
            try {
                DelayQueueController work = new DelayQueueController("用户一", 25, TimeUnit.SECONDS);
                DelayQueueController work2 = new DelayQueueController("用户二", 5, TimeUnit.SECONDS);
                DelayQueueController work3 = new DelayQueueController("用户三", 15, TimeUnit.SECONDS);
                queue3.add(work);
                queue3.add(work2);
                queue3.add(work3);
                for (; ; ) {
                    DelayQueueController work1 = queue3.take();
                    System.out.println(work1.name + "," + work1.time);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    输出

        用户二,1614071429602
        用户三,1614071439602
        用户一,1614071449602
    

    其他队列参考

    展开全文
  • DelayQueue延迟队列的一个例子,就是网吧中,按上网人交钱的多少给多少上网时间,给钱越多的越迟出队列。 网吧类: import java.util.concurrent.DelayQueue; public class WangBa implements Runnable { ...

    DelayQueue延迟队列的一个例子,就是网吧中,按上网人交钱的多少给多少上网时间,给钱越多的越迟出队列。

    网吧类:

    import java.util.concurrent.DelayQueue;
    
    public class WangBa implements Runnable {  
        
        private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();  
        
        public boolean yinye =true;  
          
        public void shangji(String name,String id,int money){  
            Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());  
            System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机...");  
            this.queue.add(man);  
        }  
          
        public void xiaji(Wangmin man){  
            System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机...");  
        }  
      
        @Override  
        public void run() {  
            while(yinye){  
                try {  
                    Wangmin man = queue.take();  
                    xiaji(man);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
          
        public static void main(String args[]){  
            try{  
                System.out.println("网吧开始营业");  
                WangBa siyu = new WangBa();  
                Thread shangwang = new Thread(siyu);  
                shangwang.start();  
                  
                siyu.shangji("路人甲", "123", 1);  
                siyu.shangji("路人乙", "234", 10);  
                siyu.shangji("路人丙", "345", 5);  
            }  
            catch(Exception e){  
                e.printStackTrace();
            }  
      
        }  
    }  

     

    网民类:
     

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class Wangmin implements Delayed {  
        
        private String name;  
        //身份证  
        private String id;  
        //截止时间  
        private long endTime;  
        //定义时间工具类
        private TimeUnit timeUnit = TimeUnit.SECONDS;
          
        public Wangmin(String name,String id,long endTime){  
            this.name=name;  
            this.id=id;  
            this.endTime = endTime;  
        }  
          
        public String getName(){  
            return this.name;  
        }  
          
        public String getId(){  
            return this.id;  
        }  
          
        /** 
         * 用来判断是否到了截止时间 
         */  
        @Override  
        public long getDelay(TimeUnit unit) { 
            //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        	return endTime - System.currentTimeMillis();
        }  
      
        /** 
         * 相互批较排序用 
         */  
        @Override  
        public int compareTo(Delayed delayed) {  
        	Wangmin w = (Wangmin)delayed;  
            return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;  
        }  
      
    
        
    }  

     

    展开全文
  • 基于Redis实现DelayQueue延迟队列设计方案

    万次阅读 多人点赞 2019-08-08 15:50:29
    应用场景 创建订单10分钟之后自动支付 叫预约单专车出行前30分钟发送短信提示 订单超时取消 ...使用RabbitMq 实现 RabbitMq实现延迟队列 优点: 开源,现成的稳定的实现方案; 缺点: RabbitMq是一个...

    应用场景


    • 创建订单10分钟之后自动支付
    • 订单超时取消
    • …等等…

    实现方式


    • 最简单的方式,定时扫表;例如每分钟扫表一次十分钟之后未支付的订单进行主动支付 ;
      优点: 简单
      缺点: 每分钟全局扫表,浪费资源,有一分钟延迟

    • 使用RabbitMq 实现 RabbitMq实现延迟队列
      优点: 开源,现成的稳定的实现方案;
      缺点: RabbitMq是一个消息中间件;延迟队列只是其中一个小功能,如果团队技术栈中本来就是使用RabbitMq那还好,如果不是,那为了使用延迟队列而去部署一套RabbitMq成本有点大;

    • 使用Java中的延迟队列,DelayQueue
      优点: java.util.concurrent包下一个延迟队列,简单易用;拿来即用
      缺点: 单机、不能持久化、宕机任务丢失等等;

    基于Redis自研延迟队列


    既然上面没有很好的解决方案,因为Redis的zset、list的特性,我们可以利用Redis来实现一个延迟队列 RedisDelayQueue

    设计目标

    • 实时性: 允许存在一定时间内的秒级误差
    • 高可用性:支持单机,支持集群
    • 支持消息删除:业务费随时删除指定消息
    • 消息可靠性: 保证至少被消费一次
    • 消息持久化: 基于Redis自身的持久化特性,上面的消息可靠性基于Redis的持久化,所以如果redis数据丢失,意味着延迟消息的丢失,不过可以做主备和集群保证;

    数据结构

    • Redis_Delay_Table: 是一个Hash_Table结构;里面存储了所有的延迟队列的信息;KV结构;K=TOPIC:ID V=CONENT; V由客户端传入的数据,消费的时候回传;
    • RD_ZSET_BUCKET: 延迟队列的有序集合; 存放member=TOPIC:ID 和score=执行时间戳; 根据时间戳排序;
    • RD_LIST_TOPIC: list结构; 每个Topic一个list;list存放的都是当前需要被消费的延迟Job;

    设计图
    设计图

    任务的生命周期

    1. 新增一个Job,会在Redis_Delay_Table中插入一条数据,记录了业务消费方的 数据结构; RD_ZSET_BUCKET 也会插入一条数据,记录了执行时间戳;
    2. 搬运线程会去RD_ZSET_BUCKET中查找哪些执行时间戳runTimeMillis比现在的时间小;将这些记录全部删除;同时会解析出来每个任务的Topic是什么,然后将这些任务pushTopic对应的列表RD_LIST_TOPIC中;
    3. 每个Topic的List都会有一个监听线程去批量获取List中的待消费数据;获取到的数据全部扔给这个Topic的消费线程池
    4. 消息线程池执行会去Redis_Delay_Table查找数据结构,返回给回调接口,执行回调方法;

    以上所有操作,都是基于Lua脚本做的操作,Lua脚本执行的优点在于,批量命令执行具有原子性,事务性, 并且降低了网络开销,毕竟只有一次网络开销;


    搬运线程操作流程图

    在这里插入图片描述

    设计细节


    搬运操作

    1.搬运操作的时机

    为了避免频繁的执行搬运操作, 我们基于 wait(time)/notify 的方式来通知执行搬运操作;
    在这里插入图片描述
    我们用一个AtomicLong nextTime 来保存下一次将要搬运的时间;服务启动的时候nextTime=0;所以肯定比当前时间小,那么就会先去执行一次搬运操作,然后返回搬运操作之后的ZSET的表头时间戳,这个时间戳就是下一次将要执行的时间戳, 把这个时间戳赋值给 nextTime; 如果表中没有元素了则将nextTime=Long.MaxValue ;因为while循环,下一次又会跟当前时间对比;如果nextTime比当前时间大,则说明需要等待; 那么我们wait(nextTime-System.currentTimeMills()); 等到时间到了之后,再次去判断一下,就会比当前时间小,就会执行一次搬运操作;

    那么当有新增延迟任务Job的时间怎么办,这个时候又会将当前新增Job的执行时间戳跟nextTime做个对比;如果小的话就重新赋值;
    重新赋值之后,还是调用一下 notifyAll() 通知一下搬运线程;让他重新去判断一下 新的时间是否比当前时间小;如果还是大的话,那么就继续wait(nextTime-System.currentTimeMills()); 但是这个时候wait的时间又会变小;更精准;

    2.一次搬运操作的最大数量
    redis的执行速度非常快,在一个Lua里面循环遍历1000个10000个根本没差; 而且是在Lua里面操作,就只有一次网络开销;一次操作多少个元素根本就不会是问题;


    搬运操作的防护机制

    1.每分钟唤醒定时线程

    在消费方多实例部署的情况下, 如果某一台机器挂掉了,但是这台机器的nextTime是最小的,就在一分钟之后( 新增job的时候落到这台机器,刚好时间戳很小), 其他机器可能是1个小时之后执行搬运操作; 如果这台机器立马重启,那么还会立马执行一次搬运操作;万一他没有重启;那可能就会很久之后才会搬运;
    所以我们需要一种防护手段来应对这种极端情况;
    比如每分钟将nextTime=0;并且唤醒wait;
    那么就会至少每分钟会执行一次搬运操作! 这是可以接受的


    LrangeAndLTrim 批量获取且删除待消费任务

    1.执行时机以及如何防止频繁请求redis
    这是一个守护线程,循环去做这样的操作,把拿到的数据给线程池去消费;
    但是也不能一直不停的去执行操作,如果list已经没有数据了去操作也没有任何意义,不然就太浪费资源了,幸好List中有一个BLPOP阻塞原语,如果list中有数据就会立马返回,如果没有数据就会一直阻塞在那里,直到有数据返回,可以设置阻塞的超时时间,超时会返回NULL;
    第一次去获取N个待消费的任务扔进到消费线程池中;如果获取到了0个,那么我们就立马用BLPOP来阻塞,等有元素的时候 BLPOP就返回数据了,下次就可以尝试去LrangeAndLTrim一次了. 通过BLPOP阻塞,我们避免了频繁的去请求redis,并且更重要的是提高了实时性;

    2.批量获取的数量和消费线程池的阻塞队列

    执行上面的一次获取N个元素是不定的,这个要看线程池的maxPoolSize 最大线程数量; 因为避免消费的任务过多而放入线程池的阻塞队列, 放入阻塞队列有宕机丢失任务的风险,关机重启的时候还要讲阻塞队列中的任务重新放入List中增加了复杂性;

    所以我们每次LrangeAndLTrim获取的元素不能大于当前线程池可用的线程数; 这样的一个控制可用用信号量Semaphore来做


    Codis集群对BLPOP的影响

    如果redis集群用了codis方案或者Twemproxy方案; 他们不支持BLPOP的命令;
    codis不支持的命令集合
    那么就不能利用BLPOP来防止频繁请求redis;那么退而求其次改成每秒执行一次LrangeAndLTrim操作;


    集群对Lua的影响

    Lua脚本的执行只能在单机器上, 集群的环境下如果想要执行Lua脚本不出错,那么Lua脚本中的所有key必须落在同一台机器;
    为了支持集群操作Lua,我们利用hashtag; 用{}把三个jey的关键词包起来;
    {projectName}:Redis_Delay_Table
    {projectName}:Redis_Delay_Table
    {projectName}:RD_LIST_TOPIC
    那么所有的数据就会在同一台机器上了


    重试机制

    消费者回调接口如果抛出异常了,或者执行超时了,那么会将这个Job重新放入到RD_LIST_TOPIC中等待被下一次消费;默认重试2次;可以设置不重试;

    超时机制

    超时机制的主要思路都一样,就是监听一个线程的执行时间超过设定值之后抛出异常打断方法的执行;

    这是使用的方式是 利用Callable接口实现异步超时处理

    public class TimeoutUtil {
    
        /**执行用户回调接口的 线程池;    计算回调接口的超时时间           **/
        private static ExecutorService executorService = Executors.newCachedThreadPool();
    
        /**
         * 有超时时间的方法
         * @param timeout 时间秒
         * @return
         */
        public static void timeoutMethod(long timeout, Function function) throws InterruptedException, ExecutionException, TimeoutException {
            FutureTask futureTask = new FutureTask(()->(function.apply("")));
            executorService.execute(futureTask);
            //new Thread(futureTask).start();
            try {
                futureTask.get(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                //e.printStackTrace();
                futureTask.cancel(true);
                throw e;
            }
    
        }
    }
    
    

    这种方式有一点不好就是太费线程了,相当于线程使用翻了一倍;但是相比其他的方式,这种算是更好一点的

    优雅停机

    在Jvm那里注册一个 Runtime.getRuntime().addShutdownHook(Runnable)停机回调接口;在这里面做好善后工作;

    • 关闭异步AddJob线程池
    • 关闭每分钟唤醒线程
    • 关闭搬运线程 while(!stop)的形式
    • 关闭所有的topic监听线程 while(!stop)的形式
    • 关闭关闭所有topic的消费线程 ;先调用shutdown;再executor.awaitTermination(20, TimeUnit.SECONDS);检查是否还有剩余的线程任务没有执行完; 如果还没有执行完则等待执行完;最多等待20秒之后强制调用shutdownNow强制关闭;
    • 关闭重试线程 while(!stop)的形式
    • 关闭 异常未消费Job重入List线程池

    优雅停止线程一般是用下面的方式
    ①、 while(!stop)的形式 用标识位来停止线程
    ②.先 调用executor.shutdown(); 阻止接受新的任务;然后等待当前正在执行的任务执行完; 如果有阻塞则需要调用executor.shutdownNow()强制结束;所以要给一个等待时间;

      /**
         * shutdownNow 终止线程的方法是通过调用Thread.interrupt()方法来实现的
         * 如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。
         * 上面的情况中断之后还是可以再执行finally里面的方法的;
         * 但是如果是其他的情况 finally是不会被执行的
         * @param executor
         */
        public static void closeExecutor(ExecutorService executor, String executorName) {
            try {
                //新的任务不进队列
                executor.shutdown();
                //给10秒钟没有停止完强行停止;
                if(!executor.awaitTermination(20, TimeUnit.SECONDS)) {
                    logger.warn("线程池: {},{}没有在20秒内关闭,则进行强制关闭",executorName,executor);
                    List<Runnable> droppedTasks = executor.shutdownNow();
                    logger.warn("线程池: {},{} 被强行关闭,阻塞队列中将有{}个将不会被执行.", executorName,executor,droppedTasks.size() );
                }
                logger.info("线程池:{},{} 已经关闭...",executorName,executor);
            }  catch (InterruptedException e) {
                logger.info("线程池:{},{} 打断...",executorName,executor);
            }
        }
    

    BLPOP阻塞的情况如何优雅停止监听redis的线程

    如果不是在codis集群的环境下,BLPOP是可以很方便的阻塞线程的;但是停机的时候可能会有点问题;

    假如正在关机,当前线程正在BLPOP阻塞, 那关机线程等我们20秒执行, 刚好在倒数1秒的时候BLPOP获取到了数据,丢给消费线程去消费;如果消费线程1秒执行不完,那么20秒倒计时到了,强制关机,那么这个任务就会被丢失了; 怎么解决这个问题呢?

    ①. 不用BLPOP, 每次都sleep一秒去调用LrangeAndLTrim操作;
    ②.关机的时候杀掉 redis的blpop客户端; 杀掉之后 BLPOP会立马返回null; 进入下一个循环体;


    不足

    • 因为Redis的持久化特性,做不到消息完全不丢失,如果要保证完成不丢失,Redis的持久化刷盘策略要收紧
    • 因为Codis不能使用BLPOP这种阻塞的形式,在获取消费任务的时候用了每秒一次去获取,有点浪费性能;
    • 支持消费者多实例部署,但是可能存在不能均匀的分配到每台机器上去消费;
    • 虽然支持redis集群,但是其实是伪集群,因为Lua脚本的原因,让他们都只能落在一台机器上;

    总结

    1. 实时性
      正常情况下 消费的时间误差不超过1秒钟; 极端情况下,一台实例宕机,另外的实例nextTime很迟; 那么最大误差是1分钟; 真正的误差来自于业务方的接口的消费速度

    2. QPS
      完全视业务方的消费速度而定; 延迟队列不是瓶颈

    具体实现细节可以查看公众号;公众号中有实现细节的文章

    进击的老码农
    展开全文
  • 首先这是一个操作频繁的自动化定时功能,对比于定时器有着更大的使用空间和性能优化,无论是前端的setTimeout与setInterval 定时器还是...下面我使用的是DelayQueue延迟队列和Redis的缓存来实现: 1:  (1) 这里...

    首先这是一个操作频繁的自动化定时功能,对比于定时器有着更大的使用空间和性能优化,无论是前端的setTimeout与setInterval 定时器还是后端的TimerTask定时器,在面对短期内的频繁操作都会有着性能和多线程之间的问题,所以这时的队列就起到很重要的作用了,尤其是在于一些消息的推送。下面我使用的是DelayQueue延迟队列和Redis的缓存来实现:

    1:

          (1) 这里我使用的是maven来管理库,所以第一步我们是导入所要实现功能的jar包,DelayQueue包由于jdk的Util包来提供以及我们所需要的Redis的pom依赖,这里我使用的是StringRedisTemplate来操作redis。

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>

          (2)所有功能都是从底层开始,这里底层已经由队列和缓存封装好了,我们只需要编写业务层和控制层(Rest),需要三个业务层来进行处理所有的逻辑,首先是队列和缓存的业务层,这两个主要实现的是对订单存储和获取以及删除。

            DelayServer:队列的业务层,主要用来获取添加到队列中的元素以及对队列的增删。在其中主要有三个变量,启动队列的state,内部接口listener以及队列集合delatQueue。

    @Slf4j
    @Service
    @Getter
    @Setter
    public class DelayService {
        private boolean start ;
        private OnDelayedListener listener;
        private DelayQueue<DshOrder> delayQueue = new DelayQueue<DshOrder>();
    
        public static interface OnDelayedListener{
            public void onDelayedArrived(DshOrder order);
        }
    
        public void start(OnDelayedListener listener){
            if(start){
                return;
            }
            log.error("DelayService 启动");
            start = true;
            this.listener = listener;
            new Thread(new Runnable(){
                public void run(){
                    try{
                        while(true){
                            DshOrder order = delayQueue.take();
                            if(DelayService.this.listener != null){
                                DelayService.this.listener.onDelayedArrived(order);
                            }
                        }
                    }catch(Exception e){
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    
        public void add(DshOrder order){
            delayQueue.put(order);
        }
    
        public void remove(String orderId){
            DshOrder[] array = delayQueue.toArray(new DshOrder[]{});
            if(array == null || array.length <= 0){
                return;
            }
            DshOrder target = null;
            for(DshOrder order : array){
                if(order.getOrderId() == orderId){
                    target = order;
                    break;
                }
            }
            if(target != null){
                delayQueue.remove(target);
            }
        }
    
    }

      

    DshOrder:订单队列中的对象,这里用来存储订单的主键以及根据你业务逻辑需要多久取消时间的规格来,我这里使用的是秒。
    
    @Setter
    @Getter
    @ApiModel(description = "订单队列对象")
    public class DshOrder implements Delayed {
        @ApiModelProperty(value = "订单id")
        private String orderId;
        @ApiModelProperty(value = "超时时间")
        private long startTime;
        /**
         * orderId:订单id
         * timeout:自动取消订单的超时时间,秒
         * */
        public DshOrder(String orderId, int timeout){
            this.orderId = orderId;
            this.startTime = System.currentTimeMillis() + timeout*1000L;
        }
        @Override
        public int compareTo(Delayed other) {
            if (other == this){
                return 0;
            }
            if(other instanceof DshOrder){
                DshOrder otherRequest = (DshOrder)other;
                long otherStartTime = otherRequest.getStartTime();
                return (int)(this.startTime - otherStartTime);
            }
            return 0;
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
    }

     

    OrderRedisService:Redis业务层,主要用来将订单存入缓存和便利缓存对象到队列中。
    public interface OrderRedisService {
        /**
         * 订单对象加入缓存
         *
         * @param orderId     订单id
         * @param orderObject 订单对象
         */
        void saveOrder(String orderId, OrderObject orderObject);
    
        /**
         * 获得缓存订单对象
         *
         * @param orderId 订单id
         * @return
         */
        String getOrder(String orderId);
    
        /**
         * 删除缓存订单对象
         *
         * @param orderId 订单id
         */
        void deleteOrder(String orderId);
    
        /**
         * 查询所有需要缓存的订单对象
         *
         * @return
         */
        Set<String> sacn();
    
        /**
         * 获得redis键的剩余时间
         *
         * @param key redis键
         * @return 剩余时间
         */
        Long getSurplusTime(String key);
    
    }

         OrderRedisServiceImpl:Redis业务层实现类,提供订单在缓存中的所有实现。首先保存订单时需要将订单存入缓存中并设置过期时间(这是判断订单是否超时的关键),然后通过id来获取订单在缓存中是否存在,以及删除订单在缓存中存在的主键,其次是一个遍历缓存中所有关于订单的主键方法,这是一个match匹配,前提是在存入订单主键时给定特定的格式才能匹配到所有存在的主键(切记match内的匹配字母,这是你订单主键添加入缓存的开头)。最后,由于缓存存在脏数据或者服务挂了的情况使的当订单主键在Redis中过期但是并没有删除(Redis的主键过期删除存在自动删除和手动删除,所以容易产生脏数据),Redis中提供了TTL命令中的获取主键剩余过期时间的方法,也就是getExpire。

    @Slf4j
    @Service
    public class OrderRedisServiceImpl implements OrderRedisService {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        /**
         * 保存订单并设置过期时间
         * @param outTradeId
         * @param redisDo
         */
        @Override
        public void saveOrder(String outTradeId, OrderObject redisDo) {
            String key = outTradeId;
            //key过期时间为30分钟
            redisTemplate.opsForValue().set(key, JsonUtils.obj2Json(redisDo), 600, TimeUnit.SECONDS);
        }
    
        /**
         * 获取订单
         * @param outTradeNo
         * @return
         */
        @Override
        public String getOrder(String outTradeNo) {
            String key = outTradeNo;
            String message = redisTemplate.opsForValue().get(key);
            if(message != null){
                return key;
            }
            return "";
        }
    
        /**
         * 删除订单
         * @param outTradeNo
         */
        @Override
        public void deleteOrder(String outTradeNo) {
            String key = outTradeNo;
            redisTemplate.delete(key);
        }
    
        /**
         * 获取订单中所有的key
         * @return
         */
        @Override
        public Set<String> sacn(){
            Set<String> execute = redisTemplate.execute(new RedisCallback<Set<String>>() {
                @Override
                public Set<String> doInRedis(RedisConnection connection) throws DataAccessException {
                    Set<String> binaryKeys = new HashSet<>();
                    Cursor<byte[]> cursor = connection.scan( new ScanOptions.ScanOptionsBuilder().match("order*").count(100).build());
                    while (cursor.hasNext()) {
                        binaryKeys.add(new String(cursor.next()));
                    }
                    return binaryKeys;
                }
            });
            return execute;
        }
    
        @Override
        public Long getSurplusTime(String key) {
            return redisTemplate.getExpire(key,TimeUnit.SECONDS);
        }
    }

     (3) 队列和缓存的业务层已经写好了,但是考虑到系统队列可能会在系统奔溃后删除了本地的数据,使得服务重启后数据消失,下面我用了监听来在系统启动时,将Redis中的订单加入到队列中,但这些的前提都是在Redis的数据没有被一并清除。

    @Slf4j
    @Service
    public class StartupListener implements ApplicationListener<ContextRefreshedEvent> {
    
        @Autowired
        DelayService delayService;
        @Autowired
        OrderRedisService redisService;
        @Autowired
        StringRedisTemplate stringRedisTemplate;
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent evt) {
            log.error(">>>>>>>>>>>>系统启动完成,onApplicationEvent()");
            if (evt.getApplicationContext().getParent() == null) {
                return;
            }
            //自动取消订单
            delayService.start(new DelayService.OnDelayedListener(){
                @Override
                public void onDelayedArrived(final DshOrder order) {
                    //异步来做
                    ThreadPoolUtils.execute(new Runnable(){
                        public void run(){
                            String orderId = order.getOrderId();
                            //查库判断是否需要自动取消订单
                            int surpsTime = redisService.getSurplusTime(orderId).intValue();
                            log.error("redis键:" + orderId + ";剩余过期时间:"+surpsTime);
                            if(surpsTime > 0){
                                log.error("没有需要取消的订单!");
                            }else{
                                log.error("自动取消订单,删除队列:"+orderId);
                                //从队列中删除
                                delayService.remove(orderId);
                                //从redis删除
                                redisService.deleteOrder(orderId);
                                log.error("自动取消订单,删除redis:"+orderId);
                                //todo 对订单进行取消订单操作
                            }
                        }
                    });
                }
            });
            //查找需要入队的订单
            ThreadPoolUtils.execute(new Runnable(){
                public void run() {
                    log.error("查找需要入队的订单");
                    Set<String> keys = redisService.sacn();
                    if(keys == null || keys.size() <= 0){
                        return;
                    }
                    log.error("需要入队的订单keys:"+keys);
                    log.error("写到DelayQueue");
                    for(String key : keys){
                        String orderKey = redisService.getOrder(key);
                        int surpsTime = redisService.getSurplusTime(key).intValue();
                        log.error("读redis,key:"+key);
                        log.error("redis键:" + key + ";剩余过期时间:"+surpsTime);
                        if(orderKey != null){
                            DshOrder dshOrder = new DshOrder(orderKey,surpsTime);
                            delayService.add(dshOrder);
                            log.error("订单自动入队:"+dshOrder);
                        }
                    }
                }
            });
        }
    }

     (4) 接下来便是对订单的业务层进行最后的操作,也就是在你生成订单的时候给订单加入到队列和缓存中去并设置过期时间,这里我使用的线程池来进行操作。

    public class ThreadPoolUtils {
    
        private final ExecutorService executor;
    
        private static ThreadPoolUtils instance = new ThreadPoolUtils();
    
        private ThreadPoolUtils() {
            this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
        }
    
        public static ThreadPoolUtils getInstance() {
            return instance;
        }
    
        public static <T> Future<T> execute(final Callable<T> runnable) {
            return getInstance().executor.submit(runnable);
        }
    
        public static Future<?> execute(final Runnable runnable) {
            return getInstance().executor.submit(runnable);
        }
    
    
    }

    最后便是将订单加入到缓存和队列。

    //把订单插入到待取消的队列和redis
            ThreadPoolUtils.execute(new Runnable() {
                @Override
                public void run() {
                    String itrOrderId = "order" + orderId;
                    //1 插入到待收货队列
                    DshOrder dshOrder = new DshOrder(itrOrderId, 600);
                    delayService.add(dshOrder);
                    log.error("订单order" + orderId + "入队列");
                    //2插入到redis
                    orderRedisService.saveOrder(itrOrderId, orderObject);
                    log.error("订单order" + orderId + "入redis缓存");
                }
            });

    还有一点便是在你设置完后,但不需要自动去实现,要去清理缓存和队列中的数据

    String delOrderId = "order" + orderId;
            int surpsTime = orderRedisService.getSurplusTime(delOrderId).intValue();
            log.error("redis键:" + delOrderId + ";剩余过期时间:"+surpsTime);
            if (surpsTime <= 0) {
                delayService.remove(delOrderId);
                log.error("订单手动出队:" + delOrderId);
                orderRedisService.deleteOrder(delOrderId);
                log.error("订单手动出redis:" + delOrderId);
            }

     

     

    总结:其实在消息这方面有着很多中间件,例如rabbitMQ,activitiMQ,kafka

    RabbitMQ,遵循AMQP协议,由内在高并发的erlanng语言开发,用在实时的对可靠性要求比较高的消息传递上。

    kafka是Linkedin于2010年12月份开源的消息发布订阅系统,它主要用于处理活跃的流式数据,大数据量的数据处理上,但大多数用于的是日志。

    这让我想到springcloud和dubbo,前者是spring的产物,后者是阿里巴巴的产物。都在微服务中起到决定性的作用。

    所以在对实现功能时的选择很重要,如果你的系统所处理的数据量不是很大,我觉得队列和缓存很适合你,这样你可以对消息的传递更加了解,但你使用MQ,kafka的中间件时,你会发现使用起来更加轻松,但对于数据量大的系统来说,中间件是最好的选择,在这个大数据的时代,高并发,多线程,分布式会越来越重要,也是你技术上升的一个很大转折点。

    展开全文
  • DelayQueue延迟队列理解: 1、DelayQueue队列中的元素必须是Delayed接口的实现类,该类内部实现了getDelay()和compareTo()方法,第一个方法是比较两个任务的延迟时间进行排序,第二个方法用来获取延迟时间。 2、...
  • delayqueue 延迟队列 java延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的...
  • 而DelayedWorkQueue就是一种延迟队列,今天学习是并发包提供的延迟队列DelayQueue)。 ​ 延迟队列说明 延迟队列提供的功能是在指定时间点才能获取队列元素的功能,队列最前面的元素是最优先执行的元素。 列举一下...
  • 1. 延迟队列 DelayQueue 它是包含Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll ...
  • 而DelayedWorkQueue就是一种延迟队列,今天学习是并发包提供的延迟队列(DelayQueue)。​延迟队列说明延迟队列提供的功能是在指定时间点才能获取队列元素的功能,队列最前面的元素是最优先执行的元素。列举一下使用...
  • 所谓的延迟队列最大的特征是它可以自动通过队列进行脱离,例如:现在有一些... DelayQueue延迟队列主要的使用类,所谓的延迟队列=BlockingQueue + PriorityQueue + Delayed。【 延迟队列的基本使用 】 下面编写...
  • 延迟元素的无限制BlockingQueu,其中元素只能在其延迟到期后才能获取。当元素的getDelay(TimeUnit.NANOSECONDS)方法返回小于或等于零的值时,就会发生过期。即使未到期的元素无法使用take或poll删除,它们也被视为...
  • 而DelayedWorkQueue就是一种延迟队列,今天学习是并发包提供的延迟队列(DelayQueue)。延迟队列说明延迟队列提供的功能是在指定时间点才能获取队列元素的功能,队列最前面的元素是最优先执行的元素。列举一下使用场景...
  • 而DelayedWorkQueue就是一种延迟队列,今天学习是并发包提供的延迟队列DelayQueue)。延迟队列说明延迟队列提供的功能是在指定时间点才能获取队列元素的功能,队列最前面的元素是最优先执行的元素。列举一下使用...
  • package ... import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.DelayQueue; import java.util.concurrent.D...
  • 并发编程juc包学习5 延时队列学习 延时队列,首先,它是一种队列,...其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中...
  • 对外提供加入消息到延迟队列方法 addToOrderDelayQueue ,当下单的时候加入延迟消息队列。 对外提供删除延迟消息方法 removeToOrderDelayQueue ,当用户主动取消订单,或者支付成功后使用。 public class ...
  • DelayQueue延迟阻塞队列

    2018-04-06 23:21:55
     带有延迟时间的Queue,其中的元素只有当其指定了延迟时间到了,才能够从队列中获取元素。DelayQueue中的元素必须实现Delay接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除...
  • 实现方式有多种,包括使用rabbitMQ死信队列处理,jdk的DelayQueue延迟队列,redission的延迟队列。 本次介绍DelayQueue队列的使用及其实现原理。 过程 使用DelayQueue延迟队列,需要加入队列对象实现Delayed接口,...
  • DelayQueue实现延迟队列

    2020-09-17 15:20:43
    public class Q { public static void main(String[] args) throws Exception { DelayQueue<Order> orders = new DelayQueue<>(); Order order1 = new Order(1000, "1x"); O.

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 471
精华内容 188
关键字:

delayqueue延迟队列