精华内容
下载资源
问答
  • 阻塞队列实现原理

    2020-03-05 21:35:26
    阻塞队列原理 java中ArrayBlockingQueue相信都用过,之前仅仅是使用没有了解其背后原理,知其然不知其所以然,这种学习方法是错误的,上学的时候老师总是告诉我们好记性不如烂笔头,相信如果自己手写一遍理解的更...

    阻塞队列原理

    • java中ArrayBlockingQueue相信都用过,之前仅仅是使用没有了解其背后原理,知其然不知其所以然,这种学习方法是错误的,上学的时候老师总是告诉我们好记性不如烂笔头,相信如果自己手写一遍理解的更深刻。
    • 实际上阻塞队列在异步编程经常使用,典型应用生产者消费者模型。
    • 代码如下:
    
    mport java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * @Author : zzz
     * @Date : Created in 19:58 2020/3/5
     * @Description : 复习知识点,手写简易版阻塞队列
     */
    public class MyBlockingQueue<E> {
        final ReentrantLock lock ;
        final Condition notFull;
        final Condition notEmpty;
        private int takeIndex = 0;
        private int putIndex = 0;
        //计数器
        private int count = 0;
        //容量
        private final int capacity;
        private final Object[] items;
        //默认按照公平锁实现简易版
        public MyBlockingQueue(int capacity) {
            this.capacity = capacity;
            items = new Object[capacity];
            lock = new ReentrantLock();
            notFull = lock.newCondition();
            notEmpty = lock.newCondition();
        }
    
        public void put(E e) throws InterruptedException {
            //响应中断
            lock.lockInterruptibly();
            try {
                //元素个数等于数组长度
                while (items.length == count) {
                    notFull.await();
                }
                items[putIndex] = e;
                ++putIndex;
                System.out.println("生产者线程:" + count + ",putIndex=" + putIndex);
                if (putIndex == items.length) {
                    putIndex = 0;
                }
                count++;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public E take() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                //元素个数为0
                while (count == 0) {
                    notEmpty.await();
                }
                E e = (E)items[takeIndex];
                items[takeIndex] = null;
                System.out.println("消费者线程count:" + count + ",takeIndex=" + takeIndex);
    			//刚开始写成了++takeIndex == count
                if (++takeIndex == items.length) {
                    System.out.println("居然为0");
                    takeIndex = 0;
                }
    
                count--;
                notFull.signal();
                return e;
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);
    
            Thread producer1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    int i=0;
                    while (true) {
                        try {
                            queue.put((i++) + "");
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
    
           /* Thread producer2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            queue.put(Math.random() + "");
                            Thread.sleep(10000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });*/
    
            Thread consumer = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            String ele = queue.take();
                            System.out.println("消费者线程获取元素" + ele);
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
    
            producer1.start();
           // producer2.start();
            consumer.start();
    
            producer1.join();
            //producer2.join();
            consumer.join();
        }
    }
    
    
    

    控制台输出:代码中生产者1秒生产一个放入队列,消费者3秒消费一个,生产者生产快于消费者。虽然很简单,但自己手写一遍遇到很多问题,自己定位自己解决,这个过程对理解原理很有益处。

    生产者线程:0,putIndex=1
    消费者线程count:1,takeIndex=0
    消费者线程获取元素0
    生产者线程:0,putIndex=2
    生产者线程:1,putIndex=3
    生产者线程:2,putIndex=4
    消费者线程count:3,takeIndex=1
    消费者线程获取元素1
    生产者线程:2,putIndex=5
    生产者线程:3,putIndex=6
    消费者线程count:4,takeIndex=2
    消费者线程获取元素2
    生产者线程:3,putIndex=7
    生产者线程:4,putIndex=8
    生产者线程:5,putIndex=9
    消费者线程count:6,takeIndex=3
    消费者线程获取元素3
    生产者线程:5,putIndex=10
    生产者线程:6,putIndex=1
    生产者线程:7,putIndex=2
    消费者线程count:8,takeIndex=4
    消费者线程获取元素4
    生产者线程:7,putIndex=3
    生产者线程:8,putIndex=4
    生产者线程:9,putIndex=5
    消费者线程count:10,takeIndex=5
    消费者线程获取元素5
    生产者线程:9,putIndex=6
    消费者线程count:10,takeIndex=6
    消费者线程获取元素6
    生产者线程:9,putIndex=7
    消费者线程count:10,takeIndex=7
    生产者线程:9,putIndex=8
    消费者线程获取元素7
    消费者线程count:10,takeIndex=8
    消费者线程获取元素8
    生产者线程:9,putIndex=9
    
    Process finished with exit code -1
    
    
    
    展开全文
  • 主要介绍了java阻塞队列实现原理及实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • Java中的阻塞队列接口BlockingQueue继承自Queue接口。 BlockingQueue接口提供了3个添加元素方法: add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常; offer:添加...

    Java 阻塞队列实现原理分析


    Java中的阻塞队列接口BlockingQueue继承自Queue接口。

    BlockingQueue接口提供了3个添加元素方法:

    • add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常;
    • offer:添加元素到队列里,添加成功返回true,添加失败返回false;
    • put:添加元素到队列里,如果容量满了会阻塞直到容量不满。

    3个删除方法:

    • poll:删除队列头部元素,如果队列为空,返回null。否则返回元素;
    • remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false;
    • take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除。

    常用的阻塞队列具体类有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等。

    本文以ArrayBlockingQueue和LinkedBlockingQueue为例,分析它们的实现原理。

    ArrayBlockingQueue

    ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。

    ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改。

    它带有的属性如下:

    
    
    1. // 存储队列元素的数组,是个循环数组 
    2.  
    3. final Object[] items; 
    4.  
    5.   
    6.  
    7. // 拿数据的索引,用于take,poll,peek,remove方法 
    8.  
    9. int takeIndex; 
    10.  
    11.   
    12.  
    13. // 放数据的索引,用于put,offer,add方法 
    14.  
    15. int putIndex; 
    16.  
    17.   
    18.  
    19. // 元素个数 
    20.  
    21. int count
    22.  
    23.   
    24.  
    25. // 可重入锁 
    26.  
    27. final ReentrantLock lock; 
    28.  
    29. // notEmpty条件对象,由lock创建 
    30.  
    31. private final Condition notEmpty; 
    32.  
    33. // notFull条件对象,由lock创建 
    34.  
    35. private final Condition notFull;  

    数据的添加

    ArrayBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

    add方法:

    
    
    1. public boolean add(E e) { 
    2.  
    3.     if (offer(e)) 
    4.  
    5.         return true
    6.  
    7.     else 
    8.  
    9.         throw new IllegalStateException("Queue full"); 
    10.  
    11. }  

    add方法内部调用offer方法如下:

    
    
    1. public boolean offer(E e) { 
    2.  
    3.     checkNotNull(e); // 不允许元素为空 
    4.  
    5.     final ReentrantLock lock = this.lock; 
    6.  
    7.     lock.lock(); // 加锁,保证调用offer方法的时候只有1个线程 
    8.  
    9.     try { 
    10.  
    11.         if (count == items.length) // 如果队列已满 
    12.  
    13.             return false; // 直接返回false,添加失败 
    14.  
    15.         else { 
    16.  
    17.             insert(e); // 数组没满的话调用insert方法 
    18.  
    19.             return true; // 返回true,添加成功 
    20.  
    21.         } 
    22.  
    23.     } finally { 
    24.  
    25.         lock.unlock(); // 释放锁,让其他线程可以调用offer方法 
    26.  
    27.     } 
    28.  
    29. }  

    insert方法如下:

    
    
    1. private void insert(E x) { 
    2.  
    3.     items[putIndex] = x; // 元素添加到数组里 
    4.  
    5.     putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0 
    6.  
    7.     ++count; // 元素个数+1 
    8.  
    9.     notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知 
    10.  
    11. }  

    put方法:

    
    
    1. public void put(E e) throws InterruptedException { 
    2.  
    3.     checkNotNull(e); // 不允许元素为空 
    4.  
    5.     final ReentrantLock lock = this.lock; 
    6.  
    7.     lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程 
    8.  
    9.     try { 
    10.  
    11.         while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里 
    12.  
    13.             notFull.await(); // 线程阻塞并被挂起,同时释放锁 
    14.  
    15.         insert(e); // 调用insert方法 
    16.  
    17.     } finally { 
    18.  
    19.         lock.unlock(); // 释放锁,让其他线程可以调用put方法 
    20.  
    21.     } 
    22.  
    23. }  

    ArrayBlockingQueue的添加数据方法有add,put,offer这3个方法,总结如下:

    add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true

    offer方法如果队列满了,返回false,否则返回true

    add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。

    这3个方法内部都会使用可重入锁保证原子性。

    数据的删除

    ArrayBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

    poll方法:

    
    
    1. public E poll() { 
    2.  
    3.     final ReentrantLock lock = this.lock; 
    4.  
    5.     lock.lock(); // 加锁,保证调用poll方法的时候只有1个线程 
    6.  
    7.     try { 
    8.  
    9.         return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法 
    10.  
    11.     } finally { 
    12.  
    13.         lock.unlock(); // 释放锁,让其他线程可以调用poll方法 
    14.  
    15.     } 
    16.  
    17. }  

    poll方法内部调用extract方法:

    
    
    1. private E extract() { 
    2.  
    3.     final Object[] items = this.items; 
    4.  
    5.     E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素 
    6.  
    7.     items[takeIndex] = null; // 对应取索引上的数据清空 
    8.  
    9.     takeIndex = inc(takeIndex); // 取数据索引+1,当索引满了变成0 
    10.  
    11.     --count; // 元素个数-1 
    12.  
    13.     notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知 
    14.  
    15.     return x; // 返回元素 
    16.  
    17. }  

    take方法:

    
    
    1. public E take() throws InterruptedException { 
    2.  
    3.     final ReentrantLock lock = this.lock; 
    4.  
    5.     lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程 
    6.  
    7.     try { 
    8.  
    9.         while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里 
    10.  
    11.             notEmpty.await(); // 线程阻塞并被挂起,同时释放锁 
    12.  
    13.         return extract(); // 调用extract方法 
    14.  
    15.     } finally { 
    16.  
    17.         lock.unlock(); // 释放锁,让其他线程可以调用take方法 
    18.  
    19.     } 
    20.  
    21. }  

    remove方法:

    
    
    1. public boolean remove(Object o) { 
    2.  
    3.     if (o == nullreturn false
    4.  
    5.     final Object[] items = this.items; 
    6.  
    7.     final ReentrantLock lock = this.lock; 
    8.  
    9.     lock.lock(); // 加锁,保证调用remove方法的时候只有1个线程 
    10.  
    11.     try { 
    12.  
    13.         for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍历元素 
    14.  
    15.             if (o.equals(items[i])) { // 两个对象相等的话 
    16.  
    17.                 removeAt(i); // 调用removeAt方法 
    18.  
    19.                 return true; // 删除成功,返回true 
    20.  
    21.             } 
    22.  
    23.         } 
    24.  
    25.         return false; // 删除成功,返回false 
    26.  
    27.     } finally { 
    28.  
    29.         lock.unlock(); // 释放锁,让其他线程可以调用remove方法 
    30.  
    31.     } 
    32.  
    33. }  

    removeAt方法:

    
    
    1. void removeAt(int i) { 
    2.  
    3.     final Object[] items = this.items; 
    4.  
    5.     if (i == takeIndex) { // 如果要删除数据的索引是取索引位置,直接删除取索引位置上的数据,然后取索引+1即可 
    6.  
    7.         items[takeIndex] = null
    8.  
    9.         takeIndex = inc(takeIndex); 
    10.  
    11.     } else { // 如果要删除数据的索引不是取索引位置,移动元素元素,更新取索引和放索引的值 
    12.  
    13.         for (;;) { 
    14.  
    15.             int nexti = inc(i); 
    16.  
    17.             if (nexti != putIndex) { 
    18.  
    19.                 items[i] = items[nexti]; 
    20.  
    21.                 i = nexti; 
    22.  
    23.             } else { 
    24.  
    25.                 items[i] = null
    26.  
    27.                 putIndex = i; 
    28.  
    29.                 break; 
    30.  
    31.             } 
    32.  
    33.         } 
    34.  
    35.     } 
    36.  
    37.     --count; // 元素个数-1 
    38.  
    39.     notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知  
    40.  
    41. }  

    ArrayBlockingQueue的删除数据方法有poll,take,remove这3个方法,总结如下:

    poll方法对于队列为空的情况,返回null,否则返回队列头部元素。

    remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false。

    poll方法和remove方法不会阻塞线程。

    take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中。

    这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。

    LinkedBlockingQueue

    LinkedBlockingQueue是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。

    内部使用放锁和拿锁,这两个锁实现阻塞(“two lock queue” algorithm)。

    它带有的属性如下:

    
    
    1. // 容量大小 
    2.  
    3. private final int capacity; 
    4.  
    5.   
    6.  
    7. // 元素个数,因为有2个锁,存在竞态条件,使用AtomicInteger 
    8.  
    9. private final AtomicInteger count = new AtomicInteger(0); 
    10.  
    11.   
    12.  
    13. // 头结点 
    14.  
    15. private transient Node<E> head; 
    16.  
    17.   
    18.  
    19. // 尾节点 
    20.  
    21. private transient Node<E> last
    22.  
    23.   
    24.  
    25. // 拿锁 
    26.  
    27. private final ReentrantLock takeLock = new ReentrantLock(); 
    28.  
    29.   
    30.  
    31. // 拿锁的条件对象 
    32.  
    33. private final Condition notEmpty = takeLock.newCondition(); 
    34.  
    35.   
    36.  
    37. // 放锁 
    38.  
    39. private final ReentrantLock putLock = new ReentrantLock(); 
    40.  
    41.   
    42.  
    43. // 放锁的条件对象 
    44.  
    45. private final Condition notFull = putLock.newCondition();  

    ArrayBlockingQueue只有1个锁,添加数据和删除数据的时候只能有1个被执行,不允许并行执行。

    而LinkedBlockingQueue有2个锁,放锁和拿锁,添加数据和删除数据是可以并行进行的,当然添加数据和删除数据的时候只能有1个线程各自执行。

    数据的添加

    LinkedBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

    add方法内部调用offer方法:

    
    
    1. public boolean offer(E e) { 
    2.  
    3.     if (e == null) throw new NullPointerException(); // 不允许空元素 
    4.  
    5.     final AtomicInteger count = this.count
    6.  
    7.     if (count.get() == capacity) // 如果容量满了,返回false 
    8.  
    9.         return false
    10.  
    11.     int c = -1; 
    12.  
    13.     Node<E> node = new Node(e); // 容量没满,以新元素构造节点 
    14.  
    15.     final ReentrantLock putLock = this.putLock; 
    16.  
    17.     putLock.lock(); // 放锁加锁,保证调用offer方法的时候只有1个线程 
    18.  
    19.     try { 
    20.  
    21.         if (count.get() < capacity) { // 再次判断容量是否已满,因为可能拿锁在进行消费数据,没满的话继续执行 
    22.  
    23.             enqueue(node); // 节点添加到链表尾部 
    24.  
    25.             c = count.getAndIncrement(); // 元素个数+1 
    26.  
    27.             if (c + 1 < capacity) // 如果容量还没满 
    28.  
    29.                 notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满 
    30.  
    31.         } 
    32.  
    33.     } finally { 
    34.  
    35.         putLock.unlock(); // 释放放锁,让其他线程可以调用offer方法 
    36.  
    37.     } 
    38.  
    39.     if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据 
    40.  
    41.         signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费 
    42.  
    43.     return c >= 0; // 添加成功返回true,否则返回false 
    44.  
    45. }  

    put方法:

    
    
    1. public void put(E e) throws InterruptedException { 
    2.  
    3.     if (e == null) throw new NullPointerException(); // 不允许空元素 
    4.  
    5.     int c = -1; 
    6.  
    7.     Node<E> node = new Node(e); // 以新元素构造节点 
    8.  
    9.     final ReentrantLock putLock = this.putLock; 
    10.  
    11.     final AtomicInteger count = this.count
    12.  
    13.     putLock.lockInterruptibly(); // 放锁加锁,保证调用put方法的时候只有1个线程 
    14.  
    15.     try { 
    16.  
    17.         while (count.get() == capacity) { // 如果容量满了 
    18.  
    19.             notFull.await(); // 阻塞并挂起当前线程 
    20.  
    21.         } 
    22.  
    23.         enqueue(node); // 节点添加到链表尾部 
    24.  
    25.         c = count.getAndIncrement(); // 元素个数+1 
    26.  
    27.         if (c + 1 < capacity) // 如果容量还没满 
    28.  
    29.             notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满 
    30.  
    31.     } finally { 
    32.  
    33.         putLock.unlock(); // 释放放锁,让其他线程可以调用put方法 
    34.  
    35.     } 
    36.  
    37.     if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据 
    38.  
    39.         signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费 
    40.  
    41. }  

    LinkedBlockingQueue的添加数据方法add,put,offer跟ArrayBlockingQueue一样,不同的是它们的底层实现不一样。

    ArrayBlockingQueue中放入数据阻塞的时候,需要消费数据才能唤醒。

    而LinkedBlockingQueue中放入数据阻塞的时候,因为它内部有2个锁,可以并行执行放入数据和消费数据,不仅在消费数据的时候进行唤醒插入阻塞的线程,同时在插入的时候如果容量还没满,也会唤醒插入阻塞的线程。

    数据的删除

    LinkedBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

    poll方法:

    
    
    1. public E poll() { 
    2.  
    3.     final AtomicInteger count = this.count
    4.  
    5.     if (count.get() == 0) // 如果元素个数为0 
    6.  
    7.         return null; // 返回null 
    8.  
    9.     E x = null
    10.  
    11.     int c = -1; 
    12.  
    13.     final ReentrantLock takeLock = this.takeLock; 
    14.  
    15.     takeLock.lock(); // 拿锁加锁,保证调用poll方法的时候只有1个线程 
    16.  
    17.     try { 
    18.  
    19.         if (count.get() > 0) { // 判断队列里是否还有数据 
    20.  
    21.             x = dequeue(); // 删除头结点 
    22.  
    23.             c = count.getAndDecrement(); // 元素个数-1 
    24.  
    25.             if (c > 1) // 如果队列里还有元素 
    26.  
    27.                 notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费 
    28.  
    29.         } 
    30.  
    31.     } finally { 
    32.  
    33.         takeLock.unlock(); // 释放拿锁,让其他线程可以调用poll方法 
    34.  
    35.     } 
    36.  
    37.     if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据 
    38.  
    39.         signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据 
    40.  
    41.                 return x; 
    42.  

    take方法:

    
    
    1. public E take() throws InterruptedException { 
    2.  
    3.     E x; 
    4.  
    5.     int c = -1; 
    6.  
    7.     final AtomicInteger count = this.count
    8.  
    9.     final ReentrantLock takeLock = this.takeLock; 
    10.  
    11.     takeLock.lockInterruptibly(); // 拿锁加锁,保证调用take方法的时候只有1个线程 
    12.  
    13.     try { 
    14.  
    15.         while (count.get() == 0) { // 如果队列里已经没有元素了 
    16.  
    17.             notEmpty.await(); // 阻塞并挂起当前线程 
    18.  
    19.         } 
    20.  
    21.         x = dequeue(); // 删除头结点 
    22.  
    23.         c = count.getAndDecrement(); // 元素个数-1 
    24.  
    25.         if (c > 1) // 如果队列里还有元素 
    26.  
    27.             notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费 
    28.  
    29.     } finally { 
    30.  
    31.         takeLock.unlock(); // 释放拿锁,让其他线程可以调用take方法 
    32.  
    33.     } 
    34.  
    35.     if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据 
    36.  
    37.         signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据 
    38.  
    39.     return x; 
    40.  

    remove方法:

    
    
    1. public boolean remove(Object o) { 
    2.  
    3.     if (o == nullreturn false
    4.  
    5.     fullyLock(); // remove操作要移动的位置不固定,2个锁都需要加锁 
    6.  
    7.     try { 
    8.  
    9.         for (Node<E> trail = head, p = trail.next; // 从链表头结点开始遍历 
    10.  
    11.              p != null
    12.  
    13.              trail = p, p = p.next) { 
    14.  
    15.             if (o.equals(p.item)) { // 判断是否找到对象 
    16.  
    17.                 unlink(p, trail); // 修改节点的链接信息,同时调用notFull的signal方法 
    18.  
    19.                 return true
    20.  
    21.             } 
    22.  
    23.         } 
    24.  
    25.         return false
    26.  
    27.     } finally { 
    28.  
    29.         fullyUnlock(); // 2个锁解锁 
    30.  
    31.     } 
    32.  

    LinkedBlockingQueue的take方法对于没数据的情况下会阻塞,poll方法删除链表头结点,remove方法删除指定的对象。

    需要注意的是remove方法由于要删除的数据的位置不确定,需要2个锁同时加锁。


    作者:佚名

    来源:51CTO

    展开全文
  • Java中的阻塞队列接口BlockingQueue继承自Queue接口。 BlockingQueue接口提供了3个添加元素方法: add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常; ...

    Java中的阻塞队列接口BlockingQueue继承自Queue接口。

    BlockingQueue接口提供了3个添加元素方法:

    1. add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常;

    2. put:添加元素到队列里,如果容量满了会阻塞直到容量不满。

    3个删除方法:

    1. poll:删除队列头部元素,如果队列为空,返回null。否则返回元素;

    2. remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false;

    3. take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除。

    常用的阻塞队列具体类有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等。

    本文以ArrayBlockingQueue和LinkedBlockingQueue为例,分析它们的实现原理。

    ArrayBlockingQueue

    ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。

    ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改。

    它带有的属性如下:

    // 存储队列元素的数组,是个循环数组

    final Object[] items;

    // 拿数据的索引,用于take,poll,peek,remove方法

    int takeIndex;

    // 放数据的索引,用于put,offer,add方法

    int putIndex;

    // 元素个数

    int count;

    // 可重入锁

    final ReentrantLock lock;

    // notEmpty条件对象,由lock创建

    private final Condition notEmpty;

    // notFull条件对象,由lock创建

    private final Condition notFull;

    数据的添加

    ArrayBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

    add方法:

    public boolean add(E e) {

    if (offer(e))

    return true;

    else

    throw new IllegalStateException("Queue full");

    }

    add方法内部调用offer方法如下:

    public boolean offer(E e) {

    checkNotNull(e); // 不允许元素为空

    final ReentrantLock lock = this.lock;

    lock.lock(); // 加锁,保证调用offer方法的时候只有1个线程

    try {

    if (count == items.length) // 如果队列已满

    return false; // 直接返回false,添加失败

    else {

    insert(e); // 数组没满的话调用insert方法

    return true; // 返回true,添加成功

    }

    } finally {

    lock.unlock(); // 释放锁,让其他线程可以调用offer方法

    }

    }

    insert方法如下:

    private void insert(E x) {

    items[putIndex] = x; // 元素添加到数组里

    putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0

    ++count; // 元素个数+1

    notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知

    }

    put方法:

    public void put(E e) throws InterruptedException {

    checkNotNull(e); // 不允许元素为空

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程

    try {

    while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里

    notFull.await(); // 线程阻塞并被挂起,同时释放锁

    insert(e); // 调用insert方法

    } finally {

    lock.unlock(); // 释放锁,让其他线程可以调用put方法

    }

    }

    ArrayBlockingQueue的添加数据方法有add,put,offer这3个方法,总结如下:

    add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true

    offer方法如果队列满了,返回false,否则返回true

    add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。

    这3个方法内部都会使用可重入锁保证原子性。

    数据的删除

    ArrayBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

    poll方法:

    public E poll() {

    final ReentrantLock lock = this.lock;

    lock.lock(); // 加锁,保证调用poll方法的时候只有1个线程

    try {

    return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法

    } finally {

    lock.unlock(); // 释放锁,让其他线程可以调用poll方法

    }

    }

    poll方法内部调用extract方法:

    private E extract() {

    final Object[] items = this.items;

    E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素

    items[takeIndex] = null; // 对应取索引上的数据清空

    takeIndex = inc(takeIndex); // 取数据索引+1,当索引满了变成0

    --count; // 元素个数-1

    notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知

    return x; // 返回元素

    }

    take方法:

    public E take() throws InterruptedException {

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程

    try {

    while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里

    notEmpty.await(); // 线程阻塞并被挂起,同时释放锁

    return extract(); // 调用extract方法

    } finally {

    lock.unlock(); // 释放锁,让其他线程可以调用take方法

    }

    }

    remove方法:

    public boolean remove(Object o) {

    if (o == null) return false;

    final Object[] items = this.items;

    final ReentrantLock lock = this.lock;

    lock.lock(); // 加锁,保证调用remove方法的时候只有1个线程

    try {

    for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍历元素

    if (o.equals(items[i])) { // 两个对象相等的话

    removeAt(i); // 调用removeAt方法

    return true; // 删除成功,返回true

    }

    }

    return false; // 删除成功,返回false

    } finally {

    lock.unlock(); // 释放锁,让其他线程可以调用remove方法

    }

    }

    removeAt方法:

    void removeAt(int i) {

    final Object[] items = this.items;

    if (i == takeIndex) { // 如果要删除数据的索引是取索引位置,直接删除取索引位置上的数据,然后取索引+1即可

    items[takeIndex] = null;

    takeIndex = inc(takeIndex);

    } else { // 如果要删除数据的索引不是取索引位置,移动元素元素,更新取索引和放索引的值

    for (;;) {

    int nexti = inc(i);

    if (nexti != putIndex) {

    items[i] = items[nexti];

    i = nexti;

    } else {

    items[i] = null;

    putIndex = i;

    break;

    }

    }

    }

    --count; // 元素个数-1

    notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知

    }

    ArrayBlockingQueue的删除数据方法有poll,take,remove这3个方法,总结如下:

    poll方法对于队列为空的情况,返回null,否则返回队列头部元素。

    remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false。

    poll方法和remove方法不会阻塞线程。

    take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中。

    这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。

    LinkedBlockingQueue

    LinkedBlockingQueue是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。

    内部使用放锁和拿锁,这两个锁实现阻塞(“two lock queue” algorithm)。

    它带有的属性如下:

    // 容量大小

    private final int capacity;

    // 元素个数,因为有2个锁,存在竞态条件,使用AtomicInteger

    private final AtomicInteger count = new AtomicInteger(0);

    // 头结点

    private transient Node<E> head;

    // 尾节点

    private transient Node<E> last;

    // 拿锁

    private final ReentrantLock takeLock = new ReentrantLock();

    // 拿锁的条件对象

    private final Condition notEmpty = takeLock.newCondition();

    // 放锁

    private final ReentrantLock putLock = new ReentrantLock();

    // 放锁的条件对象

    private final Condition notFull = putLock.newCondition();

    ArrayBlockingQueue只有1个锁,添加数据和删除数据的时候只能有1个被执行,不允许并行执行。

    而LinkedBlockingQueue有2个锁,放锁和拿锁,添加数据和删除数据是可以并行进行的,当然添加数据和删除数据的时候只能有1个线程各自执行。

    数据的添加

    LinkedBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

    add方法内部调用offer方法:

    public boolean offer(E e) {

    if (e == null) throw new NullPointerException(); // 不允许空元素

    final AtomicInteger count = this.count;

    if (count.get() == capacity) // 如果容量满了,返回false

    return false;

    int c = -1;

    Node<E> node = new Node(e); // 容量没满,以新元素构造节点

    final ReentrantLock putLock = this.putLock;

    putLock.lock(); // 放锁加锁,保证调用offer方法的时候只有1个线程

    try {

    if (count.get() < capacity) { // 再次判断容量是否已满,因为可能拿锁在进行消费数据,没满的话继续执行

    enqueue(node); // 节点添加到链表尾部

    c = count.getAndIncrement(); // 元素个数+1

    if (c + 1 < capacity) // 如果容量还没满

    notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满

    }

    } finally {

    putLock.unlock(); // 释放放锁,让其他线程可以调用offer方法

    }

    if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据

    signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费

    return c >= 0; // 添加成功返回true,否则返回false

    }

    put方法:

    public void put(E e) throws InterruptedException {

    if (e == null) throw new NullPointerException(); // 不允许空元素

    int c = -1;

    Node<E> node = new Node(e); // 以新元素构造节点

    final ReentrantLock putLock = this.putLock;

    final AtomicInteger count = this.count;

    putLock.lockInterruptibly(); // 放锁加锁,保证调用put方法的时候只有1个线程

    try {

    while (count.get() == capacity) { // 如果容量满了

    notFull.await(); // 阻塞并挂起当前线程

    }

    enqueue(node); // 节点添加到链表尾部

    c = count.getAndIncrement(); // 元素个数+1

    if (c + 1 < capacity) // 如果容量还没满

    notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满

    } finally {

    putLock.unlock(); // 释放放锁,让其他线程可以调用put方法

    }

    if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据

    signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费

    }

    LinkedBlockingQueue的添加数据方法add,put,offer跟ArrayBlockingQueue一样,不同的是它们的底层实现不一样。

    ArrayBlockingQueue中放入数据阻塞的时候,需要消费数据才能唤醒。

    而LinkedBlockingQueue中放入数据阻塞的时候,因为它内部有2个锁,可以并行执行放入数据和消费数据,不仅在消费数据的时候进行唤醒插入阻塞的线程,同时在插入的时候如果容量还没满,也会唤醒插入阻塞的线程。

    数据的删除

    LinkedBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

    poll方法:

    public E poll() {

    final AtomicInteger count = this.count;

    if (count.get() == 0) // 如果元素个数为0

    return null; // 返回null

    E x = null;

    int c = -1;

    final ReentrantLock takeLock = this.takeLock;

    takeLock.lock(); // 拿锁加锁,保证调用poll方法的时候只有1个线程

    try {

    if (count.get() > 0) { // 判断队列里是否还有数据

    x = dequeue(); // 删除头结点

    c = count.getAndDecrement(); // 元素个数-1

    if (c > 1) // 如果队列里还有元素

    notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费

    }

    } finally {

    takeLock.unlock(); // 释放拿锁,让其他线程可以调用poll方法

    }

    if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据

    signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据

    return x;

    }

    take方法:

    public E take() throws InterruptedException {

    E x;

    int c = -1;

    final AtomicInteger count = this.count;

    final ReentrantLock takeLock = this.takeLock;

    takeLock.lockInterruptibly(); // 拿锁加锁,保证调用take方法的时候只有1个线程

    try {

    while (count.get() == 0) { // 如果队列里已经没有元素了

    notEmpty.await(); // 阻塞并挂起当前线程

    }

    x = dequeue(); // 删除头结点

    c = count.getAndDecrement(); // 元素个数-1

    if (c > 1) // 如果队列里还有元素

    notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费

    } finally {

    takeLock.unlock(); // 释放拿锁,让其他线程可以调用take方法

    }

    if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据

    signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据

    return x;

    }

    remove方法:

    public boolean remove(Object o) {

    if (o == null) return false;

    fullyLock(); // remove操作要移动的位置不固定,2个锁都需要加锁

    try {

    for (Node<E> trail = head, p = trail.next; // 从链表头结点开始遍历

    p != null;

    trail = p, p = p.next) {

    if (o.equals(p.item)) { // 判断是否找到对象

    unlink(p, trail); // 修改节点的链接信息,同时调用notFull的signal方法

    return true;

    }

    }

    return false;

    } finally {

    fullyUnlock(); // 2个锁解锁

    }

    }

    LinkedBlockingQueue的take方法对于没数据的情况下会阻塞,poll方法删除链表头结点,remove方法删除指定的对象。

    需要注意的是remove方法由于要删除的数据的位置不确定,需要2个锁同时加锁。

    展开全文
  • 而非阻塞实现方式则可以使用循环CAS的方式来实现,本文让我们一起来研究下Doug Lea是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的,相信从大师身上我们能学到不少并发编程的技巧。 ...

    引言

    在并发编程中我们有时候需要使用线程安全的队列。如果我们要实现一个线程安全的队列有两种实现方式一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环CAS的方式来实现,本文让我们一起来研究下Doug Lea是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的,相信从大师身上我们能学到不少并发编程的技巧。

    ConcurrentLinkedQueue的介绍

    ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改。

    ConcurrentLinkedQueue的结构

    我们通过ConcurrentLinkedQueue的类图来分析一下它的结构。 在此输入图片描述ConcurrentLinkedQueue由head节点和tair节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tair节点等于head节点。

    private transient volatile Node<e> tail = head;
    

    入队列

    入队列就是将入队节点添加到队列的尾部。为了方便理解入队时队列的变化,以及head节点和tair节点的变化,每添加一个节点我就做了一个队列的快照图。 在此输入图片描述

    第一步添加元素1:队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。 第二步添加元素2:队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。 第三步添加元素3:设置tail节点的next节点为元素3节点。 第四步添加元素4:设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。

    通过debug入队过程并观察head节点和tail节点的变化,发现入队主要做两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点,理解这一点对于我们研究源码会非常有帮助。

    上面的分析让我们从单线程入队的角度来理解入队过程,但是多个线程同时进行入队情况就变得更加复杂,因为可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。让我们再通过源码来详细分析下它是如何使用CAS算法来入队的。

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
    
        //入队前,创建一个入队节点
        Node<E> n = new Node<E>(e);
    
        retry:
            //死循环,入队不成功反复入队。
            for (;;) {
                //创建一个指向tail节点的引用
                Node<E> t = tail;
                //p用来表示队列的尾节点,默认情况下等于tail节点。
                Node<E> p = t;
    
                for (int hops = 0; ; hops++) {
                    //获得p节点的下一个节点。
                    Node<E> next = succ(p);
                    //next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点
                    if (next != null) {
                        //循环了两次及其以上,并且当前节点还是不等于尾节点
                        if (hops > HOPS && t != tail)
                            continue retry;
                        p = next;
                    }
    
                    //如果p是尾节点,则设置p节点的next节点为入队节点。
                    else if (p.casNext(null, n)) {
                        //如果tail节点有大于等于1个next节点,则将入队节点设置成tair节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tair节点
                        if (hops >= HOPS)
                            casTail(t, n); // 更新tail节点,允许失败
                        return true;
                    }
    
                    // p有next节点,表示p的next节点是尾节点,则重新设置p节点
                    else {
                        p = succ(p);
                    }
                }
            }
    }
    

    从源代码角度来看整个入队过程主要做二件事情。第一是定位出尾节点,第二是使用CAS算法能将入队节点设置成尾节点的next节点,如不成功则重试。

    第一步定位尾节点。tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点,尾节点可能就是tail节点,也可能是tail节点的next节点。代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点可能是尾节点。获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加第一次节点,所以需要返回head节点。获取p节点的next节点代码如下:

    final Node<E> succ(Node<E> p) {
        Node<E> next = p.getNext();
        return (p == next) ? head : next;
    }
    

    第二步设置入队节点为尾节点。p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。

    hops的设计意图。上面分析过对于先进先出的队列入队所要做的事情就是将入队节点设置成尾节点,doug lea写的代码和逻辑还是稍微有点复杂。那么我用以下方式来实现行不行?

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        Node</e><e> n = new Node</e><e>(e);
        for (;;) {
            Node</e><e> t = tail;
            if (t.casNext(null, n) && casTail(t, n)) {
                return true;
            }
        }
    }
    

    让tail节点永远作为队列的尾节点,这样实现代码量非常少,而且逻辑非常清楚和易懂。但是这么做有个缺点就是每次都需要使用循环CAS更新tail节点。如果能减少CAS更新tail节点的次数,就能提高入队的效率,所以doug lea使用hops变量来控制并减少tail节点的更新频率,并不是每次节点入队后都将 tail节点更新成尾节点,而是当 tail节点和尾节点的距离大于等于常量HOPS的值(默认等于1)时才更新tail节点,tail和尾节点的距离越长使用CAS更新tail节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但是这样仍然能提高入队的效率,因为从本质上来看它通过增加对volatile变量的读操作来减少了对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。

    private static final int HOPS = 1;
    

    还有一点需要注意的是入队方法永远返回true,所以不要通过返回值判断入队是否成功。

    出队列

    出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。让我们通过每个节点出队的快照来观察下head节点的变化。 在此输入图片描述

    从上图可知,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。让我们再通过源码来深入分析下出队过程。

    public E poll() {
        Node<E> h = head;
    
        // p表示头节点,需要出队的节点
        Node<E> p = h;
        for (int hops = 0;; hops++) {
            // 获取p节点的元素
            E item = p.getItem();
    
            // 如果p节点的元素不为空,使用CAS设置p节点引用的元素为null,如果成功则返回p节点的元素。
            if (item != null && p.casItem(item, null)) {
                if (hops >= HOPS) {
                    // 将p节点下一个节点设置成head节点
                    Node<E> q = p.getNext();
    
                    updateHead(h, (q != null) ? q : p);
                }
                return item;
            }
    
            // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。那么获取p节点的下一个节点
            Node<E> next = succ(p);
    
            // 如果p的下一个节点也为空,说明这个队列已经空了
            if (next == null) {
                // 更新头节点。
                updateHead(h, p);
                break;
            }
    
            // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
            p = next;
        }
        return null;
    }
    

    首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

     

    文章来源:https://my.oschina.net/xianggao/blog/389332

    转载于:https://my.oschina.net/u/2988360/blog/956533

    展开全文
  • 如果试图在空的缓冲区上执行 take 操作,则在某一个项变得可用之前,线程将一直阻塞;如果试图在满的缓冲区上执行 put 操作,则在有空间变得可用之前,线程将一直阻塞。我们喜欢在单独的等待 set 中保存 put 线程和 ...
  • ​ 其实阻塞队列实现阻塞同步的方式很简单,使用的就是:lock锁+多个条件(condition)的阻塞控制。 ​ 使用BlockingQueue封装了根据condition条件阻塞线程的过程,使得我们不用去关心繁琐的await/signal操作了...
  • 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列 这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。 阻塞队列常用于生产者和消费者的...
  • 阻塞队列时如何实现阻塞的?为什么它是线程安全的?
  • 阻塞队列实现原理 核心: lock锁+多条件(condition)的阻塞控制 Java中的阻塞队列使用BlockingQueue封装了根据condition条件阻塞线程的过程,使得我们不需要求关系繁琐的await和signal操作。 以生产消费者模式举例...
  • 1. 阻塞队列概述 ① 什么是阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法。 支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,...
  • 主要介绍了剖析Java中阻塞队列实现原理及应用场景,这里也对阻塞和非阻塞队列的不同之处进行了对比,需要的朋友可以参考下
  • 阻塞队列原理

    2020-03-15 19:23:15
    一、什么是阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。 这两个附加的操作是:在队列为空时,...(1)ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形...
  • 并发队列 – 无界阻塞队列 LinkedBlockingQueue 原理探究http://www.importnew.com/25583.html一、前言前面介绍了使用CAS实现的非阻塞队列ConcurrentLinkedQueue,下面就来介绍下使用独占锁实现阻塞队列...
  • 转载自 并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究一、 前言常用的并发队列有阻塞队列和非阻塞队列,前者使用锁实现,后者则使用CAS非阻塞算法实现,使用非阻塞队列一般性能比较好,下面就看看常用的...
  • 阻塞队列实现原理 使用通知模式实现。 所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。通过查看JDK源码发现ArrayBlockingQueue...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,061
精华内容 424
关键字:

阻塞队列实现原理