精华内容
下载资源
问答
  • 【20】阻塞队列原理

    2021-03-29 12:07:28
    (1)一个人只要自己不放弃自己,整个世界也不会放弃你. (2)天生我才必有大用 (3)不能忍受学习之苦就一定要忍受...阻塞队列原理 1.阻塞队列 1.1队列 (1)队列是一种数据结构,它有什么特征? 它是一种先进先出的.

    (1)一个人只要自己不放弃自己,整个世界也不会放弃你.
    (2)天生我才必有大用
    (3)不能忍受学习之苦就一定要忍受生活之苦,这是多么痛苦而深刻的领悟.
    (4)做难事必有所得
    (5)精神乃真正的刀锋
    (6)战胜对手有两次,第一次在内心中.
    (7)好好活就是做有意义的事情.
    (8)亡羊补牢,为时未晚
    (9)科技领域,没有捷径与投机取巧。
    (10)编写实属不易,若喜欢或者对你有帮助记得点赞+关注或者收藏哦~

    阻塞队列原理

    1.阻塞队列

    1.1队列

    (1)队列是一种数据结构,它有什么特征?

    • 它是一种先进先出的数据结构。

    1.2阻塞队列

    (1)管道头已经被堵住了,造成阻塞

    (2)管道被填满了,造成阻塞

    在这里插入图片描述

    (1)当队列里面元素放满的时候,再往队列中存放元素的动作会造成阻塞。

    (2)当队列里面的元素是空的时候,想从队列中拿元素的动作会造成阻塞。

    1.3JDK中专门的接口

    (1)BlockingDeque

    public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
    

    在这里插入图片描述

    (2)真正体现阻塞的方法为put与take,如果放入元素时队列已满,线程就会阻塞。如果取元素时队列已空,线程也会阻塞。

    (3)BlockingDeque也有非阻塞方法

    • add与remove,如果放入元素时队列已满,抛出异常,如果从队列取出元素时,队列已空,也抛出异常。
    • offer与poll,poll如果放入元素时队列已满,抛出异常,如果从队列取出元素时,队列已空,返回null。

    1.4阻塞队列在并发编程中用来做什么?

    (1)用来解决生产者与消费者问题

    (2)如果生产者生产速度很快,而消费者消费得很慢,生产者就需要等消费者把产品消费完了,才生产新的产品让消费者继续消费。

    (3)如果消费者消费的速度要比生产者生产的速度要快,消费者要等待生产者生产出产品后再消费。

    (4)这种情况下,为了平衡生产者与消费者之间能力上的不均衡的情况,则在生产者与消费者之间放入一个容器,生产者与消费者需要的产品,统一放在一个容器里面。

    (5)生产者生产一个产品就往容器里面放一个,消费者消费一个就从容器里取出一个,这种在生产者与消费者之间引入一个中间容器的方式,就可以很好的平衡消费者与生产者之间一个性能不均衡的问题,让生产者与消费者实现解藕。

    1.4.1生产者与消费者模式

    1.4.2常用方法

    1.4.3常用阻塞队列

    在这里插入图片描述

    (1)BlockingDeque已经为我们提供了很多实现。

    (2)什么叫做有界,什么叫做无界?

    • 有界,即队列的长度是有限的,满了以后生产者就会阻塞。
    • 无界,可以不停的往队列里面放东西,而不会被阻塞。插入不阻塞,拿取还是会阻塞的,没有数据的时候也会阻塞。

    (3)阻塞往队列里面插入元素可以阻塞,从队列里面取出元素也可以阻塞。

    1.4.4队列案例

    /**
     *类说明:存放的队列的元素,
     */
    public class ItemVo<T> implements Delayed {
    
    	//到期时间,但传入的数值代表过期的时长,传入单位毫秒
    	private long activeTime;
    	private T data;//业务数据,泛型
    
    	//传入过期时长,单位秒,内部转换
    	public ItemVo(long expirationTime, T data) {
    		this.activeTime = expirationTime*1000+System.currentTimeMillis();
    		this.data = data;
    	}
    
    	public long getActiveTime() {
    		return activeTime;
    	}
    
    	public T getData() {
    		return data;
    	}
    
    	/*
    	 * 这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。
    	 */
    	public long getDelay(TimeUnit unit) {
    		long d = unit.convert(this.activeTime
    				-System.currentTimeMillis(),unit);
    		return d;
    	}
    
    	/*
    	 *Delayed接口继承了Comparable接口,按剩余时间排序,实际计算考虑精度为纳秒数
    	 */
    	public int compareTo(Delayed o) {
    		long d = (getDelay(TimeUnit.MILLISECONDS)
    				- o.getDelay(TimeUnit.MILLISECONDS));
    		if (d==0){
    			return 0;
    		}else{
    			if (d<0){
    				return -1;
    			}else{
    				return  1;
    			}
    		}
    	}
    
    }
    
    /**
     *类说明:订单的实体类
     */
    public class Order {
    	private final String orderNo;//订单的编号
    	private final double orderMoney;//订单的金额
    	
    	public Order(String orderNo, double orderMoney) {
    		super();
    		this.orderNo = orderNo;
    		this.orderMoney = orderMoney;
    	}
    
    	public String getOrderNo() {
    		return orderNo;
    	}
    
    	public double getOrderMoney() {
    		return orderMoney;
    	}	
    }
    
    
    /**
     *类说明:将订单推入队列
     */
    public class PutOrder implements Runnable {
        private DelayQueue<ItemVo<Order>> queue;
    
        public PutOrder(DelayQueue<ItemVo<Order>> queue){
            this.queue = queue;
        }
    
    	@Override
    	public void run() {
    		//5秒后到期
    		Order orderTb = new Order("Tb12345",366);
    		ItemVo<Order> itemTb = new ItemVo<Order>(5,orderTb);
    		queue.offer(itemTb);
    		System.out.println("订单5秒后超时:"+orderTb.getOrderNo()+";"
                    +orderTb.getOrderMoney());
    		//8秒后到期
    		Order orderJd = new Order("Jd54321",366);
    		ItemVo<Order> itemJd = new ItemVo<Order>(8,orderJd);
    		queue.offer(itemJd);
    		System.out.println("订单8秒后超时:"+orderJd.getOrderNo()+";"
                    +orderJd.getOrderMoney());
    
    	}	
    }
    
    
    /**
     *类说明:取出到期的订单的功能
     */
    public class FetchOrder implements Runnable {
    	private DelayQueue<ItemVo<Order>> queue;
    	
        public FetchOrder(DelayQueue<ItemVo<Order>> queue){
            this.queue = queue;
        }
    
    	@Override
    	public void run() {
    		while(true) {
    			try {
    				ItemVo<Order> item = queue.take();
    				Order order = (Order)item.getData();
    				System.out.println("Get From Queue:"+"data="
    				+order.getOrderNo()+";"+order.getOrderMoney());
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    		
    	}	
    }
    

    1.4.5延时队列的使用

    /**
     *类说明:延时队列测试程序
     */
    public class Test {
        public static void main(String[] args) throws InterruptedException {
            DelayQueue<ItemVo<Order>> queue = new DelayQueue<ItemVo<Order>>();//延时队列
    
            new Thread(new PutOrder(queue)).start();
            new Thread(new FetchOrder(queue)).start();
    
            //每隔500毫秒,打印个数字
            for(int i=1;i<15;i++){
                Thread.sleep(500);
                System.out.println(i*500);
            }
        }
    }
    
    展开全文
  • Java 阻塞队列原理

    2021-02-15 19:28:02
    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景...

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

    阻塞队列提供了四种处理方法:

    方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
    插入方法 add(e) offer(e) put(e) offer(e,time,unit)
    移除方法 remove() poll() take() poll(time,unit)
    检查方法 element() peek() 不可用 不可用

     

    1、初识阻塞队列

    在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。

    BlockingQueue的核心方法:

    public interface BlockingQueue<E> extends Queue<E> {
    
        //将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
        boolean add(E e);
    
        //将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
        boolean offer(E e);
    
        //将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
        void put(E e) throws InterruptedException;
    
        //将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
        boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    
        //从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
        E take() throws InterruptedException;
    
        //在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        //获取队列中剩余的空间。
        int remainingCapacity();
    
        //从队列中移除指定的值。
        boolean remove(Object o);
    
        //判断队列中是否拥有该值。
        public boolean contains(Object o);
    
        //将队列中值,全部移除,并发设置到给定的集合中。
        int drainTo(Collection<? super E> c);
    
        //指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
        int drainTo(Collection<? super E> c, int maxElements);
    }
    

    在深入之前先了解下下ReentrantLock 和 Condition:
    重入锁ReentrantLock:
    ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
    ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
    主要方法:

    • lock()获得锁
    • lockInterruptibly()获得锁,但优先响应中断
    • tryLock()尝试获得锁,成功返回true,否则false,该方法不等待,立即返回
    • tryLock(long time,TimeUnit unit)在给定时间内尝试获得锁
    • unlock()释放锁

    Condition:await()、signal()方法分别对应之前的Object的wait()和notify()

    • 和重入锁一起使用
    • await()是当前线程等待同时释放锁
    • awaitUninterruptibly()不会在等待过程中响应中断
    • signal()用于唤醒一个在等待的线程,还有对应的singalAll()方法

     

    2、阻塞队列的成员
    队列 有界性 数据结构
    ArrayBlockingQueue bounded(有界) 加锁 arrayList
    LinkedBlockingQueue optionally-bounded 加锁 linkedList
    PriorityBlockingQueue unbounded 加锁 heap
    DelayQueue unbounded 加锁 heap
    SynchronousQueue bounded 加锁
    LinkedTransferQueue unbounded 加锁 heap
    LinkedBlockingDeque unbounded 无锁 heap

     

    • ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】

    • LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。
    • PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
    • DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。DelayQueue可以运用在以下应用场景:
      • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
      • 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
    • SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
    • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
    • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

     

    3、阻塞队列原理以及使用

    ArrayBlockingQueue

    // 存储队列元素的数组
        final Object[] items;
    
        // 拿数据的索引,用于take,poll,peek,remove方法
        int takeIndex;
    
        // 放数据的索引,用于put,offer,add方法
        int putIndex;
    
        // 元素个数
        int count;
    
        // 可重入锁
        final ReentrantLock lock;
        // notEmpty条件对象,由lock创建
        private final Condition notEmpty;
        // notFull条件对象,由lock创建
        private final Condition notFull;
    
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);//默认构造非公平锁的阻塞队列 
        }
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            //初始化ReentrantLock重入锁,出队入队拥有这同一个锁 
            lock = new ReentrantLock(fair);
            //初始化非空等待队列
            notEmpty = lock.newCondition();
            //初始化非满等待队列 
            notFull =  lock.newCondition();
        }
        public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            this(capacity, fair);
    
            final ReentrantLock lock = this.lock;
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                //将集合添加进数组构成的队列中 
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                count = i;
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }

    添加的实现原理:

    这里的add方法和offer方法最终调用的是enqueue(E x)方法,其方法内部通过putIndex索引直接将元素添加到数组items中,这里可能会疑惑的是当putIndex索引大小等于数组长度时,需要将putIndex重新设置为0,这是因为当前队列执行元素获取时总是从队列头部获取,而添加元素从中从队列尾部获取所以当队列索引(从0开始)与数组长度相等时,下次我们就需要从数组头部开始添加了,如下图演示

        //入队操作
        private void enqueue(E x) {
            final Object[] items = this.items;
            //通过putIndex索引对数组进行赋值
            items[putIndex] = x;
            //索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }

    put方法是一个阻塞的方法,如果队列元素已满,那么当前线程将会被notFull条件对象挂起加到等待队列中,直到队列有空档才会唤醒执行添加操作。但如果队列没有满,那么就直接调用enqueue(e)方法将元素加入到数组队列中。到此我们对三个添加方法即put,offer,add都分析完毕,其中offer,add在正常情况下都是无阻塞的添加,而put方法是阻塞添加。这就是阻塞队列的添加过程。说白了就是当队列满时通过条件对象Condtion来阻塞当前调用put方法的线程,直到线程又再次被唤醒执行。总得来说添加线程的执行存在以下两种情况,一是,队列已满,那么新到来的put线程将添加到notFull的条件队列中等待,二是,有移除线程执行移除操作,移除成功同时唤醒put线程,如下图所示

        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                //当队列元素个数与数组长度相等时,无法添加元素
                while (count == items.length)
                    //将当前调用线程挂起,添加到notFull条件队列中等待唤醒
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }

    take方法其实很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put线程添加了数据,那么put操作将会唤醒take线程,执行take操作。图示如下

        //从队列头部删除,队列没有元素就阻塞,可中断
         public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();//中断
              try {
                  //如果队列没有元素
                  while (count == 0)
                      //执行阻塞操作
                      notEmpty.await();
                  return dequeue();//如果队列有元素执行删除操作
              } finally {
                  lock.unlock();
              }
            }

     

    展开全文
  • JAVA 阻塞队列原理 阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况: 1. 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。 ...

    JAVA 阻塞队列原理

    阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:
    1. 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
    2. 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

    阻塞队列的主要方法
     
    抛出异常:抛出一个异常;
    特殊值:返回一个特殊值(null 或 false,视情况而定)
    阻塞:在成功操作之前,一直阻塞线程
    超时:放弃前只在最大的时间内阻塞
     
    1:public abstract boolean add(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。如果该元素是 NULL,则会抛出 NullPointerException 异常。
    2:public abstract boolean offer(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。
    3:public abstract void put(E paramE) throws InterruptedException: 将指定元素插入此队列中,将等待可用的空间(如果有必要)
    public void put(E paramE) throws InterruptedException {
        checkNotNull(paramE);
        ReentrantLock localReentrantLock = this.lock;
        localReentrantLock.lockInterruptibly();
        try {
            while (this.count == this.items.length)
            this.notFull.await();//如果队列满了,则线程阻塞等待
            enqueue(paramE);
            localReentrantLock.unlock();
            } finally {
                localReentrantLock.unlock();
        }
    }
    4:offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败。
     
    获取数据操作:
    1:poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null;
    2:poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
    3:take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入。
    4.drainTo():一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
     
    Java 中的阻塞队列
    1. ArrayBlockingQueue :由数组结构组成的有界阻塞队列。
    2. LinkedBlockingQueue :由链表结构组成的有界阻塞队列。
    3. PriorityBlockingQueue :支持优先级排序的无界阻塞队列。
    4. DelayQueue:使用优先级队列实现的无界阻塞队列。
    5. SynchronousQueue:不存储元素的阻塞队列。
    6. LinkedTransferQueue:由链表结构组成的无界阻塞队列。
    7. LinkedBlockingDeque:由链表结构组成的双向阻塞队列
    ArrayBlockingQueue(公平、非公平)
    用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐
    量。我们可以使用以下代码创建一个公平的阻塞队列:ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
     
    LinkedBlockingQueue(两个独立锁提高并发)
    基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,此队列按照先进先出(FIFO)的原则对元素进行排序。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。 LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE)。
     
    PriorityBlockingQueue(compareTo 排序实现优先
    是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。可以自定义实现compareTo()方法来指定元素进行排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
     
    DelayQueue(缓存失效、定时任务 )
    是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景:
    1. 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
    2. 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从DelayQueue 中获取到任务就开始执行,从比如 TimerQueue 就是使用 DelayQueue 实现的。
     
    SynchronousQueue(不存储数据、可用于传递数据)
    是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另 外 一 个 线 程 使 用 , SynchronousQueue 的 吞 吐 量 高 于 LinkedBlockingQueue 和
    ArrayBlockingQueue。
     
    LinkedTransferQueue
    是 一 个 由 链 表 结 构 组 成 的 无 界 阻 塞 TransferQueue 队 列 。 相 对 于 其 他 阻 塞 队 列 ,LinkedTransferQueue 多了 tryTransfer transfer 方法。
    1. transfer 方法:如果当前有消费者正在等待接收元素(消费者使用 take()方法或带时间限制的poll()方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回
    2. tryTransfer 方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回 false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收,方法立即返回。而 transfer 方法是必须等到消费者消费了才返回。对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。
     
    LinkedBlockingDeque
    是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队
    列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法却等同于 takeFirst,不知道是不是 Jdk 的 bug,使用时还是用带有 First 和 Last 后缀的方法更清楚。在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。
    展开全文
  • 本篇内容将对于阻塞队列原理、4中处理方式以及7中阻塞队列进行详细解析。 什么是阻塞队列 首先,再一次申明,队列必须是线程安全的,否则将毫无意义。阻塞队列最大的特征就是提供两种阻塞操作: 阻塞的插入元素:...

    前言

    在前文中非阻塞队列之ConcurrentLinkedQueue源码解析中,深度解析了非阻塞队列的源码。本篇内容将对于阻塞队列的原理、4中处理方式以及7中阻塞队列进行详细解析。

    什么是阻塞队列

    首先,再一次申明,队列必须是线程安全的,否则将毫无意义。阻塞队列最大的特征就是提供两种阻塞操作:

    • 阻塞的插入元素:当队列满时,队列会阻塞插入元素的线程,直到队列非满;
    • 阻塞的获取元素:对队列空时,队列会阻塞获取元素的线程,直到队列非空。

    说到这里,其实要研究Java中阻塞队列的核心问题就付出水面了:

    • 阻塞队列如何实现阻塞操作的?
    • 如何在达到一定条件时唤醒相关线程的?
    • 如何保证线程安全的插入元素和获取元素?
      其实这就回到了并发要解决的本质。在Java并发——线程安全一文中对线程安全和如何实现线程安全有非常清晰的阐述。
      要实现以上几种功能的方案有很多:采用Object.wait/notify或者基于AQS。Java中大多数的阻塞队列采用的基于AQS实现的ReentrantLock和Condition的方式实现线程安全的。阻塞队列的源码比起非阻塞队列的源码要简单很多,如果对于这些基本理念很熟悉的话,那么理解Java阻塞队列的源码就很简单了。

    4种处理方式

    方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
    插入方法 add(e) offer(e) put(e) offer(e, time, unit)
    移除方法 remove() pull() take() take(time, unit)
    检查方法 element() peek() - -
    • 抛出异常:当队列满时,再添加元素的话将抛出IllegalStateException(“Queue full”)异常;当队列空时,在移除元素的话将抛出NoSuchElementException异常。
    • 返回特殊值:添加元素方法会返回boolean值表示添加成功与否,如果返回true表示添加成功,如果队列满了,同理,如果移除元素成功也将返回false。
    • 一直阻塞:当队列满时,队列会阻塞所有添加元素的线程,直到线程非满;当队列空时,队列会阻塞所有移除元素的线程,直到线程非空。
    • 超时退出:当阻塞超过一段时间之后,线程会自动退出。

    注意,阻塞队列分为有界阻塞队列和无界阻塞队列,对于无界阻塞队列而言,永远不会出现队列满的情况,因此put/offer/take/pull这些方法不会出现阻塞的情况。当然无界并不意味着可以存放无限的元素,毕竟JVM内存是有界的!

    在实际开发中,这四种处理方式改如何选择呢?
    抛出异常:这种方式适用于“一次性”场景,比如中奖活动,规定只能有10名用户中奖,那么队列满之后,将直接抛出异常拒绝再添加中奖用户中队列中,然后触发派奖线程,派奖线程从队列中获取元素直到全部获取完毕抛出异常结束派奖。
    返回特殊值:这种场景适用于高并发、耗时短的任务。由于任务执行耗时短,当添加或者移除失败时,可以采用自旋思想,自旋添加或者移除直到成功,这样做的好处是避免了线程调度的性能消耗。
    一直阻塞:这种场景适用于高并发、耗时长的任务。由于耗时长,此时再采用自旋的方式显然不如阻塞线程。
    超时退出:这种场景适用于高并发且允许操作失败的场景,比如用户行为收集等,虽然无法保证100%的收集,但是在大量数据下90%以上的收集率足够得到准确的数据分析结果了。相当于牺牲了一定的准确率以提升性能。

    7种阻塞队列

    ArrayBlockingQueue

    基于数组实现的有界队列,FIFO。内部使用的是ReentrantLock + ConditionObject实现的同步机制。支持线程公平的访问队列(本质上是设置ReentrantLock的公平锁)

    LinkedBlockingQueue

    基于链表实现的有界队列,FIFO。内部使用的是ReentrantLock + ConditionObject实现的同步机制。但是它不支持设置公平锁。

    PriorityBlockingQueue

    是一个支持优先级的无界阻塞队列。默认情况下是按照元素添加的顺序升序排序的。也可以自定义类实现compareTo()方法来确定元素的排序规则。内部使用的是ReentrantLock + ConditionObject实现的同步机制。既然都已经支持优先级了,那么自然不需要公平竞争咯。

    DelayQueue

    延时队列。内部实际上是基于PriorityQueue实现的。队列中的元素必须实现Delayed接口,在创建元素时可以指定延时多久才能从队列中获取到当前元素。
    DelayQueue非常有用!我们可以基于DelayQueue实现以下场景:

    • 缓存系统的设计:循环从延时队列中获取元素,如果能够获取到元素,说明这个元素的有效期到了;
    • 定时任务系统的设计:循环从延时队列中获取元素,一定获取到元素就执行相关的定时任务逻辑。在Java中,TimeQueue就是基于DelayQueue实现的。

    Delayed接口的具体使用可以参考Java定时任务框架ScheduledThreadPoolExecutor中的ScheduledFutureTask。以后有机会可以进行定时任务系统专题研究。

    SynchronousQueue

    这是一个不存储元素的队列,需要注意的是每一个put操作都必须有对应的take操作,否则将会被阻塞不能够继续添加元素。这个队列可以看做是容量只有1的队列,非常适合一些传递性场景。它也是基于ReentrantLock和ConditionObject实现的。

    LinkedTransferQueue

    基于链表的无界阻塞队列,FIFO。相比于其他阻塞队列,它的特性就在于“transfer”。

    • transfer方法
      如果有消费端正在等待接收元素(take()/poll()方法),transfer方法可以将元素立即传递给消费端。如果此时没有消费端,则transfer方法会将此元素放在队列的tail节点,并且阻塞直到此元素被消费。
    • tryTransfer方法
      与transfer方法不同,此方法的目的是为了试探元素能否直接被消费端接收。如果没有消费端正在等待接收元素,此方法返回false。和transfer()方法不同,此方法会立即返回。

    LinkedBlockingDeque

    基于双向链表的阻塞队列。相比于其他阻塞队列,他的特性就在于“双向”。即:可以从队列的两端插入和移除元素。这样就相当于减少了一半的锁竞争,进一步提升了并发能力。LinkedBlockingDeque非常适用于高并发场景以及“工作窃取”模式中。

    总结

    本篇文章主要记录我在学习阻塞队列时的历程和一些心得。通过本篇文章,我们知道了阻塞队列的内部数据结构以及不同阻塞队列的特点。在实际工作中我们可以根据实际情况来选择合适的队列让程序更加合理。

    架构师之美

    展开全文
  • 阻塞队列是一个先进先出的单向队列(Queue),而BlockingQueue阻塞队列实际是非阻塞队列的一个加强,之所以是阻塞队列最主要的是实现take和put,当阻塞队列满时,put会一直阻塞直到拿到数据,或者响应中断退出;...
  • 阻塞队列原理详解

    千次阅读 2017-03-03 16:48:46
    前一篇关于阻塞队列原理和使用介绍的很生硬且笼统,因为自己是菜鸟经验少,完全是自己一个人的使用和总结。故而再此附上一篇介绍的很详细精彩的文章,希望能对大家有所帮助吧!!!
  • 基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。 LinkedBlockingQueue: ...
  • 1.redis介绍 Redis是一个开源的使用ANSIC语言编写、支持网络、可基于内存亦可持久化的日志型、Key-...使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时...
  • BlockingQueue为阻塞队列,其与普通队列不同的是,以put和get为例,阻塞队列在put时,若列表满了,则会等待直到队列可以加入元素;而阻塞队列在get时候,若列表为空,则会等待到队列非空。可以用来解决 生产者消费者...
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。 DelayQueue:一个使用优先级队列实现的无界阻塞队列。 SynchronousQueue:一个不存储元素的阻塞队列。 LinkedTransferQueue:一个由链表结构组成的...
  • 【JAVA多线程19】JAVA 阻塞队列原理

    千次阅读 2019-06-13 00:39:08
    今天我们来讨论另外一类容器:阻塞队列。  非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)。  使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,842
精华内容 736
关键字:

阻塞队列原理