精华内容
下载资源
问答
  • Java 阻塞队列原理

    2021-02-15 19:28:02
    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景...

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

    阻塞队列提供了四种处理方法:

    方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
    插入方法 add(e) offer(e) put(e) offer(e,time,unit)
    移除方法 remove() poll() take() poll(time,unit)
    检查方法 element() peek() 不可用 不可用

     

    1、初识阻塞队列

    在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。

    BlockingQueue的核心方法:

    public interface BlockingQueue<E> extends Queue<E> {
    
        //将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
        boolean add(E e);
    
        //将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
        boolean offer(E e);
    
        //将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
        void put(E e) throws InterruptedException;
    
        //将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
        boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    
        //从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
        E take() throws InterruptedException;
    
        //在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        //获取队列中剩余的空间。
        int remainingCapacity();
    
        //从队列中移除指定的值。
        boolean remove(Object o);
    
        //判断队列中是否拥有该值。
        public boolean contains(Object o);
    
        //将队列中值,全部移除,并发设置到给定的集合中。
        int drainTo(Collection<? super E> c);
    
        //指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
        int drainTo(Collection<? super E> c, int maxElements);
    }
    

    在深入之前先了解下下ReentrantLock 和 Condition:
    重入锁ReentrantLock:
    ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
    ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
    主要方法:

    • lock()获得锁
    • lockInterruptibly()获得锁,但优先响应中断
    • tryLock()尝试获得锁,成功返回true,否则false,该方法不等待,立即返回
    • tryLock(long time,TimeUnit unit)在给定时间内尝试获得锁
    • unlock()释放锁

    Condition:await()、signal()方法分别对应之前的Object的wait()和notify()

    • 和重入锁一起使用
    • await()是当前线程等待同时释放锁
    • awaitUninterruptibly()不会在等待过程中响应中断
    • signal()用于唤醒一个在等待的线程,还有对应的singalAll()方法

     

    2、阻塞队列的成员
    队列 有界性 数据结构
    ArrayBlockingQueue bounded(有界) 加锁 arrayList
    LinkedBlockingQueue optionally-bounded 加锁 linkedList
    PriorityBlockingQueue unbounded 加锁 heap
    DelayQueue unbounded 加锁 heap
    SynchronousQueue bounded 加锁
    LinkedTransferQueue unbounded 加锁 heap
    LinkedBlockingDeque unbounded 无锁 heap

     

    • ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】

    • LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。
    • PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
    • DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。DelayQueue可以运用在以下应用场景:
      • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
      • 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
    • SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
    • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
    • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

     

    3、阻塞队列原理以及使用

    ArrayBlockingQueue

    // 存储队列元素的数组
        final Object[] items;
    
        // 拿数据的索引,用于take,poll,peek,remove方法
        int takeIndex;
    
        // 放数据的索引,用于put,offer,add方法
        int putIndex;
    
        // 元素个数
        int count;
    
        // 可重入锁
        final ReentrantLock lock;
        // notEmpty条件对象,由lock创建
        private final Condition notEmpty;
        // notFull条件对象,由lock创建
        private final Condition notFull;
    
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);//默认构造非公平锁的阻塞队列 
        }
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            //初始化ReentrantLock重入锁,出队入队拥有这同一个锁 
            lock = new ReentrantLock(fair);
            //初始化非空等待队列
            notEmpty = lock.newCondition();
            //初始化非满等待队列 
            notFull =  lock.newCondition();
        }
        public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            this(capacity, fair);
    
            final ReentrantLock lock = this.lock;
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                //将集合添加进数组构成的队列中 
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                count = i;
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }

    添加的实现原理:

    这里的add方法和offer方法最终调用的是enqueue(E x)方法,其方法内部通过putIndex索引直接将元素添加到数组items中,这里可能会疑惑的是当putIndex索引大小等于数组长度时,需要将putIndex重新设置为0,这是因为当前队列执行元素获取时总是从队列头部获取,而添加元素从中从队列尾部获取所以当队列索引(从0开始)与数组长度相等时,下次我们就需要从数组头部开始添加了,如下图演示

        //入队操作
        private void enqueue(E x) {
            final Object[] items = this.items;
            //通过putIndex索引对数组进行赋值
            items[putIndex] = x;
            //索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }

    put方法是一个阻塞的方法,如果队列元素已满,那么当前线程将会被notFull条件对象挂起加到等待队列中,直到队列有空档才会唤醒执行添加操作。但如果队列没有满,那么就直接调用enqueue(e)方法将元素加入到数组队列中。到此我们对三个添加方法即put,offer,add都分析完毕,其中offer,add在正常情况下都是无阻塞的添加,而put方法是阻塞添加。这就是阻塞队列的添加过程。说白了就是当队列满时通过条件对象Condtion来阻塞当前调用put方法的线程,直到线程又再次被唤醒执行。总得来说添加线程的执行存在以下两种情况,一是,队列已满,那么新到来的put线程将添加到notFull的条件队列中等待,二是,有移除线程执行移除操作,移除成功同时唤醒put线程,如下图所示

        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                //当队列元素个数与数组长度相等时,无法添加元素
                while (count == items.length)
                    //将当前调用线程挂起,添加到notFull条件队列中等待唤醒
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }

    take方法其实很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put线程添加了数据,那么put操作将会唤醒take线程,执行take操作。图示如下

        //从队列头部删除,队列没有元素就阻塞,可中断
         public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();//中断
              try {
                  //如果队列没有元素
                  while (count == 0)
                      //执行阻塞操作
                      notEmpty.await();
                  return dequeue();//如果队列有元素执行删除操作
              } finally {
                  lock.unlock();
              }
            }

     

    展开全文
  • JAVA 阻塞队列原理 阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况: 1. 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。 ...

    JAVA 阻塞队列原理

    阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:
    1. 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
    2. 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

    阻塞队列的主要方法
     
    抛出异常:抛出一个异常;
    特殊值:返回一个特殊值(null 或 false,视情况而定)
    阻塞:在成功操作之前,一直阻塞线程
    超时:放弃前只在最大的时间内阻塞
     
    1:public abstract boolean add(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。如果该元素是 NULL,则会抛出 NullPointerException 异常。
    2:public abstract boolean offer(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。
    3:public abstract void put(E paramE) throws InterruptedException: 将指定元素插入此队列中,将等待可用的空间(如果有必要)
    public void put(E paramE) throws InterruptedException {
        checkNotNull(paramE);
        ReentrantLock localReentrantLock = this.lock;
        localReentrantLock.lockInterruptibly();
        try {
            while (this.count == this.items.length)
            this.notFull.await();//如果队列满了,则线程阻塞等待
            enqueue(paramE);
            localReentrantLock.unlock();
            } finally {
                localReentrantLock.unlock();
        }
    }
    4:offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败。
     
    获取数据操作:
    1:poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null;
    2:poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
    3:take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入。
    4.drainTo():一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
     
    Java 中的阻塞队列
    1. ArrayBlockingQueue :由数组结构组成的有界阻塞队列。
    2. LinkedBlockingQueue :由链表结构组成的有界阻塞队列。
    3. PriorityBlockingQueue :支持优先级排序的无界阻塞队列。
    4. DelayQueue:使用优先级队列实现的无界阻塞队列。
    5. SynchronousQueue:不存储元素的阻塞队列。
    6. LinkedTransferQueue:由链表结构组成的无界阻塞队列。
    7. LinkedBlockingDeque:由链表结构组成的双向阻塞队列
    ArrayBlockingQueue(公平、非公平)
    用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐
    量。我们可以使用以下代码创建一个公平的阻塞队列:ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
     
    LinkedBlockingQueue(两个独立锁提高并发)
    基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,此队列按照先进先出(FIFO)的原则对元素进行排序。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。 LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE)。
     
    PriorityBlockingQueue(compareTo 排序实现优先
    是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。可以自定义实现compareTo()方法来指定元素进行排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
     
    DelayQueue(缓存失效、定时任务 )
    是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景:
    1. 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
    2. 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从DelayQueue 中获取到任务就开始执行,从比如 TimerQueue 就是使用 DelayQueue 实现的。
     
    SynchronousQueue(不存储数据、可用于传递数据)
    是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另 外 一 个 线 程 使 用 , SynchronousQueue 的 吞 吐 量 高 于 LinkedBlockingQueue 和
    ArrayBlockingQueue。
     
    LinkedTransferQueue
    是 一 个 由 链 表 结 构 组 成 的 无 界 阻 塞 TransferQueue 队 列 。 相 对 于 其 他 阻 塞 队 列 ,LinkedTransferQueue 多了 tryTransfer transfer 方法。
    1. transfer 方法:如果当前有消费者正在等待接收元素(消费者使用 take()方法或带时间限制的poll()方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回
    2. tryTransfer 方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回 false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收,方法立即返回。而 transfer 方法是必须等到消费者消费了才返回。对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。
     
    LinkedBlockingDeque
    是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队
    列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法却等同于 takeFirst,不知道是不是 Jdk 的 bug,使用时还是用带有 First 和 Last 后缀的方法更清楚。在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。
    展开全文
  • JAVA多线程19】JAVA 阻塞队列原理

    千次阅读 2019-06-13 00:39:08
    今天我们来讨论另外一类容器:阻塞队列。  非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)。  使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,...

    今天我们来讨论另外一类容器:阻塞队列。

      非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)。

      使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。

      本文先讲述一下java.util.concurrent包下提供主要的几种阻塞队列,然后分析了阻塞队列和非阻塞队列的中的各个方法,接着分析了阻塞队列的实现原理,最后给出了一个实际例子和几个使用场景。

      一.几种主要的阻塞队列

      二.阻塞队列中的方法 VS 非阻塞队列中的方法

      三.阻塞队列的实现原理

      四.示例和使用场景

      若有不正之处请多多谅解,并欢迎批评指正。

      请尊重作者劳动成果,转载请标明原文链接:

       http://www.cnblogs.com/dolphin0520/p/3932906.html

    一.几种主要的阻塞队列

      自从Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:

      ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

      LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

      PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

      DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

    二.阻塞队列中的方法 VS 非阻塞队列中的方法

    1.非阻塞队列中的几个主要方法:

      add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;

      remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;

      offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;

      poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;

      peek():获取队首元素,若成功,则返回队首元素;否则返回null

     

      对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。

    2.阻塞队列中的几个主要方法:

      阻塞队列包括了非阻塞队列中的大部分方法,上面列举的5个方法在阻塞队列中都存在,但是要注意这5个方法在阻塞队列中都进行了同步措施。除此之外,阻塞队列提供了另外4个非常有用的方法:

      put(E e)

      take()

      offer(E e,long timeout, TimeUnit unit)

      poll(long timeout, TimeUnit unit)

      

      put方法用来向队尾存入元素,如果队列满,则等待;

      take方法用来从队首取元素,如果队列为空,则等待;

      offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;

      poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;

    三.阻塞队列的实现原理

      前面谈到了非阻塞队列和阻塞队列中常用的方法,下面来探讨阻塞队列的实现原理,本文以ArrayBlockingQueue为例,其他阻塞队列实现原理可能和ArrayBlockingQueue有一些差别,但是大体思路应该类似,有兴趣的朋友可自行查看其他阻塞队列的实现源码。

      首先看一下ArrayBlockingQueue类中的几个成员变量:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>

    implements BlockingQueue<E>, java.io.Serializable {

     

    private static final long serialVersionUID = -817911632652898426L;

     

    /** The queued items  */

    private final E[] items;

    /** items index for next take, poll or remove */

    private int takeIndex;

    /** items index for next put, offer, or add. */

    private int putIndex;

    /** Number of items in the queue */

    private int count;

     

    /*

    * Concurrency control uses the classic two-condition algorithm

    * found in any textbook.

    */

     

    /** Main lock guarding all access */

    private final ReentrantLock lock;

    /** Condition for waiting takes */

    private final Condition notEmpty;

    /** Condition for waiting puts */

    private final Condition notFull;

    }

       可以看出,ArrayBlockingQueue中用来存储元素的实际上是一个数组,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。

      lock是一个可重入锁,notEmpty和notFull是等待条件。

      下面看一下ArrayBlockingQueue的构造器,构造器有三个重载版本:

    1

    2

    3

    4

    5

    6

    7

    8

    public ArrayBlockingQueue(int capacity) {

    }

    public ArrayBlockingQueue(int capacity, boolean fair) {

     

    }

    public ArrayBlockingQueue(int capacity, boolean fair,

                              Collection<? extends E> c) {

    }

       第一个构造器只有一个参数用来指定容量,第二个构造器可以指定容量和公平性,第三个构造器可以指定容量、公平性以及用另外一个集合进行初始化。

      然后看它的两个关键方法的实现:put()和take():

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    public void put(E e) throws InterruptedException {

        if (e == nullthrow new NullPointerException();

        final E[] items = this.items;

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        try {

            try {

                while (count == items.length)

                    notFull.await();

            catch (InterruptedException ie) {

                notFull.signal(); // propagate to non-interrupted thread

                throw ie;

            }

            insert(e);

        finally {

            lock.unlock();

        }

    }

       从put方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。

      当被其他线程唤醒时,通过insert(e)方法插入元素,最后解锁。

      我们看一下insert方法的实现:

    1

    2

    3

    4

    5

    6

    private void insert(E x) {

        items[putIndex] = x;

        putIndex = inc(putIndex);

        ++count;

        notEmpty.signal();

    }

       它是一个private方法,插入成功后,通过notEmpty唤醒正在等待取元素的线程。

      下面是take()方法的实现:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    public E take() throws InterruptedException {

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        try {

            try {

                while (count == 0)

                    notEmpty.await();

            catch (InterruptedException ie) {

                notEmpty.signal(); // propagate to non-interrupted thread

                throw ie;

            }

            E x = extract();

            return x;

        finally {

            lock.unlock();

        }

    }

       跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。在take方法中,如果可以取元素,则通过extract方法取得元素,下面是extract方法的实现:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    private E extract() {

        final E[] items = this.items;

        E x = items[takeIndex];

        items[takeIndex] = null;

        takeIndex = inc(takeIndex);

        --count;

        notFull.signal();

        return x;

    }

       跟insert方法也很类似。

      其实从这里大家应该明白了阻塞队列的实现原理,事实它和我们用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者的思路类似,只不过它把这些工作一起集成到了阻塞队列中实现。

    四.示例和使用场景

      下面先使用Object.wait()和Object.notify()、非阻塞队列实现生产者-消费者模式:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    public class Test {

        private int queueSize = 10;

        private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);

         

        public static void main(String[] args)  {

            Test test = new Test();

            Producer producer = test.new Producer();

            Consumer consumer = test.new Consumer();

             

            producer.start();

            consumer.start();

        }

         

        class Consumer extends Thread{

             

            @Override

            public void run() {

                consume();

            }

             

            private void consume() {

                while(true){

                    synchronized (queue) {

                        while(queue.size() == 0){

                            try {

                                System.out.println("队列空,等待数据");

                                queue.wait();

                            catch (InterruptedException e) {

                                e.printStackTrace();

                                queue.notify();

                            }

                        }

                        queue.poll();          //每次移走队首元素

                        queue.notify();

                        System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");

                    }

                }

            }

        }

         

        class Producer extends Thread{

             

            @Override

            public void run() {

                produce();

            }

             

            private void produce() {

                while(true){

                    synchronized (queue) {

                        while(queue.size() == queueSize){

                            try {

                                System.out.println("队列满,等待有空余空间");

                                queue.wait();

                            catch (InterruptedException e) {

                                e.printStackTrace();

                                queue.notify();

                            }

                        }

                        queue.offer(1);        //每次插入一个元素

                        queue.notify();

                        System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));

                    }

                }

            }

        }

    }

       这个是经典的生产者-消费者模式,通过阻塞队列和Object.wait()和Object.notify()实现,wait()和notify()主要用来实现线程间通信。

      具体的线程间通信方式(wait和notify的使用)在后续问章中会讲述到。

      下面是使用阻塞队列实现的生产者-消费者模式:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    public class Test {

        private int queueSize = 10;

        private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);

         

        public static void main(String[] args)  {

            Test test = new Test();

            Producer producer = test.new Producer();

            Consumer consumer = test.new Consumer();

             

            producer.start();

            consumer.start();

        }

         

        class Consumer extends Thread{

             

            @Override

            public void run() {

                consume();

            }

             

            private void consume() {

                while(true){

                    try {

                        queue.take();

                        System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");

                    catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

            }

        }

         

        class Producer extends Thread{

             

            @Override

            public void run() {

                produce();

            }

             

            private void produce() {

                while(true){

                    try {

                        queue.put(1);

                        System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));

                    catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

            }

        }

    }

       有没有发现,使用阻塞队列代码要简单得多,不需要再单独考虑同步和线程间通信的问题。

      在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。

      阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。

      参考资料:

      《Java编程实战》

      http://ifeve.com/java-blocking-queue/

      http://endual.iteye.com/blog/1412212

      http://blog.csdn.net/zzp_403184692/article/details/8021615

      http://www.cnblogs.com/juepei/p/3922401.html

    展开全文
  • 阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况: 1. 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。 2. 当队列中填满数据的...

    阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:

    1. 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。

    一起聊聊JAVA 阻塞队列原理到底能给我们带来什么帮助?

    2. 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

    一起聊聊JAVA 阻塞队列原理到底能给我们带来什么帮助?

    1. 阻塞队列的主要方法

    一起聊聊JAVA 阻塞队列原理到底能给我们带来什么帮助?

    • 抛出异常:抛出一个异常;
    • 特殊值:返回一个特殊值(null 或 false,视情况而定)
    • 则塞:在成功操作之前,一直阻塞线程
    • 超时:放弃前只在最大的时间内阻塞

    插入操作:

    public abstract boolean add(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。如果该元素是 NULL,则会抛出 NullPointerException 异常。

    public abstract boolean offer(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。

    public abstract void put(E paramE) throws InterruptedException: 将指定元素插入此队列中,将等待可用的空间(如果有必要)

    public void put(E paramE) throws InterruptedException {

    checkNotNull(paramE);

    ReentrantLock localReentrantLock = this.lock;

    localReentrantLock.lockInterruptibly();

    try {

    while (this.count == this.items.length)

    this.notFull.await();//如果队列满了,则线程阻塞等待

    enqueue(paramE);13/04/2018 Page 81 of 283

    localReentrantLock.unlock();

    } finally {

    localReentrantLock.unlock();

    }

    }

    4:offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败。

    获取数据操作:

    1:poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null;

    2:poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。

    3:take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入。

    4.drainTo():一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

    2. Java 中的阻塞队列

    1. ArrayBlockingQueue :由数组结构组成的有界阻塞队列。

    2. LinkedBlockingQueue :由链表结构组成的有界阻塞队列。

    3. PriorityBlockingQueue :支持优先级排序的无界阻塞队列。

    4. DelayQueue:使用优先级队列实现的无界阻塞队列。

    5. SynchronousQueue:不存储元素的阻塞队列。

    6. LinkedTransferQueue:由链表结构组成的无界阻塞队列。

    7. LinkedBlockingDeque:由链表结构组成的双向阻塞队列。

    一起聊聊JAVA 阻塞队列原理到底能给我们带来什么帮助?

    3. ArrayBlockingQueue(公平、非公平)

    用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列:ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

    4. LinkedBlockingQueue(两个独立锁提高并发)

    基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,此队列按照先进先出(FIFO)的原则对元素进行排序。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE)。

    5. PriorityBlockingQueue(compareTo 排序实现优先)

    是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。可以自定义实现compareTo()方法来指定元素进行排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

    6. DelayQueue(缓存失效、定时任务 )

    是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景:

    1. 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。

    2. 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从DelayQueue 中获取到任务就开始执行,从比如 TimerQueue 就是使用 DelayQueue 实现的。

    7. SynchronousQueue(不存储数据、可用于传递数据)

    是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用, SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和ArrayBlockingQueue。

    8. LinkedTransferQueue

    是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

    1. transfer 方法:如果当前有消费者正在等待接收元素(消费者使用 take()方法或带时间限制的poll()方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回。

    2. tryTransfer 方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回 false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收,方法立即返回。而 transfer 方法是必须等到消费者消费了才返回。对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。

    9. LinkedBlockingDeque

    是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法却等同于 takeFirst,不知道是不是 Jdk 的 bug,使用时还是用带有 First 和 Last 后缀的方法更清楚。在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。

    展开全文
  • 主要介绍了java阻塞队列实现原理及实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 先看看 BlockingQueue 接口的文档说明:  1、add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出 IllegalStateException 异常;...  6、take:删除队列头部元素,如果队列为空,一直阻塞队列
  • 主要介绍了详解Java阻塞队列(BlockingQueue)的实现原理,阻塞队列是Java util.concurrent包下重要的数据结构,有兴趣的可以了解一下
  • Java中的阻塞队列接口BlockingQueue继承自Queue接口。 BlockingQueue接口提供了3个添加元素方法: add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常; ...
  • 本篇内容将对于阻塞队列原理、4中处理方式以及7中阻塞队列进行详细解析。 什么是阻塞队列 首先,再一次申明,队列必须是线程安全的,否则将毫无意义。阻塞队列最大的特征就是提供两种阻塞操作: 阻塞的插入元素:...
  • Java中的阻塞队列接口BlockingQueue继承自Queue接口。 BlockingQueue接口提供了3个添加元素方法: add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常; offer:添加...
  • Java阻塞队列

    2020-08-03 13:55:16
    本篇文章主要是介绍Java并发包中的几种阻塞队列和阻塞原理 背景 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法 1)支持阻塞的插入方法:意思是当队列满时...
  • Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
  • 什么是阻塞队列阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产...
  • 如果试图在空的缓冲区上执行 take 操作,则在某一个项变得可用之前,线程将一直阻塞;如果试图在满的缓冲区上执行 put 操作,则在有空间变得可用之前,线程将一直阻塞。我们喜欢在单独的等待 set 中保存 put 线程和 ...
  • 阻塞队列原理: 其实阻塞队列实现阻塞同步的方式很简单,使用的就是: lock + 多个条件(condition)阻塞控制。 使用BlockingQueue封装了根据条件阻塞线程的过程,而我们就不用关心繁琐的await/signal操作了。 ...
  • BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示: 在Java中,BlockingQueue是一个接口...
  • 阻塞队列的实现原理 1.Condition 2.LockSupport Java中的阻塞队列 什么是阻塞队列 阻塞队列(BlockingQueue)是一个附加支持阻塞的插入和移除的队列。 支持阻塞的插入方法:当队列满时,队列会阻塞插入元素...
  • 阻塞队列是对普通队列的一种扩展,在普通队列功能上增加了一些额外功能。 普通队列的功能可以参照java的Queue接口 public interface Queue extends Collection { /** * Inserts the specified element ...
  • 主要介绍了剖析Java阻塞队列的实现原理及应用场景,这里也对阻塞和非阻塞队列的不同之处进行了对比,需要的朋友可以参考下
  • 阻塞队列的实现原理 核心: lock锁+多条件(condition)的阻塞控制 Java中的阻塞队列使用BlockingQueue封装了根据condition条件阻塞线程的过程,使得我们不需要求关系繁琐的await和signal操作。 以生产消费者模式举例...
  • 转自:Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析Java中的阻塞队列接口BlockingQueue继承自Queue接口。BlockingQueue接口提供了3个添加元素方法。add:添加元素到队列里,添加成功返回true,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 761
精华内容 304
关键字:

java阻塞队列原理

java 订阅