精华内容
下载资源
问答
  • 【JAVA多线程19】JAVA 阻塞队列原理

    千次阅读 2019-06-13 00:39:08
    今天我们来讨论另外一类容器:阻塞队列。  非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)。  使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,...

    今天我们来讨论另外一类容器:阻塞队列。

      非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)。

      使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。

      本文先讲述一下java.util.concurrent包下提供主要的几种阻塞队列,然后分析了阻塞队列和非阻塞队列的中的各个方法,接着分析了阻塞队列的实现原理,最后给出了一个实际例子和几个使用场景。

      一.几种主要的阻塞队列

      二.阻塞队列中的方法 VS 非阻塞队列中的方法

      三.阻塞队列的实现原理

      四.示例和使用场景

      若有不正之处请多多谅解,并欢迎批评指正。

      请尊重作者劳动成果,转载请标明原文链接:

       http://www.cnblogs.com/dolphin0520/p/3932906.html

    一.几种主要的阻塞队列

      自从Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:

      ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

      LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

      PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

      DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

    二.阻塞队列中的方法 VS 非阻塞队列中的方法

    1.非阻塞队列中的几个主要方法:

      add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;

      remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;

      offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;

      poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;

      peek():获取队首元素,若成功,则返回队首元素;否则返回null

     

      对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。

    2.阻塞队列中的几个主要方法:

      阻塞队列包括了非阻塞队列中的大部分方法,上面列举的5个方法在阻塞队列中都存在,但是要注意这5个方法在阻塞队列中都进行了同步措施。除此之外,阻塞队列提供了另外4个非常有用的方法:

      put(E e)

      take()

      offer(E e,long timeout, TimeUnit unit)

      poll(long timeout, TimeUnit unit)

      

      put方法用来向队尾存入元素,如果队列满,则等待;

      take方法用来从队首取元素,如果队列为空,则等待;

      offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;

      poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;

    三.阻塞队列的实现原理

      前面谈到了非阻塞队列和阻塞队列中常用的方法,下面来探讨阻塞队列的实现原理,本文以ArrayBlockingQueue为例,其他阻塞队列实现原理可能和ArrayBlockingQueue有一些差别,但是大体思路应该类似,有兴趣的朋友可自行查看其他阻塞队列的实现源码。

      首先看一下ArrayBlockingQueue类中的几个成员变量:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>

    implements BlockingQueue<E>, java.io.Serializable {

     

    private static final long serialVersionUID = -817911632652898426L;

     

    /** The queued items  */

    private final E[] items;

    /** items index for next take, poll or remove */

    private int takeIndex;

    /** items index for next put, offer, or add. */

    private int putIndex;

    /** Number of items in the queue */

    private int count;

     

    /*

    * Concurrency control uses the classic two-condition algorithm

    * found in any textbook.

    */

     

    /** Main lock guarding all access */

    private final ReentrantLock lock;

    /** Condition for waiting takes */

    private final Condition notEmpty;

    /** Condition for waiting puts */

    private final Condition notFull;

    }

       可以看出,ArrayBlockingQueue中用来存储元素的实际上是一个数组,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。

      lock是一个可重入锁,notEmpty和notFull是等待条件。

      下面看一下ArrayBlockingQueue的构造器,构造器有三个重载版本:

    1

    2

    3

    4

    5

    6

    7

    8

    public ArrayBlockingQueue(int capacity) {

    }

    public ArrayBlockingQueue(int capacity, boolean fair) {

     

    }

    public ArrayBlockingQueue(int capacity, boolean fair,

                              Collection<? extends E> c) {

    }

       第一个构造器只有一个参数用来指定容量,第二个构造器可以指定容量和公平性,第三个构造器可以指定容量、公平性以及用另外一个集合进行初始化。

      然后看它的两个关键方法的实现:put()和take():

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    public void put(E e) throws InterruptedException {

        if (e == nullthrow new NullPointerException();

        final E[] items = this.items;

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        try {

            try {

                while (count == items.length)

                    notFull.await();

            catch (InterruptedException ie) {

                notFull.signal(); // propagate to non-interrupted thread

                throw ie;

            }

            insert(e);

        finally {

            lock.unlock();

        }

    }

       从put方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。

      当被其他线程唤醒时,通过insert(e)方法插入元素,最后解锁。

      我们看一下insert方法的实现:

    1

    2

    3

    4

    5

    6

    private void insert(E x) {

        items[putIndex] = x;

        putIndex = inc(putIndex);

        ++count;

        notEmpty.signal();

    }

       它是一个private方法,插入成功后,通过notEmpty唤醒正在等待取元素的线程。

      下面是take()方法的实现:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    public E take() throws InterruptedException {

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        try {

            try {

                while (count == 0)

                    notEmpty.await();

            catch (InterruptedException ie) {

                notEmpty.signal(); // propagate to non-interrupted thread

                throw ie;

            }

            E x = extract();

            return x;

        finally {

            lock.unlock();

        }

    }

       跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。在take方法中,如果可以取元素,则通过extract方法取得元素,下面是extract方法的实现:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    private E extract() {

        final E[] items = this.items;

        E x = items[takeIndex];

        items[takeIndex] = null;

        takeIndex = inc(takeIndex);

        --count;

        notFull.signal();

        return x;

    }

       跟insert方法也很类似。

      其实从这里大家应该明白了阻塞队列的实现原理,事实它和我们用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者的思路类似,只不过它把这些工作一起集成到了阻塞队列中实现。

    四.示例和使用场景

      下面先使用Object.wait()和Object.notify()、非阻塞队列实现生产者-消费者模式:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    public class Test {

        private int queueSize = 10;

        private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);

         

        public static void main(String[] args)  {

            Test test = new Test();

            Producer producer = test.new Producer();

            Consumer consumer = test.new Consumer();

             

            producer.start();

            consumer.start();

        }

         

        class Consumer extends Thread{

             

            @Override

            public void run() {

                consume();

            }

             

            private void consume() {

                while(true){

                    synchronized (queue) {

                        while(queue.size() == 0){

                            try {

                                System.out.println("队列空,等待数据");

                                queue.wait();

                            catch (InterruptedException e) {

                                e.printStackTrace();

                                queue.notify();

                            }

                        }

                        queue.poll();          //每次移走队首元素

                        queue.notify();

                        System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");

                    }

                }

            }

        }

         

        class Producer extends Thread{

             

            @Override

            public void run() {

                produce();

            }

             

            private void produce() {

                while(true){

                    synchronized (queue) {

                        while(queue.size() == queueSize){

                            try {

                                System.out.println("队列满,等待有空余空间");

                                queue.wait();

                            catch (InterruptedException e) {

                                e.printStackTrace();

                                queue.notify();

                            }

                        }

                        queue.offer(1);        //每次插入一个元素

                        queue.notify();

                        System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));

                    }

                }

            }

        }

    }

       这个是经典的生产者-消费者模式,通过阻塞队列和Object.wait()和Object.notify()实现,wait()和notify()主要用来实现线程间通信。

      具体的线程间通信方式(wait和notify的使用)在后续问章中会讲述到。

      下面是使用阻塞队列实现的生产者-消费者模式:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    public class Test {

        private int queueSize = 10;

        private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);

         

        public static void main(String[] args)  {

            Test test = new Test();

            Producer producer = test.new Producer();

            Consumer consumer = test.new Consumer();

             

            producer.start();

            consumer.start();

        }

         

        class Consumer extends Thread{

             

            @Override

            public void run() {

                consume();

            }

             

            private void consume() {

                while(true){

                    try {

                        queue.take();

                        System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");

                    catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

            }

        }

         

        class Producer extends Thread{

             

            @Override

            public void run() {

                produce();

            }

             

            private void produce() {

                while(true){

                    try {

                        queue.put(1);

                        System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));

                    catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

            }

        }

    }

       有没有发现,使用阻塞队列代码要简单得多,不需要再单独考虑同步和线程间通信的问题。

      在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。

      阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。

      参考资料:

      《Java编程实战》

      http://ifeve.com/java-blocking-queue/

      http://endual.iteye.com/blog/1412212

      http://blog.csdn.net/zzp_403184692/article/details/8021615

      http://www.cnblogs.com/juepei/p/3922401.html

    展开全文
  • 主要介绍了java阻塞队列实现原理及实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 主要介绍了剖析Java中阻塞队列的实现原理及应用场景,这里也对阻塞和非阻塞队列的不同之处进行了对比,需要的朋友可以参考下
  • 阻塞队列原理

    2020-03-15 19:23:15
    一、什么是阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。 这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。 阻塞队列不保证...

    一、什么是阻塞队列

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。
    这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。

    常见的有以下几种
    (1)ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列)。
    (2)LinkedBlockingQueue: 一个由链表结构组成的有界阻塞队列,而在未指明容量时,容量默认为Integer.MAX_VALUE。
    (3)LinkedBlockingDeque: 使用双向队列实现的双端阻塞队列,双端意味着可以像普通队列一样FIFO(先进先出),可以以像栈一样FILO(先进后出)
    (4)PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列,对元素没有要求,可以实现Comparable接口也可以提供Comparator来对队列中的元素进行比较,跟时间没有任何关系,仅仅是按照优先级取任务。
    (5)DelayQueue:同PriorityBlockingQueue,也是二叉堆实现的优先级阻塞队列。要求元素都实现Delayed接口,通过执行时延从队列中提取任务,时间没到任务取不出来。
    (6)SynchronousQueue: 一个不存储元素的阻塞队列,消费者线程调用take()方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用put()方法的时候就会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。

    二、Condition的原理

    阻塞队列实际上是使用了Condition来模拟线程间协作。
    Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition。
    调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用

    注意事项

    Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

    三、put和take的原理

    这里用ArrayBlockingQueue来举例
    它带有的属性如下

    
    // 存储队列元素的数组,是个循环数组
    final Object[] items;
    
    // 拿数据的索引,用于take,poll,peek,remove方法
    int takeIndex;
    
    // 放数据的索引,用于put,offer,add方法
    int putIndex;
    
    // 元素个数
    int count;
    
    // 可重入锁
    final ReentrantLock lock;
    
    // notEmpty条件对象,由lock创建
    private final Condition notEmpty;
    
    // notFull条件对象,由lock创建
    private final Condition notFull;
    

    1.数据的添加

    put:阻塞添加

    public void put(E e) throws InterruptedException {
        checkNotNull(e); // 不允许元素为空
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程
        try {
            while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里
                notFull.await(); // 线程阻塞并被挂起,同时释放锁
            insert(e); // 调用insert方法
        } finally {
            lock.unlock(); // 释放锁,让其他线程可以调用put方法
        }
    }
    

    insert

    private void insert(E x) {
        items[putIndex] = x; // 元素添加到数组里
        putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0
        ++count; // 元素个数+1
        notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知
    }
    

    2.数据的获取

    take:通知模式
    Condition notEmpty:队列为空,notEmpty会await
    插入元素会signal

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程
        try {
            while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里
                notEmpty.await(); // 线程阻塞并被挂起,同时释放锁
            return extract(); // 调用extract方法
        } finally {
            lock.unlock(); // 释放锁,让其他线程可以调用take方法
        }
    }
    

    extract

    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素
        items[takeIndex] = null; // 对应取索引上的数据清空
        takeIndex = inc(takeIndex); // 取数据索引+1,当索引满了变成0
        --count; // 元素个数-1
        notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知
        return x; // 返回元素
    }
    
    展开全文
  • Java中的阻塞队列接口BlockingQueue继承自Queue接口。BlockingQueue接口提供了3个添加元素方法:add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常;offer:添加元素到...

    Java中的阻塞队列接口BlockingQueue继承自Queue接口。

    BlockingQueue接口提供了3个添加元素方法:

    add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常;

    offer:添加元素到队列里,添加成功返回true,添加失败返回false;

    put:添加元素到队列里,如果容量满了会阻塞直到容量不满。

    3个删除方法:

    poll:删除队列头部元素,如果队列为空,返回null。否则返回元素;

    remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false;

    take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除。

    常用的阻塞队列具体类有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等。

    本文以ArrayBlockingQueue和LinkedBlockingQueue为例,分析它们的实现原理。

    ArrayBlockingQueue

    ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。

    ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改。

    它带有的属性如下:

    // 存储队列元素的数组,是个循环数组

    final Object[] items;

    // **的索引,用于take,poll,peek,remove方法

    int takeIndex;

    // 放数据的索引,用于put,offer,add方法

    int putIndex;

    // 元素个数

    int count;

    // 可重入锁

    final ReentrantLock lock;

    // notEmpty条件对象,由lock创建

    private final Condition notEmpty;

    // notFull条件对象,由lock创建

    private final Condition notFull;

    数据的添加

    ArrayBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

    add方法:

    public boolean add(E e) {

    if (offer(e))

    return true;

    else

    throw new IllegalStateException("Queue full");

    }

    add方法内部调用offer方法如下:

    public boolean offer(E e) {

    checkNotNull(e); // 不允许元素为空

    final ReentrantLock lock = this.lock;

    lock.lock(); // 加锁,保证调用offer方法的时候只有1个线程

    try {

    if (count == items.length) // 如果队列已满

    return false; // 直接返回false,添加失败

    else {

    insert(e); // 数组没满的话调用insert方法

    return true; // 返回true,添加成功

    }

    } finally {

    lock.unlock(); // 释放锁,让其他线程可以调用offer方法

    }

    }

    insert方法如下:

    private void insert(E x) {

    items[putIndex] = x; // 元素添加到数组里

    putIndex = inc(putIndex); // 放数据索引 1,当索引满了变成0

    count; // 元素个数 1

    notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知

    }

    put方法:

    public void put(E e) throws InterruptedException {

    checkNotNull(e); // 不允许元素为空

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程

    try {

    while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里

    notFull.await(); // 线程阻塞并被挂起,同时释放锁

    insert(e); // 调用insert方法

    } finally {

    lock.unlock(); // 释放锁,让其他线程可以调用put方法

    }

    }

    ArrayBlockingQueue的添加数据方法有add,put,offer这3个方法,总结如下:

    add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true

    offer方法如果队列满了,返回false,否则返回true

    add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。

    这3个方法内部都会使用可重入锁保证原子性。

    数据的删除

    ArrayBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

    poll方法:

    public E poll() {

    final ReentrantLock lock = this.lock;

    lock.lock(); // 加锁,保证调用poll方法的时候只有1个线程

    try {

    return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法

    } finally {

    lock.unlock(); // 释放锁,让其他线程可以调用poll方法

    }

    }

    poll方法内部调用extract方法:

    private E extract() {

    final Object[] items = this.items;

    E x = this.

    Tag标签:

    展开全文
  • JAVA 阻塞队列原理 阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况: 1. 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。 ...

    JAVA 阻塞队列原理

    阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:
    1. 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
    2. 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

    阻塞队列的主要方法
     
    抛出异常:抛出一个异常;
    特殊值:返回一个特殊值(null 或 false,视情况而定)
    阻塞:在成功操作之前,一直阻塞线程
    超时:放弃前只在最大的时间内阻塞
     
    1:public abstract boolean add(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。如果该元素是 NULL,则会抛出 NullPointerException 异常。
    2:public abstract boolean offer(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。
    3:public abstract void put(E paramE) throws InterruptedException: 将指定元素插入此队列中,将等待可用的空间(如果有必要)
    public void put(E paramE) throws InterruptedException {
        checkNotNull(paramE);
        ReentrantLock localReentrantLock = this.lock;
        localReentrantLock.lockInterruptibly();
        try {
            while (this.count == this.items.length)
            this.notFull.await();//如果队列满了,则线程阻塞等待
            enqueue(paramE);
            localReentrantLock.unlock();
            } finally {
                localReentrantLock.unlock();
        }
    }
    4:offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败。
     
    获取数据操作:
    1:poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null;
    2:poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
    3:take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入。
    4.drainTo():一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
     
    Java 中的阻塞队列
    1. ArrayBlockingQueue :由数组结构组成的有界阻塞队列。
    2. LinkedBlockingQueue :由链表结构组成的有界阻塞队列。
    3. PriorityBlockingQueue :支持优先级排序的无界阻塞队列。
    4. DelayQueue:使用优先级队列实现的无界阻塞队列。
    5. SynchronousQueue:不存储元素的阻塞队列。
    6. LinkedTransferQueue:由链表结构组成的无界阻塞队列。
    7. LinkedBlockingDeque:由链表结构组成的双向阻塞队列
    ArrayBlockingQueue(公平、非公平)
    用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。 默认情况下不保证访问者公平的访问队列 ,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐
    量。我们可以使用以下代码 创建一个公平的阻塞队列 :ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
     
    LinkedBlockingQueue(两个独立锁提高并发)
    基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,此队列按照先进先出(FIFO)的原则对元素进行排序。而 LinkedBlockingQueue 之所以能够高效的 处理并发数据 ,还因为其 对于生产者端和消费者端分别采用了独立的锁来控制数据同步 ,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。 LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE)。
     
    PriorityBlockingQueue( compareTo 排序实现优先
    是一个 支持优先级的无界队列 。默认情况下元素采取自然顺序升序排列。 可以自定义实现compareTo() 方法来指定元素进行排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
     
    DelayQueue(缓存失效、定时任务 )
    是一个支 持延时获取元素的无界阻 塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景:
    1. 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
    2. 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从DelayQueue 中获取到任务就开始执行,从比如 TimerQueue 就是使用 DelayQueue 实现的。
     
    SynchronousQueue(不存储数据、可用于传递数据)
    是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素 。SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另 外 一 个 线 程 使 用 , SynchronousQueue 的 吞 吐 量 高 于 LinkedBlockingQueue 和
    ArrayBlockingQueue。
     
    LinkedTransferQueue
    是 一 个 由 链 表 结 构 组 成 的 无 界 阻 塞 TransferQueue 队 列 。 相 对 于 其 他 阻 塞 队 列 ,LinkedTransferQueue 多了 tryTransfer transfer 方法。
    1. transfer 方法:如果当前有消费者正在等待接收元素(消费者使用 take()方法或带时间限制的poll()方法时),transfer 方法可以 把生产者传入的元素立刻 transfer(传输)给消费者 。如果没有消费者在等待接收元素, transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回
    2. tryTransfer 方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回 false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收,方法立即返回。而 transfer 方法是必须等到消费者消费了才返回。对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。
     
    LinkedBlockingDeque
    是一个由链表结构组成的 双向阻塞队列 。所谓双向队列指的 你可以从队列的两端插入和移出元素 。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队
    列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法却等同于 takeFirst,不知道是不是 Jdk 的 bug,使用时还是用带有 First 和 Last 后缀的方法更清楚。在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。
    展开全文
  • 1)阻塞队列原理: ​ 其实阻塞队列实现阻塞同步的方式很简单,使用的就是:lock锁+多个条件(condition)的阻塞控制。 ​ 使用BlockingQueue封装了根据condition条件阻塞线程的过程,使得我们不用去关心繁琐的...
  • 阻塞队列 实现了BlockingQueue接口 阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。 ...
  • 本篇内容将对于阻塞队列原理、4中处理方式以及7中阻塞队列进行详细解析。 什么是阻塞队列 首先,再一次申明,队列必须是线程安全的,否则将毫无意义。阻塞队列最大的特征就是提供两种阻塞操作: 阻塞的插入元素:...
  • 1. 阻塞队列概述 ① 什么是阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法。 支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,...
  • 阻塞队列的实现原理 核心: lock锁+多条件(condition)的阻塞控制 Java中的阻塞队列使用BlockingQueue封装了根据condition条件阻塞线程的过程,使得我们不需要求关系繁琐的await和signal操作。 以生产消费者模式举例...
  • 1.并发队列ConcuttrentLinkedQueue(非阻塞)、BlockingQueue(阻塞式) 阻塞式与非阻塞队列的区别 入列、出列 阻塞式: 入列(存队列阻塞队列,如果超出队列总数,这时候会进行等待。 出列(获取队列),...
  • Java并发编程-阻塞队列(BlockingQueue)的实现原理

    万次阅读 多人点赞 2016-06-13 19:37:30
    阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,...
  • 阻塞队列常用于生产者消费者模式的场景,为了解决生产者和消费者处理效率不平衡的问题,通过阻塞队列来为生产者和消费者解耦,两者不直接通信,而是通过阻塞队列通信,生产者是向阻塞队列添加元素的线程,消费者是从...
  • ! 一....  在新增的Concurrent包中,BlockingQueue...通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及...
  • Redis阻塞/非阻塞队列

    2021-04-28 00:42:38
    阻塞队列RPUSH key value [value ...]RPOP keyLPUSH key value [value ...]LPOP keyR/LPUSH都是后进先出操作,组合起来则是先进先出,属于非阻塞操作,即:不管有无找到指定键值都立即返回。阻塞队列RPUSH key ...
  • 阻塞队列原理详解

    千次阅读 2017-03-03 16:48:46
    前一篇关于阻塞队列原理和使用介绍的很生硬且笼统,因为自己是菜鸟经验少,完全是自己一个人的使用和总结。故而再此附上一篇介绍的很详细精彩的文章,希望能对大家有所帮助吧!!!
  • if(nexti!=putIndex){items[i]=items[nexti];i=nexti;}else{items[i]=null;putIndex=i;break;}}}--count;//元素个数-1notFull.signal();//应用前提对象notFull通知,比如...这个时刻花费了一条数据,队列没满了,...
  • 讲述线程池原理,线程池使用场景和注意事项,手动创建线程池方法,注意事项,阻塞队列的相关知识
  • 接下来分别从服务端和客户端来阐述这一逻辑的实现原理。Redis Server:redis实现了一套事件触发模型,主要处理两种事件:I/O事件(文件事件)和定时事件。而处理它们的就靠一个EventLoop线程。同时redis还提供了丰富的...
  • 阻塞队列的底层原理

    2019-12-10 10:53:52
      ArrayBlockingQueue是基于数组实现的有界的先进先出的阻塞队列,所以我们可以说队列的头部是队列中呆的时间最长的或者叫最早的,而队列的尾部则是刚刚呆的时间最短的,因为尾部的元素都是最新插入的。...
  • 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时, 获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于 生产者和消费者的...
  • 阻塞队列时如何实现阻塞的?为什么它是线程安全的?
  • 主要介绍了详解Java阻塞队列(BlockingQueue)的实现原理阻塞队列是Java util.concurrent包下重要的数据结构,有兴趣的可以了解一下
  • 先看看 BlockingQueue 接口的文档说明:  1、add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出 IllegalStateException 异常;...  6、take:删除队列头部元素,如果队列为空,一直阻塞队列
  • 实现思路:利用单向链表来保存队列的数据,在往队列中添加元素的时候,新建一个节点,加入到队尾,加入到队尾的操作,利用CAS原理加乐观锁,另外,还需要将新加的节点设置为新的队尾,此步操作也需要利用CAS,head与...
  • 从redis的API可以了解到lpop,rpop可以实现一个阻塞队列。那疑问就来了,redis不是单线程的吗,如果阻塞了,那其他操作就执行不了呀。事实不是这样的 redis的线程模型,是接收客户端命令的线程时 I/O 多路复用的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 129,458
精华内容 51,783
关键字:

阻塞队列原理

友情链接: Bitmap.rar