阻塞队列_阻塞队列实现 - CSDN
精华内容
参与话题
  • 阻塞队列和非阻塞队列

    千次阅读 2018-06-21 14:27:56
    1.阻塞队列和非阻塞队列的区别:阻塞队列可以阻塞,非阻塞队列不能阻塞,只能使用队列wait(),notify()进行队列消息传送。而阻塞队列当队列里面没有值时,会阻塞直到有值输入。输入也一样,当队列满的时候,会阻塞,...

    1.阻塞队列和非阻塞队列的区别:阻塞队列可以阻塞,非阻塞队列不能阻塞,只能使用队列wait(),notify()进行队列消息传送。而阻塞队列当队列里面没有值时,会阻塞直到有值输入。输入也一样,当队列满的时候,会阻塞,直到队列不为空。

    2.阻塞队列

    • ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

    • LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

    • PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

    • DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

    3.阻塞队列和非阻塞队列的方法

    1.非阻塞队列中的几个主要方法:

    • add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;

    • remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;

    • offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;

    • poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;

    • peek():获取队首元素,若成功,则返回队首元素;否则返回null

    对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。

    2.阻塞队列中的几个主要方法:

    阻塞队列包括了非阻塞队列中的大部分方法,上面列举的5个方法在阻塞队列中都存在,但是要注意这5个方法在阻塞队列中都进行了同步措施。

    除此之外,阻塞队列提供了另外4个非常有用的方法:

      put(E e)

      take()

      offer(E e,long timeout, TimeUnit unit)

      poll(long timeout, TimeUnit unit)

    这四个方法的理解:

      put方法用来向队尾存入元素,如果队列满,则等待;

      take方法用来从队首取元素,如果队列为空,则等待;

      offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;

      poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;

    4.demo

    阻塞队列

    package com.example.demo;
    
    import java.util.concurrent.ArrayBlockingQueue;
    
    /**
     * @program: test
     * @description
     * @author: dajitui
     * @create: 2018-06-21 11:26
     **/
    public class zusheQueue {
        private int queueSize = 10;
        private ArrayBlockingQueue<Integer> queue =
                new ArrayBlockingQueue<Integer>(queueSize);
    
        public static void main(String[] args)  {
            zusheQueue test = new zusheQueue();
            Producer producer = test.new Producer();
            Consumer consumer = test.new Consumer();
    
            producer.start();
            consumer.start();
        }
    
        class Consumer extends Thread{
    
            @Override
            public void run() {
                consume();
            }
    
            private void consume() {
                while(true){
                    try {
                        queue.take();
                        System.out.println("从队列取走一个元素,队列剩余"+
                                queue.size()+"个元素");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        class Producer extends Thread{
    
            @Override
            public void run() {
                produce();
            }
    
            private void produce() {
                while(true){
                    try {
                        queue.put(1);
                        System.out.println("向队列取中插入一个元素,队列剩余空间:"+
                                (queueSize-queue.size()));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    非阻塞队列

    package com.example.demo;
    
    import java.util.PriorityQueue;
    
    import static com.example.demo.RateLimiterDemo.A;
    
    /**
     * @program: test
     * @description
     * @author: dajitui
     * @create: 2018-06-21 10:59
     **/
    public class feizusheQueue
    {
        private int queueSize = 10;
        private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
    
        public static void main(String[] args)  {
            feizusheQueue test = new feizusheQueue  ();
            Producer producer = test.new Producer();
            Consumer consumer = test.new Consumer();
    
            producer.start();
            consumer.start();
        }
    
        class Consumer extends Thread{
    
            @Override
            public void run() {
                consume();
            }
    
            private void consume() {
                while(true){
                    synchronized (queue) {
                        while(queue.size() == 0){
                            try {
                                System.out.println("队列空,等待数据");
                                queue.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                                queue.notify();
                            }
                        }
                        queue.poll();          //每次移走队首元素
                        queue.notify();
                        System.out.println("从队列取走一个元素,队列剩余"+
                                queue.size()+"个元素");
                    }
                }
            }
        }
    
        class Producer extends Thread{
    
            @Override
            public void run() {
                produce();
            }
    
            private void produce() {
                while(true){
                    synchronized (queue) {
                        while(queue.size() == queueSize){
                            try {
                                System.out.println("队列满,等待有空余空间");
                                queue.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                                queue.notify();
                            }
                        }
                        queue.offer(1);        //每次插入一个元素
                        queue.notify();
                        System.out.println("向队列取中插入一个元素,队列剩余空间:"+
                                (queueSize-queue.size()));
                    }
                }
            }
        }
    }
    

    可以看到阻塞队列不需要synchronized,或者调用wait(),notify()来进行队列交互。


    转发链接https://mp.weixin.qq.com/s?__biz=MzAxNjM2MTk0Ng==&mid=2247484440&idx=1&sn=9725e182205cf7629b163ed664c74e7a&chksm=9bf4b4adac833dbbfe1a44b7397ea284fc34c3f38e3a6a344ee5beb5e71de155c484f4c1d0a8&mpshare=1&scene=2&srcid=0614mUIZXCRzz8REvuO0K4OE&from=timeline#rd




    展开全文
  • 阻塞队列详细介绍

    2020-07-28 08:33:57
    1. 什么是阻塞队列阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于...

    1. 什么是阻塞队列?

    阻塞队列(BlockingQueue) 是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

    2. Java里的阻塞队列

    JDK7提供了7个阻塞队列。分别是

    • ArrayBlockingQueue : 一个由数组结构组成的有界阻塞队列。
    • LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列。
    • PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列。
    • DelayQueue: 一个使用优先级队列实现的无界阻塞队列。
    • SynchronousQueue: 一个不存储元素的阻塞队列。
    • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
    • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。

    BlockingQueue接口 与 BlockingDeque 接口

      JDK提供的阻塞队列中,LinkedBlockingDeque 是一个 Deque(双向的队列),其实现的接口是 BlockingDeque;其余6个阻塞队列则是 Queue(单向队列),实现的接口是 BlockingQueue。

    对于 BlockingQueue 的阻塞队列提供了四种处理方法:

    方法描述 抛出异常 返回特殊的值 一直阻塞 超时退出
    插入数据 add(e) offer(e) put(e) offer(e,time,unit)
    获取并移除队列的头 remove() poll() take() poll(time,unit)
    获取但不移除队列的头 element() peek() 不可用 不可用
    • 抛出异常: 是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementEx·ception异常 。
    • 返回特殊值: 插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
    • 一直阻塞: 当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
    • 超时退出: 当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

       抛出异常 与 返回特殊值 方法的实现是一样的,只不过对失败的操作的处理不一样!通过 AbstractQueue 的源码可以发现,add(e),remove(),element() 都是分别基于 offer(),poll(),peek() 实现的

    public boolean add(E arg0) {
    		if (this.offer(arg0)) {
    			return true;
    		} else {
    			throw new IllegalStateException("Queue full");
    		}
    	}
    
    	public E remove() {
    		Object arg0 = this.poll();
    		if (arg0 != null) {
    			return arg0;
    		} else {
    			throw new NoSuchElementException();
    		}
    	}
    
    	public E element() {
    		Object arg0 = this.peek();
    		if (arg0 != null) {
    			return arg0;
    		} else {
    			throw new NoSuchElementException();
    		}
    	}
    

    JDK 文档提到的几点:

    • BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。
    • BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。
    • BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。
    • BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll,这些方法尽可能地少使用)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

    对于 BlockingDeque 的双向队列也提供了四种形式的方法

    第一个元素(头部)

    方法描述 抛出异常 返回特殊的值 一直阻塞 超时退出
    插入数据 addFirst(e) offerFirst(e) putFirst(e) offerFirst(e, time, unit)
    获取并移除队列的头 removeFirst() pollFirst() takeFirst() pollFirst(time, unit)
    获取但不移除队列的头 getFirst() peekFirst() 不适用 不适用

    最后一个元素(尾部)

    方法描述 抛出异常 返回特殊的值 一直阻塞 超时退出
    插入数据 addLast(e) offerLast(e) putLast(e) offerLast(e, time, unit)
    获取并移除队列的头 removeLast() pollLast() takeLast() pollLast(time, unit)
    获取但不移除队列的头 getLast() peekLast() 不适用 不适用

    像所有 BlockingQueue 一样,BlockingDeque 是线程安全的,但不允许 null 元素,并且可能有(也可能没有)容量限制。

    BlockingDeque 接口继承扩展了 BlockingQueue 接口,对于 继承自 BlockingQueue 的方法,除了插入方法(add、poll、offer方法,是插入的队列的尾部),其他方法,操作的都是队列的头部(第一个元素)。

    七个阻塞队列的详细介绍

    1. ArrayBlockingQueue

      ArrayBlockingQueue是一个用数组实现的 有界阻塞队列。 此队列按照先进先出(FIFO)的原则对元素进行排序。
      默认情况下不保证访问者公平地访问队列 ,所谓公平访问队列是指阻塞的线程,可按照阻塞的先后顺序访问队列。非公平性是对先等待的线程是不公平的,当队列可用时,阻塞的线程都可以竞争访问队列的资格。
      为了保证公平性,通常会降低吞吐量。

    ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);
    

    访问者的公平性是使用可重入锁实现的 ,代码如下:

    public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
    }
    

    2. LinkedBlockingQueue

      LinkedBlockingQueue是一个用链表实现的 有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。 此队列按照先进先出的原则对元素进行排序。

    3. PriorityBlockingQueue

      PriorityBlockingQueue是一个支持优先级的无界队列(虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败,导致 OutOfMemoryError)。默认情况下元素采取自然顺序排列(每个元素都必须实现 Comparable 接口),也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列.
      其iterator() 方法中提供的迭代器并不 保证以特定的顺序遍历 PriorityBlockingQueue 的元素。如果需要 有序地进行遍历, 则应考虑使用 Arrays.sort(pq.toArray())。此外,可以使用方法 drainTo 按优先级顺序移除 全部或部分元素,并将它们放在另一个 collection 中。
       在此类上进行的操作不保证具有同等优先级的元素的顺序。 如果需要实施某一排序,那么可以定义自定义类或者比较器,比较器可使用修改键断开主优先级值之间的联系。例如,以下是应用先进先出 (first-in-first-out) 规则断开可比较元素之间联系的一个类。要使用该类,则需要插入一个新的 FIFOEntry(anEntry) 来替换普通的条目对象。

       class FIFOEntry<E extends Comparable<? super E>>
         implements Comparable<FIFOEntry<E>> {
       final static AtomicLong seq = new AtomicLong();
       final long seqNum;
       final E entry;
       public FIFOEntry(E entry) {
         seqNum = seq.getAndIncrement();
         this.entry = entry;
       }
       public E getEntry() { return entry; }
       public int compareTo(FIFOEntry<E> other) {
         int res = entry.compareTo(other.entry);
         if (res == 0 && other.entry != this.entry)
           res = (seqNum < other.seqNum ? -1 : 1);
         return res;
       }
     }
    

    4. DelayQueue

      Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。注意 DelayQueue 的所有方法只能操作“到期的元素“,例如,poll()、remove()、size()等方法,都会忽略掉未到期的元素。
    我们可以将DelayQueue运用在以下应用场景:

    • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
    • 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。

    DelayQueue 的实现是基于 PriorityQueue,是一个优先级队列,是以延时时间的长短进行排序的。所以,DelayQueue 需要知道每个元素的延时时间,而这个延时时间是由 Delayed 接口的 getDelay()方法获取的。所以, DelayQueue 的元素必须实现 Delay 接口;

    //计算并返回延时时间
    public long getDelay(TimeUnit unit) {
                return unit.convert(time - now(), TimeUnit.NANOSECONDS);
            }
    

    延时队列的原理

    延时队列的实现很简单,当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。

    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        if (delay <= 0)
                            return q.poll();
                        else if (leader != null)
                            available.await();
    

    5. SynchronousQueue

      一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。

    SynchronousQueue 的几个特点

    • 同步队列没有任何内部容量,甚至连一个队列的容量都没有。 所以很多继承的方法就没有用了,(如 isEmpty()始终返回true,size()为0,包含contain、移除remove 都始终为false 等等)。或者说,真正有意义的只有以下几个方法:获取并移除(poll()、poll(timeout,timeunit)、take())、插入(offer()、offer(timeout,timeunit)、put());
    • 适合于传递性设计,在这种设计中, 每一个put操作必须等待一个take操作,反之亦然 。(当然,如果用的是offer、poll的话,那么就不会阻塞等待)。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。
    • 支持可选的公平排序策略。 默认情况下不保证这种排序。但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。
    //设置公平性的构造方法
    public SynchronousQueue(boolean fair) 
              创建一个具有指定公平策略的 SynchronousQueue。
    

    6. LinkedTransferQueue

      LinkedTransferQueue是一个由链表结构组成的 无界阻塞TransferQueue队列 。相对于其他阻塞队列LinkedTransferQueue多了tryTransfer和transfer方法。
    transfer方法: 如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。transfer方法的关键代码如下:

    Node pred = tryAppend(s, haveData);
    return awaitMatch(s, pred, e, (how == TIMED), nanos);
    

    tryTransfer方法: 则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。

    对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法 ,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

    原理:

    参考自http://blog.csdn.net/xiaoxufox/article/details/52241317:

    LinkedTransferQueue采用的一种预占模式。意思就是消费者线程取元素时,如果队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程park住,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,唤醒该节点上park住线程,被唤醒的消费者线程拿货走人。

    7. LinkedBlockingDeque

      LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。 双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。 相比其他的阻塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是Jdk的bug,使用时还是用带有First和Last后缀的方法更清楚。
       和 LinkedBlockingQueue 一样,是有界的阻塞队列,默认长度以及最大长度是 Integer.MAX_VALUE。可在创建时,指定容量。

    阻塞队列的实现原理

      如果队列是空的,消费者会一直等待,当生产者添加元素时候,消费者是如何知道当前队列有元素的呢?如果让你来设计阻塞队列你会如何设计,让生产者和消费者能够高效率的进行通讯呢?让我们先来看看JDK是如何实现的。

      使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。通过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现,代码如下

    private final Condition notFull;
    private final Condition notEmpty;
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
            //省略其他代码
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
    public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                insert(e);
            } finally {
                lock.unlock();
            }
    }
    
    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return extract();
      } finally {
                lock.unlock();
            }
    }
    
    private void insert(E x) {
            items[putIndex] = x;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        }
    

    当我们往队列里插入一个元素时,如果队列不可用,阻塞生产者主要通过LockSupport.park(this);来实现

    public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
    
    reportInterruptAfterWait(interruptMode);
            }
    

    继续进入源码,发现调用setBlocker先保存下将要阻塞的线程,然后调用unsafe.park阻塞当前线程。

    public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            unsafe.park(false, 0L);
            setBlocker(t, null);
        }
    

    unsafe.park是个native方法,代码如下:

    public native void park(boolean isAbsolute, long time);
    

    park这个方法会阻塞当前线程,只有以下四种情况中的一种发生时,该方法才会返回。

    • 与park对应的unpark执行或已经执行时。注意:已经执行是指unpark先执行,然后再执行的park。
    • 线程被中断时。
    • 如果参数中的time不是零,等待了指定的毫秒数时。
    • 发生异常现象时。这些异常事先无法确定。

    我们继续看一下JVM是如何实现park方法的,park在不同的操作系统使用不同的方式实现,在linux下是使用的是系统方法pthread_cond_wait实现。实现代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法,代码如下:

    void os::PlatformEvent::park() {
         	     int v ;
    	     for (;;) {
    		v = _Event ;
    	     if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
    	     }
    	     guarantee (v >= 0, "invariant") ;
    	     if (v == 0) {
    	     // Do this the hard way by blocking ...
    	     int status = pthread_mutex_lock(_mutex);
    	     assert_status(status == 0, status, "mutex_lock");
    	     guarantee (_nParked == 0, "invariant") ;
    	     ++ _nParked ;
    	     while (_Event < 0) {
    	     status = pthread_cond_wait(_cond, _mutex);
    	     // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
    	     // Treat this the same as if the wait was interrupted
    	     if (status == ETIME) { status = EINTR; }
    	     assert_status(status == 0 || status == EINTR, status, "cond_wait");
    	     }
    	     -- _nParked ;
    
    	     // In theory we could move the ST of 0 into _Event past the unlock(),
    	     // but then we'd need a MEMBAR after the ST.
    	     _Event = 0 ;
    	     status = pthread_mutex_unlock(_mutex);
    	     assert_status(status == 0, status, "mutex_unlock");
    	     }
    	     guarantee (_Event >= 0, "invariant") ;
    	     }
    
         }
    

    pthread_cond_wait是一个多线程的条件变量函数,cond是condition的缩写,字面意思可以理解为线程在等待一个条件发生,这个条件是一个全局变量。这个方法接收两个参数,一个共享变量_cond,一个互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal实现的。park 在windows下则是使用WaitForSingleObject实现的。

    当队列满时,生产者往阻塞队列里插入一个元素,生产者线程会进入WAITING (parking)状态。我们可以使用jstack dump阻塞的生产者线程看到这点:

    "main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
            at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
            at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)
    展开全文
  • 阻塞队列(一):ArrayBlockingQueue 1. 简介 ArrayBlockingQueue,一个由数组实现的有界阻塞队列。该队列采用 FIFO 的原则对元素进行排序添加的。 ArrayBlockingQueue 为有界且固定,其大小在构造时由构造函数来...

    阻塞队列(一):ArrayBlockingQueue

    1. 简介

    ArrayBlockingQueue,一个由数组实现的有界阻塞队列。该队列采用 FIFO 的原则对元素进行排序添加的。

    ArrayBlockingQueue 为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了。

    ArrayBlockingQueue 支持对等待的生产者线程和使用者线程进行排序的可选公平策略,但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”。

    2. 构造方法

    先看看 java.util.concurrent.ArrayBlockingQueue 的构造方法,代码如下:

    public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
    
        private static final long serialVersionUID = -817911632652898426L;
        
        final Object[] items;
        int takeIndex;
        int putIndex;
        int count;
        // 重入锁
        final ReentrantLock lock;
        // notEmpty condition
        private final Condition notEmpty;
        // notFull condition
        private final Condition notFull;
        transient ArrayBlockingQueue.Itrs itrs;
        
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
        
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
        
    }
    
    • 可以清楚地看到 ArrayBlockingQueue 继承 java.util.AbstractQueue ,实现 java.util.concurrent.BlockingQueue 接口。看过 java.util 包源码的同学应该都认识AbstractQueue,该类在 java.util.Queue 接口中扮演着非常重要的作用,该类提供了对queue 操作的骨干实现(具体内容移驾其源码)。
    • java.util.concurrent.BlockingQueue 继承 java.util.Queue 接口,为阻塞队列的核心接口,提供了在多线程环境下的出列、入列操作。作为使用者,则不需要关心队列在什么时候阻塞线程,什么时候唤醒线程,所有一切均由 BlockingQueue 来完成。
    • ArrayBlockingQueue 内部使用可重入锁 ReentrantLock + Condition 来完成多线程环境的并发操作。
      • items 变量,一个定长数组,维护 ArrayBlockingQueue 的元素。takeIndex 变量,int ,为 ArrayBlockingQueue 队首位置。putIndex 变量,int ,ArrayBlockingQueue 队尾位置。count 变量,元素个数。
      • lock 变量,ReentrantLock ,ArrayBlockingQueue 出列入列都必须获取该锁,两个步骤共用一个锁。notEmpty 变量,非空,即出列条件。notFull 变量,未满,即入列条件。

    3. 入队

    ArrayBlockingQueue 提供了诸多方法,可以将元素加入队列尾部。

    • #add(E e) 方法:将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true ,如果此队列已满,则抛出 IllegalStateException 异常。
    • #offer(E e) 方法:将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true ,如果此队列已满,则返回 false 。
    • #offer(E e, long timeout, TimeUnit unit) 方法:将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
    • #put(E e) 方法:将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。

    3.1 add

    // ArrayBlockingQueue.java
    @Override
    public boolean add(E e) {
        return super.add(e);
    }
    
    // AbstractQueue.java
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    
    • #add(E e)方法,调用 #offer(E e) 方法,如果返回false,则直接抛出 IllegalStateException 异常。

    3.2 offer

    @Override
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
    • 首先,检查是否为 null
    • 然后,获取 Lock 锁。获取锁成功后,如果队列已满则,直接返回 false 。
    • 最后,调用 #enqueue(E e) 方法,它为入列的核心方法,所有入列的方法最终都将调用该方法,在队列尾部插入元素。

    3.2.1 enqueue

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        // 添加元素
        final Object[] items = this.items;
        items[putIndex] = x;
        // 到达队尾,回归队头
        if (++putIndex == items.length)
            putIndex = 0;
        // 总数+1
        count++;
        // 通知阻塞在出列的线程
        notEmpty.signal();
    }
    
    • 该方法就是在 putIndex(对尾)位置处,添加元素,最后调用 notEmpty#signal() 方法,通知阻塞在出列的线程(如果队列为空,则进行出列操作是会阻塞)。

    3.3 可超时的 offer

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 获得锁
        lock.lockInterruptibly();
        try {
            // <1> 若队列已满,循环等待被通知,再次检查队列是否非空
            while (count == items.length) {
                // 可等待的时间小于等于零,直接返回失败
                if (nanos <= 0)
                    return false;
                // 等待,直到超时
                nanos = notFull.awaitNanos(nanos); // 返回的为剩余可等待时间,相当于每次等待,都会扣除相应已经等待的时间。
            }
            // 入队
            enqueue(e);
            return true;
        } finally {
            // 解锁
            lock.unlock();
        }
    }
    
    • 相比 #offer(E e) 方法,增加了 <1> 处:若队列已满,调用 notFull#awaitNanos(long nanos) 方法,等待被通知(元素出列时,会调用 notFull#signal() 方法,进行通知阻塞等待的入列线程)或者超时。被通知后,再次检查队列是否非空。若非空,继续向下执行,否则继续等待被通知。

    3.4 put

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 获得锁
        lock.lockInterruptibly();
        try {
            // <1> 若队列已满,循环等待被通知,再次检查队列是否非空
            while (count == items.length)
                notFull.await();
            // 入队
            enqueue(e);
        } finally {
            // 解锁
            lock.unlock();
        }
    }
    
    • 相比 #offer(E e) 方法,增加了 <1> 处:若队列已满,调用 notFull#await() 方法,等待被通知(元素出列时,会调用 notFull#await() 方法,进行通知阻塞等待的入列线程)。被通知后,再次检查队列是否非空。若非空,继续向下执行,否则继续等待被通知。

    4. 出队

    ArrayBlockingQueue 提供的出队方法如下:

    • #poll() 方法:获取并移除此队列的头,如果此队列为空,则返回 null
    • #poll(long timeout, TimeUnit unit) 方法:获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
    • #take() 方法:获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
    • #remove(Object o) 方法:从此队列中移除指定元素的单个实例(如果存在)。

    4.1 poll

    public E poll() {
        // 获得锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 获得头元素
            return (count == 0) ? null : dequeue();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    
    • 如果队列为空,则返回 null,否则,调用 #dequeue() 方法,获取列头元素。

    3.1.1 dequeue

     private E dequeue() {
        final Object[] items = this.items;
        // 去除队首元素
        E x = (E) items[takeIndex];
        items[takeIndex] = null; // 置空
        // 到达队尾,回归队头
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 总数 - 1
        count--;
        // 维护下迭代器
        if (itrs != null)
            itrs.elementDequeued();
        // 通知阻塞在入列的线程
        notFull.signal();
        return x;
    }
    
    • 该方法主要是从列头(takeIndex 位置)取出元素,同时如果迭代器 itrs 不为 null ,则需要维护下该迭代器。最后,调用 notFull#signal() 方法,唤醒阻塞在入列线程。

    4.2 可超时的 poll

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 获得锁
        lock.lockInterruptibly();
        try {
            // <1> 若队列已空,循环等待被通知,再次检查队列是否非空
            while (count == 0) {
                // 可等待的时间小于等于零,直接返回 null
                if (nanos <= 0)
                    return null;
                // 等待,直到超时
                nanos = notEmpty.awaitNanos(nanos); // 返回的为剩余可等待时间,相当于每次等待,都会扣除相应已经等待的时间。
            }
            // 出队
            return dequeue();
        } finally {
            // 解锁
            lock.unlock();
        }
    }
    
    • 相比 #poll() 方法,增加了 <1> 处:若队列已空,调用 notEmpty#awaitNanos(long nanos) 方法,等待被通知(元素入列时,会调用 notEmpty#signal() 方法,进行通知阻塞等待的出列线程)或者超时返回 null。被通知后,再次检查队列是否为空。若非空,继续向下执行,否则继续等待被通知。

    4.3 take

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 获得锁
        lock.lockInterruptibly();
        try {
            // <1> 若队列已空,循环等待被通知,再次检查队列是否非空
            while (count == 0)
                notEmpty.await();
            // 出列
            return dequeue();
        } finally {
            // 解锁
            lock.unlock();
        }
    }
    
    • 相比 #poll() 方法,增加了 <1> 处:若队列已空,调用 notEmpty#await() 方法,等待被通知(元素入列时,会调用 notEmpty#signal() 方法,进行通知阻塞等待的出列线程)。被通知后,再次检查队列是否为空。若非空,继续向下执行,否则继续等待被通知。

    4.4 remove

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        // 获得锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                // 循环向下查找,若匹配,则进行移除。
                do {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    
    • 详细解析,见代码注释。

    • #removeAt(int removeIndex) 方法,移除指定位置的元素。代码如下:

      void removeAt(final int removeIndex) {
          // assert lock.getHoldCount() == 1;
          // assert items[removeIndex] != null;
          // assert removeIndex >= 0 && removeIndex < items.length;
          final Object[] items = this.items;
          // 移除的为队头,直接移除即可
          if (removeIndex == takeIndex) {
              // removing front item; just advance
              items[takeIndex] = null;
              if (++takeIndex == items.length)
                  takeIndex = 0;
              count--;
              if (itrs != null)
                  itrs.elementDequeued();
          // 移除非队头,移除的同时,需要向前复制,填补这个空缺。
          } else {
              // an "interior" remove
      
              // slide over all others up through putIndex.
              final int putIndex = this.putIndex;
              for (int i = removeIndex;;) {
                  int next = i + 1;
                  if (next == items.length)
                      next = 0;
                  if (next != putIndex) {
                      items[i] = items[next];
                      i = next;
                  } else {
                      items[i] = null;
                      this.putIndex = i;
                      break;
                  }
              }
              count--;
              if (itrs != null)
                  itrs.removedAt(removeIndex);
          }
          // 通知
          notFull.signal();
      }
      

    5. 补充说明

    老艿艿:因为本文的重心在 ArrayBlockingQueue 的入队和出队,所以其他方法,例如迭代器等等,并未解析。所以,胖友,你懂的,自己研究哈。

    666. 彩蛋

    如果你对 Java 并发感兴趣,欢迎加入我的知识星球一起交流。

    展开全文
  • ! 一....  在新增的Concurrent包中,BlockingQueue...通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及...

    本文转载自  http://wsmajunfeng.iteye.com/blog/1629354,!!

    一. 前言

      在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。

    二. 认识BlockingQueue

      阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:

      从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;

      常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种)

        先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。

        后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。  


          多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒),下面两幅图演示了BlockingQueue的两个常见阻塞场景:
           如上图所示:当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。


       如上图所示:当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

      这也是我们在多线程环境下,为什么需要BlockingQueue的原因。作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。既然BlockingQueue如此神通广大,让我们一起来见识下它的常用方法:

    三. BlockingQueue的核心方法

      1.放入数据

        (1)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法

     的线程);       
           (2)offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

        (3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

      2. 获取数据

        (1)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;

        (2)poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间

    超时还没有数据可取,返回失败。

        (3)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; 

        (4)drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

    四. 常见BlockingQueue

      在了解了BlockingQueue的基本功能后,让我们来看看BlockingQueue家庭大致有哪些成员?

      1. ArrayBlockingQueue

      基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

      ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

      2.LinkedBlockingQueue

      基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

      作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

      ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

      下面的代码演示了如何使用BlockingQueue:

      (1) 测试类

    复制代码

     1 import java.util.concurrent.BlockingQueue;
     2 import java.util.concurrent.ExecutorService;
     3 import java.util.concurrent.Executors;
     4 import java.util.concurrent.LinkedBlockingQueue; 
     5 
     6 public class BlockingQueueTest {
     7  
     8     public static void main(String[] args) throws InterruptedException {
     9         // 声明一个容量为10的缓存队列
    10         BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
    11  
    12         //new了三个生产者和一个消费者
    13         Producer producer1 = new Producer(queue);
    14         Producer producer2 = new Producer(queue);
    15         Producer producer3 = new Producer(queue);
    16         Consumer consumer = new Consumer(queue);
    17  
    18         // 借助Executors
    19         ExecutorService service = Executors.newCachedThreadPool();
    20         // 启动线程
    21         service.execute(producer1);
    22         service.execute(producer2);
    23         service.execute(producer3);
    24         service.execute(consumer);
    25  
    26         // 执行10s
    27         Thread.sleep(10 * 1000);
    28         producer1.stop();
    29         producer2.stop();
    30         producer3.stop();
    31  
    32         Thread.sleep(2000);
    33         // 退出Executor
    34         service.shutdown();
    35     }
    36 }

    复制代码

      (2)生产者类

    复制代码

     1 import java.util.Random;
     2 import java.util.concurrent.BlockingQueue;
     3 import java.util.concurrent.TimeUnit;
     4 import java.util.concurrent.atomic.AtomicInteger;
     5  
     6 /**
     7  * 生产者线程
     8  * 
     9  * @author jackyuj
    10  */
    11 public class Producer implements Runnable {
    12     
    13     private volatile boolean  isRunning = true;//是否在运行标志
    14     private BlockingQueue queue;//阻塞队列
    15     private static AtomicInteger count = new AtomicInteger();//自动更新的值
    16     private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
    17  
    18     //构造函数
    19     public Producer(BlockingQueue queue) {
    20         this.queue = queue;
    21     }
    22  
    23     public void run() {
    24         String data = null;
    25         Random r = new Random();
    26  
    27         System.out.println("启动生产者线程!");
    28         try {
    29             while (isRunning) {
    30                 System.out.println("正在生产数据...");
    31                 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一个随机数
    32  
    33                 data = "data:" + count.incrementAndGet();//以原子方式将count当前值加1
    34                 System.out.println("将数据:" + data + "放入队列...");
    35                 if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//设定的等待时间为2s,如果超过2s还没加进去返回true
    36                     System.out.println("放入数据失败:" + data);
    37                 }
    38             }
    39         } catch (InterruptedException e) {
    40             e.printStackTrace();
    41             Thread.currentThread().interrupt();
    42         } finally {
    43             System.out.println("退出生产者线程!");
    44         }
    45     }
    46  
    47     public void stop() {
    48         isRunning = false;
    49     }
    50 }

    复制代码

      (3)消费者类

    复制代码

     1 import java.util.Random;
     2 import java.util.concurrent.BlockingQueue;
     3 import java.util.concurrent.TimeUnit;
     4  
     5 /**
     6  * 消费者线程
     7  * 
     8  * @author jackyuj
     9  */
    10 public class Consumer implements Runnable {
    11     
    12     private BlockingQueue<String> queue;
    13     private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
    14  
    15     //构造函数
    16     public Consumer(BlockingQueue<String> queue) {
    17         this.queue = queue;
    18     }
    19  
    20     public void run() {
    21         System.out.println("启动消费者线程!");
    22         Random r = new Random();
    23         boolean isRunning = true;
    24         try {
    25             while (isRunning) {
    26                 System.out.println("正从队列获取数据...");
    27                 String data = queue.poll(2, TimeUnit.SECONDS);//有数据时直接从队列的队首取走,无数据时阻塞,在2s内有数据,取走,超过2s还没数据,返回失败
    28                 if (null != data) {
    29                     System.out.println("拿到数据:" + data);
    30                     System.out.println("正在消费数据:" + data);
    31                     Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
    32                 } else {
    33                     // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
    34                     isRunning = false;
    35                 }
    36             }
    37         } catch (InterruptedException e) {
    38             e.printStackTrace();
    39             Thread.currentThread().interrupt();
    40         } finally {
    41             System.out.println("退出消费者线程!");
    42         }
    43     }
    44  
    45     
    46 }

    复制代码

      3. DelayQueue

      DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

      使用场景:

      DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

      4. PriorityBlockingQueue

       基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

      5. SynchronousQueue

       一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。

      声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:

      如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;

      但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

    五. 小结

      BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。

    本文转载自  http://wsmajunfeng.iteye.com/blog/1629354,!!

     

    展开全文
  • 什么是阻塞队列阻塞队列是一个在队列基础上又支持了两个附加操作的队列。 2个附加操作: 支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满。 支持阻塞的移除方法:队列空时,获取元素...
  • 阻塞队列

    千次阅读 2017-12-08 09:42:22
    Java中的阻塞队列 1.1 什么是阻塞队列(BlockingQueue) 支持阻塞操作的队列。具体来讲,支持阻塞添加和阻塞移除。 阻塞添加:当队列满的时候,队列会阻塞插入插入的元素的线程,直到队列不满; 阻塞移除:...
  • 实现阻塞队列

    2020-06-21 21:04:42
    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。 阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只...
  • 实现一个简单的阻塞队列

    千次阅读 2018-05-17 11:37:18
    import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created by Lenovo on 2018/5/1...
  • Java并发编程-阻塞队列(BlockingQueue)的实现原理

    万次阅读 多人点赞 2016-08-10 19:58:48
    阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,...
  • 阻塞队列原理详解

    千次阅读 2017-03-03 16:51:32
    前一篇关于阻塞队列的原理和使用介绍的很生硬且笼统,因为自己是菜鸟经验少,完全是自己一个人的使用和总结。故而再此附上一篇介绍的很详细精彩的文章,希望能对大家有所帮助吧!!!
  • Java中的阻塞队列学习与总结体会

    万次阅读 2020-05-12 17:10:39
    备注:后续增加,只是列了自己的学习目录
  • ArrayBlockingQueueArrayBlockingQueue是一个有界阻塞队列,基于数组、ReentrantLock、Condition实现。 所谓阻塞队列是指,队列满了,则会对生产线程产生阻塞直到有空位可插入; 当队列空了,则会对消费队列产生...
  • java 中 阻塞队列阻塞队列 和普通队列的区别

    千次阅读 热门讨论 2018-08-22 23:14:26
    阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列...
  • 阻塞队列与非阻塞队列区别

    千次阅读 2018-10-10 09:32:21
    最后有队列的简单使用例子 ----------------------------------------------------------------- 在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另...
  • 阻塞队列和非阻塞队列(JAVA)

    千次阅读 2019-08-11 16:36:43
    阻塞队列1.1 代码举例1.2 LinkedBlockingQueue2.非阻塞队列2.1 代码举例2.2 ConcurrentLinkedQueue 1.阻塞队列 1.1 代码举例 1个生产者,队列元素大小为2,三个消费者消费 public class TestQueue { private int ...
  • 【版权申明】未经博主同意,谢绝转载!(请尊重原创,博主保留追究权) ... 出自【zejian的博客】 关联文章: 深入理解Java类型信息(Class对象)与反射机制 深入理解Java枚举类型(enum) ...深入理解Java注解类型(@...
  • java阻塞队列与非阻塞队列

    千次阅读 2018-10-26 14:01:06
    在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。...阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个...
  • 队列的数据结构大家并不陌生,先进先出,先到先得, ...ArrayBlockingQueue 是一个基于数组的有界阻塞...SynchronousQuere是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插...
  • 深入Java阻塞队列

    2018-06-11 11:37:48
    何为阻塞队列阻塞队列,重点在于阻塞二字,意思就是支持阻塞插入和阻塞移除的队列。阻塞插入:当队列中的元素满了,插入操作线程将阻塞直至队列有空闲空间;阻塞移除:当队列中的元素为空,就是指没有元素时,移除...
  • 1. 阻塞队列概述 ① 什么是阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法。 支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,...
1 2 3 4 5 ... 20
收藏数 288,686
精华内容 115,474
关键字:

阻塞队列