精华内容
下载资源
问答
  • DelayQueue

    2020-11-07 23:53:11
    缓存系统的设计:使用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,就表示有缓存到期了。 定时任务调度:使用DelayQueue保存当天要执行的任

    概述

    DelayQueue是一个支持时延获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
    DelayQueue可以运用在以下两个应用场景:

    1. 缓存系统的设计:使用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,就表示有缓存到期了。
    2. 定时任务调度:使用DelayQueue保存当天要执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如Tiner就是使用DelayQueue实现的。

    用法实例

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author admin
     */
    public class Message implements Delayed {
        /**
         *触发时间
         */
        private long time;
    
        /**
         *名称
         */
        String name;
        public Message(String name,long time,TimeUnit unit){
            this.name = name;
            this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
        }
        @Override
        public long getDelay(TimeUnit unit) {
            return time - System.currentTimeMillis();
        }
    
        @Override
        public int compareTo(Delayed o) {
            Message item = (Message) o;
            long diff = this.time - item.time;
            if (diff <= 0){
                return  -1;
            }else{
             return 1;
            }
        }
    
        @Override
        public String toString() {
            return DelayQueueDemo.printDate() + "Message{" + "time=" + time + ", name=" + name + "/" + "}";
        }
    }
    
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author admin
     */
    public class DelayQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            Message item1 = new Message("消息1",5, TimeUnit.SECONDS);
            Message item2 = new Message("消息2",10, TimeUnit.SECONDS);
            Message item3 = new Message("消息3",15, TimeUnit.SECONDS);
    
            DelayQueue<Message> queue  = new DelayQueue<Message>();
            queue.add(item1);
            queue.add(item2);
            queue.add(item3);
    
            int queueLengh = queue.size();
            System.out.println(printDate() + "开始!");
            for (int i = 0; i < queueLengh; i++) {
                Message take = queue.take();
                System.out.format(printDate() + " 消息出队,属性name=%s%n",take.name);
            }
    
            System.out.println(printDate() + "结束!");
        }
        static String printDate(){
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return sdf.format(new Date());
        }
    }
    
    

    DelayQueue声明

    DelayQueue声明如下:

    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E>
    

    从DelayQueue声明可以看出,DelayQueue中的元素必须是Delayed接口的子类。

    Delayed声明如下:

    public interface Delayed extends Comparable<Delayed> {
    	/**
    	*以给定的时间单位返回与此对象关联的剩余延迟
    	*/
        long getDelay(TimeUnit unit);
    }
    

    DelayQueue属性

    /**
    *可重入锁
    */
    private final transient ReentrantLock lock = new ReentrantLock();
    /**
    *缓存元素的优先级队列
    */
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    /**
    *特定的用于等待队列头中元素的线程
    *Leader-Follower模式的变体形式
    *用于最小化不必要的定时等待
    */
    private Thread leader = null;
    /**
    *当更新的元素在队列的开头变得可用时
    *或在新线程可能需要成为领导者时,会发出条件信号
    */
    private final Condition available = lock.newCondition();
    

    以上可以看出,延时队列主要使用优先级队列来实现,并辅以重入锁和条件来控制并发安全。

    DelayQueue构造器

    /**
    *默认构造器
    */
    public DelayQueue() {}
    /**
    *添加集合c中所有元素到队列中
    */
    public DelayQueue(Collection<? extends E> c) {
    	this.addAll(c);
    }
    

    DelayQueue入队

    /*
    *将指定元素插入此延时队列
    */
    public boolean add(E e) {
    	return offer(e);
    }
    /*
    *将指定元素插入此延时队列
    *由于队列是无界的,因此该方法将永远不会被阻塞
    */
    public void put(E e) {
    	offer(e);
    }
    /*
    *将指定元素插入此延时队列
    *由于队列是无界的,因此该方法将永远不会被阻塞
    */
    public boolean offer(E e, long timeout, TimeUnit unit) {
    	return offer(e);
    }
    

    以上几个方法都会调用offer()方法。

        public boolean offer(E e) {
        	//获取可重入锁
            final ReentrantLock lock = this.lock;
            //可重入锁加锁
            lock.lock();
            try {
            	//调用优先级队列的offer()方法入队
                q.offer(e);
                //如果入队元素在队首,则唤醒一个出队线程
                if (q.peek() == e) {
                    leader = null;
                    available.signal();
                }
                //返回入队成功
                return true;
            } finally {
            	//解锁
                lock.unlock();
            }
        }
    

    leader是等待获取队列头元素的线程,应用主从式设计减少不必要的等待。如果leader不为空,表示已经有线程在等待获取队列的头元素。所以,通过await()方法让出当前线程等待信号。如果leader为空,则把当前线程设置为leader,当一个线程为leader,它使用awaitNanos()方法让当前线程等待接收信号或等待delay时间。

    DelayQueue出队

    poll()方法

    	/*
    	*检索并删除次队列的头
    	*如果此队列没有延迟过期的元素,则返回null
    	*/
        public E poll() {
        	//获取可重入锁
            final ReentrantLock lock = this.lock;
            //可重入锁加锁
            lock.lock();
            try {
            	//检索但不删除队列头部元素
                E first = q.peek();
                //如果first为null或者返回与此对象关联的剩余延迟时间大于0
                //返回null
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                else
                	//否则通过优先队列poll()方法出队
                    return q.poll();
            } finally {
            	//可重入锁解锁
                lock.unlock();
            }
        }
    

    take()方法

    	/*
    	*检索并除去此队列的头
    	*等待直到该队列上具有过期延迟的元素可用
    	*/
        public E take() throws InterruptedException {
        	//获取可重入锁
            final ReentrantLock lock = this.lock;
            //可重入锁加锁
            lock.lockInterruptibly();
            try {
                for (;;) {
                	//检索但不删除队列头部元素
                    E first = q.peek();
                    //如果first为空
                    if (first == null)
                    	//在available条件上等待
                        available.await();
                    else {
                    	//如果first非空
                    	//获取first的剩余延迟时间
                        long delay = first.getDelay(NANOSECONDS);
                        //如果delay小于等于0
                        if (delay <= 0)
                        	//延迟时间到期,获取并删除头部元素
                            return q.poll();
                        //如果delay大于0,即延迟时间未到期
                        //将first置为null
                        first = null; 
                        //如果leader线程非空
                        if (leader != null)
                        	//当前线程无限期阻塞
                        	//等待leader线程唤醒
                            available.await();
                        else {
                        	//如果leader线程为空
                        	//获取当前线程
                            Thread thisThread = Thread.currentThread();
                            //是当前线程成为leader线程
                            leader = thisThread;
                            try {
                            	//当前线程等待剩余延迟时间
                                available.awaitNanos(delay);
                            } finally {
                            	//如果当前线程是leader线程
                            	//释放leader线程
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
            	//如果leader线程为null并且队列不为空
            	//说明没有其他线程在等待,那就通知条件队列
                if (leader == null && q.peek() != null)
                	//通过signal()方法唤醒一个出队线程
                    available.signal();
                //解锁
                lock.unlock();
            }
        }
    

    take()方法总结:

    1. 获取锁。
    2. 取出优先级队列q的首元素。
    3. 如果元素q的队首为空则阻塞。
    4. 如果元素q的队首(first)不为空;获取这个元素的delay时间值,如果first的延迟delay时间小于等于0,说明元素已经到了可以使用的时间,调用poll()方法弹出该元素,跳出方法。
    5. 如果first的延迟delay时间大于0,释放元素first的引用,避免内存泄漏。
    6. 如果延迟delay时间大于0,leader非空,当前线程等待。
    7. 如果延迟delay时间大于0,leader为空,将当前线程设置为leader线程,等待剩余时间。
    8. 自旋,循环以上操作,直到return。

    重载poll()方法

        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        	//获取等待时间
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    //如果first为空
                    if (first == null) {
                    	//如果nanos小于等于0
                        if (nanos <= 0)
                        	//返回null
                            return null;
                        else
                        	//如果nanos大于0
                        	//等待nanos时间
                            nanos = available.awaitNanos(nanos);
                    } else {
                    	//如果队首非空
                    	//获取first的剩余延迟时间
                        long delay = first.getDelay(NANOSECONDS);
                       	//如果delay小于等于0
                        if (delay <= 0)
                        	//延迟时间到期,获取并删除头部元素
                            return q.poll();
                        //如果delay大于0
                        //如果nanos小于等于0
                        if (nanos <= 0)
                        	//返回null
                            return null;
                        //如果delay大于0且nanos大于0
                        //first置为null
                        first = null; 
                        //如果nanos小于delay或者leader非空
                        if (nanos < delay || leader != null)
                        	//等待delay时间
                            nanos = available.awaitNanos(nanos);
                        else {
                        	//如果nanos大于等于delay或者leader为空
                        	//获取当前线程
                            Thread thisThread = Thread.currentThread();
                            //设置当前线程为leader
                            leader = thisThread;
                            try {
                            	//等待delay时间
                                long timeLeft = available.awaitNanos(delay);
                                //修改nanos
                                nanos -= delay - timeLeft;
                            } finally {
                            	//如果当前线程为leader线程
                            	//释放leader线程
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
            	//如果leader为null并且队列不为空
            	//说明没有其他线程在等待,那就通知条件队列
                if (leader == null && q.peek() != null)
                	//通过singnal()方法唤醒一个出队线程
                    available.signal();
                //解锁
                lock.unlock();
            }
        }
    
    展开全文
  • delayqueue

    2018-02-11 15:55:00
    delayqueue DelayedQueue是一个用来延时处理的队列。可以为队列中的元素添加一个过期的时间, 队列中的元素 要实现 delay接口中的两个方法 ​存入队列:add offer put 从队列中获取:peek poll ...

    DelayedQueue是一个用来延时处理的队列。可以为队列中的元素添加一个过期的时间,

    队列中的元素 要实现 delay接口中的两个方法

    ​存入队列:add offer put

    从队列中获取:peek poll take​

    peek:获取队列的head对象,但不是从队列中移除。如果队列空,就返回空

    poll :​获取并移出队列head对象。如果head没有超时,返回空

    poll w/ timeout参数 : ​获取并移出队列head对象。如果没有超时head对象,会wait当前线程知道有超时对象,或者按照超时参数设定,返回空

    take : ​获取并移出队列head对象。如果没有超时head对象,会wait当前线程知道有对象满足超时条件

    ​DelayedQueue实现保证了最快过期的对象排在head位置,也就是说它会在每次peek时候返回最快超时的对象。

    @SpringBootApplication
    public class DemoApplication {
    
        public static void main(String[] args) throws InterruptedException {
            DelayQueue<Person> delayQueue =  new DelayQueue<>();
            delayQueue.clear();
            Calendar calendar = Calendar.getInstance();
            for (int i=1;i<11;i++){
                calendar.add(Calendar.SECOND,i*10);
                Person m = new Person(i+" is id",i+" is name",System.currentTimeMillis()+i*1000,calendar.getTime());
                delayQueue.add(m);
            }
            Iterator<Person> iterator = delayQueue.iterator();
            while(iterator.hasNext()){
                Person p=iterator.next();
                System.out.println(p);
            }
            while(!delayQueue.isEmpty()){
                Person message = delayQueue.take();//阻塞方法
                System.out.println(message);
            }
     
        }
    }
    
    class Person implements Delayed{
        private String id;
        private String name;
        private long insertTime;
        private Date outTime;
        
        public Person(String id, String name,long insertTime,Date outTime) {
            super();
            this.id = id;
            this.name = name;
            this.insertTime = insertTime;
            this.outTime = outTime;
        }
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public long getInsertTime() {
            return insertTime;
        }
        public void setInsertTime(long insertTime) {
            this.insertTime = insertTime;
        }
        public Date getOutTime() {
            return outTime;
        }
        public void setOutTime(Date outTime) {
            this.outTime = outTime;
        }
        @Override
        public String toString() {
            return "Person [id=" + id + ", name=" + name + ", outTime=" + new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(outTime) + "]";
        }
        @Override
        public int compareTo(Delayed o) {
            Person person = (Person) o;
            
            if(this.insertTime > person.insertTime){
                return 1;
            }else if(this.insertTime == person.insertTime){
                return 0;
            }else{
                return -1;
            }
        }
        @Override
        public long getDelay(TimeUnit unit) {
            return this.outTime.getTime()-System.currentTimeMillis();
        }
        
    }

     

    posted on 2018-02-11 15:55 会跳舞的笨笨熊 阅读(...) 评论(...) 编辑 收藏

    转载于:https://www.cnblogs.com/ITguoguo/p/8442247.html

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,751
精华内容 700
关键字:

delayqueue