blockingqueue_blockingqueue大小 - CSDN
精华内容
参与话题
  • BlockingQueue

    千次阅读 2015-10-01 14:46:03
    在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了...

    前言:
    在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。
    认识BlockingQueue
    阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:

    从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
    常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种)
      先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。
      后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

      多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)
    

    下面两幅图演示了BlockingQueue的两个常见阻塞场景:
           如上图所示:当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。

       如上图所示:当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
    这也是我们在多线程环境下,为什么需要BlockingQueue的原因。作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。既然BlockingQueue如此神通广大,让我们一起来见识下它的常用方法:
    BlockingQueue的核心方法:
    放入数据:
      offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
        则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
      offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
        加入BlockingQueue,则返回失败。
      put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
        直到BlockingQueue里面有空间再继续.
    获取数据:
      poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
        取不到时返回null;
      poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
        队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
      take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
        BlockingQueue有新的数据被加入;
      drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
        通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
    常见BlockingQueue
    在了解了BlockingQueue的基本功能后,让我们来看看BlockingQueue家庭大致有哪些成员?

    BlockingQueue成员详细介绍
    1. ArrayBlockingQueue
    基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
      ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

    1. LinkedBlockingQueue
      基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
      作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

    ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

    下面的代码演示了如何使用BlockingQueue:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    /**
    * @author jackyuj
    */
    public class BlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        // 声明一个容量为10的缓存队列
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
    
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer = new Consumer(queue);
    
        // 借助Executors
        ExecutorService service = Executors.newCachedThreadPool();
        // 启动线程
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);
    
        // 执行10s
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();
    
        Thread.sleep(2000);
        // 退出Executor
        service.shutdown();
    }
    

    }

    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    /**
    * 消费者线程
    *
    * @author jackyuj
    */
    public class Consumer implements Runnable {

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }
    
    public void run() {
        System.out.println("启动消费者线程!");
        Random r = new Random();
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println("正从队列获取数据...");
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (null != data) {
                    System.out.println("拿到数据:" + data);
                    System.out.println("正在消费数据:" + data);
                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                } else {
                    // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
                    isRunning = false;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出消费者线程!");
        }
    }
    
    private BlockingQueue<String> queue;
    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
    

    }

    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
    * 生产者线程
    *
    * @author jackyuj
    */
    public class Producer implements Runnable {

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
    
    public void run() {
        String data = null;
        Random r = new Random();
    
        System.out.println("启动生产者线程!");
        try {
            while (isRunning) {
                System.out.println("正在生产数据...");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
    
                data = "data:" + count.incrementAndGet();
                System.out.println("将数据:" + data + "放入队列...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("放入数据失败:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出生产者线程!");
        }
    }
    
    public void stop() {
        isRunning = false;
    }
    
    private volatile boolean      isRunning               = true;
    private BlockingQueue queue;
    private static AtomicInteger  count                   = new AtomicInteger();
    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
    

    }
    3. DelayQueue
    DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
    使用场景:
      DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

    1. PriorityBlockingQueue
      基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

    2. SynchronousQueue
      一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
        声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
        如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
        但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
      小结
        BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。

    展开全文
  • 阻塞队列BlockingQueue及其子类的使用

    千次阅读 2018-04-26 10:46:54
    BlockingQueue前言: BlockingQueues在java.util.concurrent包下,提供了线程安全的队列访问方式,当阻塞队列插入数据时,如果队列已经满了,线程则会阻塞等待队列中元素被取出后在插入,当从阻塞队列中取数据时,...

    BlockingQueue

    前言:

         BlockingQueues在java.util.concurrent包下,提供了线程安全的队列访问方式,当阻塞队列插入数据时,如果队列已经满了,线程则会阻塞等待队列中元素被取出后在插入,当从阻塞队列中取数据时,如果队列是空的,则线程会阻塞等待队列中有新元素。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。


    认识BlockingQueue:

    阻塞队列是一个具有阻塞添加和阻塞删除功能的队列,数据结构如下

    • 阻塞添加 
      所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞加入元素的线程,直队列元素不满时才重新唤醒线程执行元素加入操作。

    • 阻塞删除 
      阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般都会返回被删除的元素)

    • 数据结构


    上图可以多线程环境下,通过队列很容易实现数据共享,    比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒

        这也是我们在多线程环境下,为什么需要BlockingQueue的原因。作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了,让我们一起来见识下它的常用方法 

    BlockingQueue的使用:

    阻塞队列主要用在生产者/消费者的场景,下面这幅图展示了一个线程生产、一个线程消费的场景:

    这里写图片描述

    负责生产的线程不断的制造新对象并插入到阻塞队列中,直到达到这个队列的上限值。队列达到上限值之后生产线程将会被阻塞,直到消费的线程对这个队列进行消费。同理,负责消费的线程不断的从队列中消费对象,直到这个队列为空,当队列为空时,消费线程将会被阻塞,除非队列中有新的对象被插入。


    BlockingQueue的核心方法:
    -方法\行为抛异常特定的值阻塞超时
    插入方法add(o)offer(o)put(o)offer(o, timeout, timeunit)
    移除方法 poll(),remove(o)take()poll(timeout, timeunit)
    检查方法element()peek() 

    行为解释:

    1.抛异常:如果操作不能马上进行,则抛出异常
    2. 特定的值:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
    3. 阻塞:如果操作不能马上进行,操作会被阻塞
    4. 超时:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false

    • 插入方法

      • add(E e) : 添加成功返回true,失败抛IllegalStateException异常
      • offer(E e) : 成功返回 true,如果此队列已满,则返回 false。
      • put(E e) :将元素插入此队列的尾部,如果该队列已满,则一直阻塞
    • 删除方法:

      • remove(Object o) :移除指定元素,成功返回true,失败返回false
      • poll() : 获取并移除此队列的头元素,若队列为空,则返回 null
      • take():获取并移除此队列头元素,若没有元素则一直阻塞。
    • 检查方法

      • element() :获取但不移除此队列的头元素,没有元素则抛异常
      • peek() :获取但不移除此队列的头;若队列为空,则返回 null。
    BlockingQueue的数据结构:

    实现类ArrayBlockingQueue的基本使用:

    ArrayBlockingQueue:是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。另外它以FIFO先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部 

    BlockingQueue queue = new ArrayBlockingQueue(1024);
    queue.put("1");
    Object object = queue.take();

    有点需要注意的是ArrayBlockingQueue内部的阻塞队列是通过重入锁ReenterLock和Condition条件队列实现的,所以ArrayBlockingQueue中的元素存在公平访问与非公平访问的区别,对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序。创建公平与非公平阻塞队列代码如下:

    //默认非公平阻塞队列
    ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
    //公平阻塞队列
    ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);
    
    //构造方法源码
    public ArrayBlockingQueue(int capacity) {
         this(capacity, false);
     }
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
         if (capacity <= 0)
             throw new IllegalArgumentException();
         this.items = new Object[capacity];
         lock = new ReentrantLock(fair);
         notEmpty = lock.newCondition();
         notFull =  lock.newCondition();
     }

    实现类LinkedBlockingQueue的基本使用:

    LinkedBlockingQueue:是一个由链表实现的有界队列阻塞队列,但大小默认值为Integer.MAX_VALUE,如果需要的话,这一链式结构可以自定义一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。建议指定队列大小,默认大小在添加速度大于删除速度情况下可能造成内存溢出,LinkedBlockingQueue队列也是按 FIFO(先进先出)排序元素

    构造方法源码:

    //默认大小为Integer.MAX_VALUE
    public LinkedBlockingQueue() {
           this(Integer.MAX_VALUE);
    }
    
    //创建指定大小为capacity的阻塞队列
    public LinkedBlockingQueue(int capacity) {
         if (capacity <= 0) throw new IllegalArgumentException();
         this.capacity = capacity;
         last = head = new Node<E>(null);
     }
    
    //创建大小默认值为Integer.MAX_VALUE的阻塞队列并添加c中的元素到阻塞队列
    public LinkedBlockingQueue(Collection<? extends E> c) {
         this(Integer.MAX_VALUE);
         final ReentrantLock putLock = this.putLock;
         putLock.lock(); // Never contended, but necessary for visibility
         try {
             int n = 0;
             for (E e : c) {
                 if (e == null)
                     throw new NullPointerException();
                 if (n == capacity)
                     throw new IllegalStateException("Queue full");
                 enqueue(new Node<E>(e));
                 ++n;
             }
             count.set(n);
         } finally {
             putLock.unlock();
         }
     }
    ArrayBlockingQueue和LinkedBlockingQueue的区别:

    1.队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。

    2.数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。

    3.由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。

    4.两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

    DelayQueue
    DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

    DelayQueue中的元素必须实现 java.util.concurrent.Delayed接口,这个接口的定义非常简单:

    public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
    }

    getDelay()方法的返回值就是队列元素被释放前的保持时间,如果返回0或者一个负值,就意味着该元素已经到期需要被释放,此时DelayedQueue会通过其take()方法释放此对象。

    从上面Delayed 接口定义可以看到,它还继承了Comparable接口,这是因为DelayedQueue中的元素需要进行排序,一般情况,我们都是按元素过期时间的优先级进行排序

    public class DelayedElement implements Delayed {
      private long expired;
      private long delay;
      private String name;
    
      DelayedElement(String elementName, long delay) {
             this. name = elementName;
             this. delay= delay;
             expired = ( delay + System. currentTimeMillis());
      }
    
      @Override
      public int compareTo(Delayed o) {
            DelayedElement cached=(DelayedElement) o;
             return cached.getExpired()> expired?1:-1;
      }
    
      @Override
      public long getDelay(TimeUnit unit) {
    
             return ( expired - System. currentTimeMillis());
      }
    
      @Override
      public String toString() {
             return "DelayedElement [delay=" + delay + ", name=" + name + "]";
      }
    
      public long getExpired() {
             return expired;
      }
    
    }

    设置这个元素的过期时间为3s

    public class DelayQueueExample {
      public static void main(String[] args) throws InterruptedException {
            DelayQueue<DelayedElement> queue= new DelayQueue<>();
            DelayedElement ele= new DelayedElement( "cache 3 seconds",3000);
             queue.put( ele);
            System. out.println( queue.take());
    
      }

    PriorityBlockingQueue队列

    PriorityBlockingQueue是一个没有边界的队列,所以不会阻塞生产者,它的排序规则和 java.util.PriorityQueue一样。需要注意,PriorityBlockingQueue中不允许插入null对象。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。

    SynchronousQueue队列

    SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。


    拓展:

    先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。
    后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件

    在最近的RocketMQ的项目中用到阻塞队列,代码如下:实时监听MQ消息,收到消息,先做简单 的处理(存入队列),在开启线程消费消息(监听和消费消息解耦合)

    
    @Component("MQCP_CID_PH_SSP_DEFAULT")
    public class MqcpCosumerServiceImpl implements IMqcpCosumerService,InitializingBean,DisposableBean {
    
    
    	private Logger LOGGER =LoggerFactory.getLogger(getClass());
    	private static MQCPConsumer pushConsumer = null;
    	private static LinkedBlockingQueue<MQCPMessage> msgQueue;
    	private static final int QUEUE_MAX_SIZE = 20000;
    	
    	@Autowired
    	private ISystemConfigService systemConfigService;	
    	@Autowired
    	private IMqcpMessageService mqcpMessageService;
    
    
    	public static void setPushConsumer(MQCPConsumer pushConsumer) {
    		MqcpCosumerServiceImpl.pushConsumer = pushConsumer;
    	}
    
    	public static void setMsgQueue(LinkedBlockingQueue<MQCPMessage> msgQueue) {
    		MqcpCosumerServiceImpl.msgQueue = msgQueue;
    	}
    
    	public void initMsgQueue(){
    		setMsgQueue(new LinkedBlockingQueue<MQCPMessage>(QUEUE_MAX_SIZE));
    	}
    	
    	public MqcpCosumerServiceImpl() {
    		initMsgQueue();
    	}
    	
    	
    	
    	@Override
    	public void initMQCPCosumer(){
    		try {
    			Properties p = SystemResourceUtil
    					.getPropertisByName(ResourceFileNameConstants.PROPERTIES_MQCP_CLIENT);	
    			p.setProperty(MQCPConstant.INSTANCE_NAME, systemConfigService.getSysGUID());	
    			setPushConsumer(MQCPFactory.createConsumer(p));
    			MQCPMessageFilter mqcpFilter = new MQCPMessageFilter();
    			List<String> list = new ArrayList<String>();
    			//根据tag 过滤消息,MQCP只取出发送消息时设置了该tag值的消息
    			list.add("T_APS_APPL_INFO");
    			list.add("T_APS_LOAN_AGREEMENT");
    			list.add("T_APS_EXPENSE_APPL");
    			list.add("T_APS_LOAN_LOG");
    			list.add("T_ICORE_CGI_POLICY_INFO");
    			mqcpFilter.setTags(list);
    			// 实时监听消息
    			pushConsumer.subscribe(SystemResourceUtil.getPropertiesValueByKey(
    					ResourceFileNameConstants.PROPERTIES_MQCP_CLIENT, "TOPIC_ID_ILOAN"), mqcpFilter, new MQCPMessageListener() {		
    				@Override
    				public MQCPConsumeStatus pushMessage(List<MQCPMessage> messageList) {
    					try {
    						for (MQCPMessage msg : messageList) {
    							LOGGER.info(String.format("存证MQCPCosumer监听到Topic_id:[%s]的消息key:[%s],准备存入队列",msg.getTopic(),msg.getKey()));
    							msgQueue.put(msg);						
    						}
    						return MQCPConsumeStatus.CONSUME_OK;
    					} catch (Exception e) {
    						LOGGER.error("存证MQCPCosumer消息存入队列异常", e);
    						return MQCPConsumeStatus.CONSUME_FAIL;
    					}			
    					
    				}
    			});	
    			pushConsumer.start();
    			LOGGER.info("初始化存证MQCPConsumer订阅服务:success");
    			} catch (MQCPException e) {
    				LOGGER.error("初始化存证MQCPCosumer订阅服务异常", e);
    			}
    		
    	}
    
    	
    	@Override
    	public void shutDownMQCPCosumer() {
    		try {
    			if(pushConsumer !=null){
    				pushConsumer.shutdown();
    			}
    		} catch (Exception e) {
    			LOGGER.error("存证MQCPCosumer关闭异常", e);
    		}
    
    		
    	}
    
    	@Override
    	public void afterPropertiesSet() throws Exception {
    		
    		initMQCPCosumer();//开启消费者
    		new Thread(new InsertTable()).start();
    		
    	}
    
    	@Override
    	public void destroy() throws Exception {
    		shutDownMQCPCosumer();		
    	}
    	
    	class InsertTable implements Runnable{
    
    		@Override
    		public void run() {
    				while(!Thread.interrupted()){
    					try {
    						MQCPMessage msg = msgQueue.take();	
    						LOGGER.info(String.format("存证MQCP消息入库线程开启[%s],消息TAG:[%s]", Thread.currentThread().getName(),msg.getTag()));
    						mqcpMessageService.insertTable(msg);
    					} catch (Exception e) {
    						LOGGER.error("存证MQCP消息入库异常",e);
    					}
    				}
    				LOGGER.warn(String.format("存证线程已停止[%s]", Thread.currentThread().getName()));
    			} 	
    	}
    
    }
    



    展开全文
  • 详解BlockingQueue

    千次阅读 2019-04-11 19:52:16
    1、特点: ...下面BlockingQueue中的方法支持这样的操作,当不同的方法操作不能立即被满足,但是在将来可能被满足:第一个抛出异常,第二个返回一个特殊的值(null或false,取决于哪种操作),第三块...

    1、特点:
    当从这个队列中取元素时,它支持这样的操作:如果队列为空,那它就等待队列不为空时,再执行取操作。
    当向这个队列中存元素时,它支持这样的操作:如果队列已满,那它就等待队列可用,再向队列中存放元素。

    下面BlockingQueue中的方法支持这样的操作,当不同的方法操作不能立即被满足,但是在将来可能被满足:第一个抛出异常,第二个返回一个特殊的值(null或false,取决于哪种操作),第三块会使得当前线程无限期的等待,直到操作可以执行,第四块仅仅给定一个最大等待时间,一旦超过这个时间,就会放弃操作。

    抛出异常 返回特殊值 阻塞 定时等待
    插入 add(e) offer(e) put(e) offer(e, time, unit)
    删除 remove() poll() take() poll(time, unit)
    检查 element() peek() 不可用 不可用

    BlockingQueue不接受空值。如果试图用add, put, 或者offer去添加一个null值,则会抛出NullPointerException异常。如果poll 操作失败,则会返回一个空值。

    BlockingQueue可能是有界的。在任何一个给定的时间,它都有一个剩余容量,如果超过这个容量,那么超出的元素将会被阻塞。如果BlockingQueue没有任何内在容量的约束,那么它的剩余容量将会一直是Integer.MAX_VALUE。

    BlockingQueue的实现类主要是设计实现生产者-消费者队列,但是额外支持Collection接口。所以你可以利用remove(x)从队列中删除一个元素。但是这样的操作并不特别高效,仅仅是为了在特定的场景中使用,比如当队列中消息被取消时。

    BlockingQueue的实现类是线程安全的。通过内在的锁或者其他的同步机制,所有的队列方法都能够原子性的操作。然而大量的集合操作例如:addAll,containsAll,retainsAll,以及removeAll都不一定会保证原子操作,除非在实现中以其他方式指定。所以当你使用addAll方法将集合c加入到队列中时,可能会抛出一个异常。

    用典型的生产者和消费者为例,表明BlockingQueue可以被安全的用于多个生产者和消费者。

    class Producer implements Runnable {
       private final BlockingQueue queue;
       Producer(BlockingQueue q) { queue = q; }
       public void run() {
         try {
           while (true) { queue.put(produce()); }
         } catch (InterruptedException ex) { ... handle ...}
       }
       Object produce() { ... }
     }
    
     class Consumer implements Runnable {
       private final BlockingQueue queue;
       Consumer(BlockingQueue q) { queue = q; }
       public void run() {
         try {
           while (true) { consume(queue.take()); }
         } catch (InterruptedException ex) { ... handle ...}
       }
       void consume(Object x) { ... }
     }
    
     class Setup {
       void main() {
         BlockingQueue q = new SomeQueueImplementation();//这里的具体实现类,接下来会总结到
         Producer p = new Producer(q);
         Consumer c1 = new Consumer(q);
         Consumer c2 = new Consumer(q);
         new Thread(p).start();
         new Thread(c1).start();
         new Thread(c2).start();
       }
     }
    

    接下来我们看一下这个BlockingQueue接口中的方法,都有什么用处

    返回值 方法 描述
    boolean add(E e) 如果还有剩余容量,则向队列中插入一个元素并返回true。如果没有可用空间,则抛出IllegalStateException
    boolean contains(Object o) 如果队列中包含这个元素返回true
    int drainTo(Collection<? super E> c) 将队列中所有可用元素移动到这个Collection集合中
    int drainTo(Collection<? super E> c,int maxElements) 将队列中最多maxElement个元素中移动到Collection集合中
    boolean offer(E e) 如果还有剩余容量,则向队列中插入一个元素并返回true,如果没有剩余空间则返回false
    boolean offer(E e, long timeout, TimeUnit unit) 在给定时间内将元素插入,返回true,否则返回false
    E poll(long timeout, TimeUnit unit) 在给定时间内获取到队首元素,则返回队首元素,否则返回null
    void put(E e) 如果可以,就将元素插入到队列中,否则就一直等待到可以插入
    int remainingCapacity() 返回此队列在不阻塞的情况下,可以接受的其他元素的数量,如果没有内在的限制就返回Integer.MAX_VALUE
    E take() 获取并移除队首元素,如果队列中没有元素可用,它会等待直到队列中有元素可用

    了解了BlockingQueue接口的设计思想后,我们来看一下它的几个实现类吧。
    下面是它的所有实现类:

    在这里插入图片描述
    简单介绍一下其中几个常用的实现类

    1、ArrayBlockingQueue:内部基于数组实现的有界阻塞队列,按照先进先出(FIFO)的原则对元素进行排序,支持公平锁和非公平锁。由于对于生产者放入数据和消费者取出数据使用的是同一个锁,所以二者无法真正并行运行。同时由于数组中存放的直接是放入的数据,所以不会产生额外的对象实例。

    2、LinkedBlockingQueue:内部基于链表实现的有界阻塞队列(默认Integer.MAX_VALUE),按照先进先出的顺序对元素存取,内部使用非公平锁对于生产者和消费者使用的是两把不同的锁所以生产者和消费者可以同步进行。由于是使用的链表结构,所以会生成Node节点对象实例,销毁的时候需要额外处理。

    3、DelayQueue:内部基于非线程安全的优先队列实现的无界阻塞队列,内部使用非公平锁。DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。使用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。

    4、PriorityQueue:内部基于实现的无界阻塞队列(可进行扩容,默认11),内部使用非公平锁。 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。该队列不会阻塞生产者产生数据,当队列中没有数据时,会阻塞消费者。所以如果生产者的速度一旦超过消费者,会快速消耗内存资源。

    5、SynchronousQueue:内部是基于无缓冲并且无界的等待队列,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加;可以认为SynchronousQueue是一个缓存值为1的阻塞队列;

    7、LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。

    8、LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

    下面就详细看一下ArrayBlockingQueue原理,然后简单说一下LinkedBlockingQueue。其他的阻塞队列源码就不介绍了,懂了一个,其他自然就容易看懂了。

    1、ArrayBlockingQueue

    其中重要的属性解释

    	final Object[] items;    //内部存储数组
        int takeIndex;			//下一个要取走的元素的下标,用于remove、poll、take方法
        int putIndex;			//下一个要放入的元素的下标,用于add、offer、put方法
        int count;				//该队列中的元素
        final ReentrantLock lock; //保护所有通道的主锁
        private final Condition notEmpty;//Condition for waiting takes,用于阻塞和唤醒take操作
        private final Condition notFull;// Condition for waiting puts,用于阻塞和唤醒put操作
    

    接下来是构造函数,从构造函数中我们可以看出ArrayBlockingQueue支持公平锁和非公平锁。

    public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    

    默认构造函数是非公平的

    public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    

    还支持将Collection集合类中的元素直接放到ArrayBlockingQueue的items数组中,进行初始化。

    public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c){
    ....
    }
    

    1、add方法

    public boolean add(E e) {
            return super.add(e);
        }
    //找到父类操作
    public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    

    可以看出,它是通过offer方法添加到数组中,如果已满,则抛出异常。
    2、offer方法

    public boolean offer(E e) {
            Objects.requireNonNull(e);
            ReentrantLock lock = this.lock;
            lock.lock();
    
            boolean var3;
            try {
                if (this.count != this.items.length) {//这里确保可以添加到数组中去
                    this.enqueue(e); //offer通过enqueue方法添加到数组
                    var3 = true;
                    return var3;    //成功返回true
                }
    
                var3 = false;    //否则标记为false
            } finally {
                lock.unlock();
            }
    
            return var3;      //返回false
        }
    private void enqueue(E e) {
            Object[] items = this.items;
            items[this.putIndex] = e;  //这里putIndex即为下一个可放元素的下标
            if (++this.putIndex == items.length) { //如果下一个下标越界,则返回数组头部
                this.putIndex = 0;  //这里恰好构成一个循环,从第一个一直加到最后一个,然后返回头部继续添加
            }
    
            ++this.count;
            this.notEmpty.signal();   //这里用来唤醒因数组为空而进入等待状态的方法
        }
    

    3、put方法

    public void put(E e) throws InterruptedException {
            Objects.requireNonNull(e);
            ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while(this.count == this.items.length) {//如果已满,则释放锁等待唤醒
                    this.notFull.await();
                }
    
                this.enqueue(e);  //这里put方法也是通过enqueue方法添加元素
            } finally {
                lock.unlock();
            }
    
        }
    

    这里对比一下这三个添加方法,虽然前面已经总结过了,但是看过源代码后,应该更直观一些。

    方法 是否允许插入空值 队列元素已满
    add(E e) 不允许 抛出IllegalStateException异常
    offer(E e) 不允许 返回false
    put(E e) 不允许 释放锁,等待唤醒

    很明显,这三个方法,第一步都对插入元素进行了判空,如果为空则抛出异常。

    4、remove方法

    public boolean remove(Object o) {
            if (o == null) {   //如果要移除的元素为空,返回false
                return false;
            } else {
                ReentrantLock lock = this.lock;
                lock.lock();
    
                try {
                    if (this.count > 0) {  //如果数组元素数目大于0
                        Object[] items = this.items;
                        int i = this.takeIndex;
                        int end = this.putIndex;
                        int to = i < end ? end : items.length;
    
                        label96:
                        while(true) {
                            while(i >= to) {
                                if (to == end) {
                                    break label96;
                                }
    
                                i = 0;
                                to = end;
                            }
    
                            if (o.equals(items[i])) { //找到元素并移除,返回true
                                this.removeAt(i);
                                boolean var7 = true;
                                return var7;
                            }
    
                            ++i;
                        }
                    }
    
                    boolean var11 = false;   //找不到元素返回false
                    return var11;
                } finally {
                    lock.unlock();
                }
            }
        }
    

    5、poll方法

    public E poll() {
            ReentrantLock lock = this.lock;
            lock.lock();
    
            Object var2;
            try {
                var2 = this.count == 0 ? null : this.dequeue();//如果没有元素可取,返回null,否则返回队首元素
            } finally {
                lock.unlock();
            }
    
            return var2;
        }
    private E dequeue() {			//利用这个方法来取走队首元素
            Object[] items = this.items;
            E e = items[this.takeIndex];
            items[this.takeIndex] = null;//这里仅需将队首元素赋值为空,其内存交由GC回收
            if (++this.takeIndex == items.length) {
                this.takeIndex = 0;
            }
    
            --this.count;
            if (this.itrs != null) {
                this.itrs.elementDequeued();
            }
    
            this.notFull.signal();  //这里唤醒因为队列已满而等待的方法。
            return e;
        }
    

    5、take方法

    
    public E take() throws InterruptedException {
            ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            Object var2;
            try {
                while(this.count == 0) {  //如果没有元素,则等待唤醒
                    this.notEmpty.await();
                }
                var2 = this.dequeue();   //如果有元素则取走队首元素
            } finally {
                lock.unlock();
            }
    
            return var2;
        }
    

    这里再对比一下remove、poll、take方法取走元素时,队列为空的情况

    方法 队列为空
    remove() 返回false
    poll() 返回null
    take() 等待唤醒

    以上就是对ArrayBlockingQueue添加和删除方法的介绍

    2、LinkedBlockingQueue
    存储结构(单向链表)

    static class Node<E> {
            E item;
            Node<E> next;
            Node(E x) { item = x; }
    }
    

    属性

    private final int capacity;//链表的最大容量
    private final AtomicInteger count;//链表中插入的数据个数
    transient LinkedBlockingQueue.Node<E> head;//链表头结点指针
    private transient LinkedBlockingQueue.Node<E> last;//链表尾节点指针
    private final ReentrantLock takeLock;//取锁
    private final Condition notEmpty;
    private final ReentrantLock putLock;//放锁
    private final Condition notFull;
    

    方法和ArrayBlockingQueue实现原理类似,不同的是ArrayBlockingQueue基于数组实现,LinkedBlockingQueue基于链表实现,所以对元素的操作会有所不同。

    参考:https://www.cnblogs.com/WangHaiMing/p/8798709.html
    参考:https://www.cnblogs.com/KingIceMou/p/8075343.html

    展开全文
  • 多线程(三):队列 BlockingQueue

    千次阅读 2018-09-01 19:29:41
    public interface BlockingQueue&amp;amp;amp;amp;amp;lt;E&amp;amp;amp;amp;amp;gt; extends Queue&amp;amp;amp;amp;amp;lt;E&amp;amp;amp;amp;amp;gt; { // 添加成功返回true,否则抛出异常 ...

    一:简介

    在线程池中会经常使用BlockingQueue,BlockingQueue是一种阻塞队列,阻塞队列的特性:我在放的时候别人不能放,我在取的时候别人不能取,满的时候就不能再添加,等待有人取走,才能放

    public interface BlockingQueue<E> extends Queue<E> {
        // 添加成功返回true,否则抛出异常
        boolean add(E e);
        // 添加成功返回true,失败返回false
        boolean offer(E e);
        boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
        // 添加时如果没有空间则调用此方法的线程来阻断知道队列有空间再继续
        void put(E e) throws InterruptedException;
        // 取走队列里的首位对象,如果不能立即取出,阻断进入等待状态直到队列有新的对象加入为止
        E take() throws InterruptedException;
        // 取走首位对象,若不能立即取出,可以等unit时间,取不到返回null
        E poll(long timeout, TimeUnit unit) throws InterruptedException;
        boolean remove(Object o);
        // 队列剩余容量
        int remainingCapacity();
        // 是否包含对象
        public boolean contains(Object o);
    
        // 移除次队列中所有可用的元素,并将它们添加到指定的集合中
        int drainTo(Collection<? super E> c);
        // 指定最大移动元素的个数
        int drainTo(Collection<? super E> c, int maxElements);
    }
    

    BlockingQueue

    • ArrayBlockingQueue: 一个由数组支持的有界阻塞队里,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明大小,其所含的对象是以先入先出的顺序排序的
    • LinkedBlockingQueue:基于链表的阻塞队列,大小不定的BlockingQueue,可以指定长度,不指定默认为Interger.MAX_VALUE,也是先入先出的顺序排序的,主要用到put和take,put方法在队列满的时候会阻塞知道有队列成员被消费,take方法在队列空的时候会阻塞,知道有新的队列成员被放入
    • PriorityBlockingQueue:带有优先级的BlockingQueue
    • DelayQueue: 一定时间之后才可以take的BlockingQueue
    • SynchronousQueue:直接传递的BlockingQueue,用于执行Producer角色到Consumer角色的直接传递。

    • ConcurrentLinkedQueue: 元素个数没有最大限制的线程安全队列,该类不是BlockingQueue的子类, 内部数据结构是分开的,线程之间互不影响,所以也就无需执行互斥处理。根据线程情况的不同,有时程序的性能也会有所提高。


    示例一
    客户端(生产者)不停的向集合中添加元素,服务器端(消费端)不停的从集合中取出元素

    public class Request {
        private final String name;
    
    
        public Request(String name) {
            this.name = name;
        }
    
        public String getName() {
            return name;
        }
    
        @Override
        public String toString() {
            return "Request{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }
    public class RequestQueue {
        // LinkedList也是一种Queue
        private final Queue<Request> queue = new LinkedList<>();
    
        public synchronized Request getRequest() {
            // 判断队列的头元素是否有值,如果没有返回null,即队列时空的
            // 为啥用while而不用if ? 当等待结束了再次检查队列中是否有元素,这样更加安全
            while (queue.peek() == null) {
                try { wait(); } catch (InterruptedException e) { }
            }
    
            return queue.remove();
        }
    
        public synchronized void putRequest(Request request) {
            // 向队列末尾添加元素
            queue.offer(request);
            notifyAll();
        }
    }
    public class ClientThread extends Thread {
        private final Random random;
        private final RequestQueue requestQueue;
    
        public ClientThread(RequestQueue requestQueue, String name, long seed) {
            super(name);
            this.requestQueue = requestQueue;
            this.random = new Random(seed);
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                Request request = new Request("No." + i);
                System.out.println(Thread.currentThread().getName() + "\trequest\t" + request);
                requestQueue.putRequest(request);
    
                try {
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    public class ServerThread extends Thread {
        private final Random random;
        private final RequestQueue requestQueue;
    
        public ServerThread(RequestQueue requestQueue, String name, long seed) {
            super(name);
            this.requestQueue = requestQueue;
            this.random = new Random(seed);
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                Request request = requestQueue.getRequest();
                System.out.println(Thread.currentThread().getName() + "\thandles\t" + request);
    
                try {
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public class Main {
        public static void main(String[] args) {
            RequestQueue requestQueue = new RequestQueue();
            new ClientThread(requestQueue, "Alice", 3141592L).start();
            new ServerThread(requestQueue, "Bobby", 6535897L).start();
        }
    }

    这里写图片描述

    上面示例其实就是LinkedBlockingQueue的实现,Java已经提供了这样的功能:java.util.concurrent.LinkedBlockingQueue

    修改RequestQueue,其他类和示例一保持不变

    public class RequestQueue {
        private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    
        public Request getRequest() {
            Request request = null;
            try {
                // 取出队列首元素,当队列为空时,就会wait
                // take和put方法已经考虑互斥处理,所以两个方法都不需要synchronized
                request = queue.take();
            } catch (InterruptedException e) {
            }
    
            return request;
        }
    
        public void putRequest(Request request) {
            try {
                // 向队列末尾添加元素
                queue.put(request);
            } catch (InterruptedException e) {
    
            }
        }
    }
    package java.util.concurrent;
    
    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
            public E take() throws InterruptedException {
                E x;
                int c = -1;
                final AtomicInteger count = this.count;
                // 加锁
                final ReentrantLock takeLock = this.takeLock;
                takeLock.lockInterruptibly();
                try {
                    while (count.get() == 0) {
                        notEmpty.await();
                    }
                    x = dequeue();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                } finally {
                    takeLock.unlock();
                }
                if (c == capacity)
                    signalNotFull();
                return x;
        }
    
        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            // 加锁
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    }        

    示例二

    Producer 生产者

    public class Producer implements Runnable {
        BlockingQueue<String> queue;
    
        public Producer(BlockingQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                String temp = "Producer make a thing:" + Thread.currentThread().getName();
                System.out.println("Producer\t" + new Date() + "\t" + Thread.currentThread().getName() + "\t生产中ing...");
                queue.put(temp);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    Consumer 消费者

    public class Consumer implements Runnable {
        BlockingQueue<String> queue;
    
        public Consumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                Thread.sleep(500);
    
                // 如果队列为空会阻塞队列
                String value = queue.take();
                System.out.println("Consumer" + "\t" + new Date() + "\t" + Thread.currentThread().getName() + "\ttake\t" + value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public class BlockingQueueTest {
        public static void main(String[] args) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
    
            for (int i = 0; i < 10; i++) {
                new Thread(producer, "Producer" + (i+1)).start();
            }
    
            for (int i = 0; i < 5; i++) {
                new Thread(consumer, "Consumer" + (i+1)).start();
            }
        }
    }

    这里写图片描述


    并发队列
    ConcurrentLinkedQueue是一个适用于高并发场景下的队列,通过无锁方式,实现高并发状态下的高性能,通常ConcurrentLinkedQueue性能要好于BlockingQueue,
    它是一个基于链接节点的无界线程安全队列,遵守先进先出,头是最先加入的,尾是最近加入的,不允许null值。

    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
            implements Queue<E>, java.io.Serializable {
         // 将元素添加到队列末尾   
         public boolean offer(E e);
    
         // 取出队列的元素,size减掉1
         public E poll();
    
         // 如果队列中存在元素,返回头元素,如果为空,返回null
         // 该方法并不移除元素
         public E peek();
    
         // 队列长度
         public int size();   
    
         // 移除队列的第一个元素,并返回该元素,如果队列为空,则抛异常NoSuchElementException
         public E remove();    
    }
    展开全文
  • 【Java并发之】BlockingQueue

    万次阅读 多人点赞 2015-10-23 11:50:27
    本文主要讲的是并发包中涉及到的集合,关于普通集合,请参考【java 集合概览】一、什么是BlockingQueueBlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。...
  • 图灵学院:Java高并发之BlockingQueue

    万次阅读 2018-05-11 19:29:58
    1:BlockingQueue继承关系 java.util.concurrent 包里的 BlockingQueue是一个接口, 继承Queue接口,Queue接口继承 Collection BlockingQueue-----&gt;Queue--&gt;Collection 图: 队列的特点是:先进先...
  • BlockingQueue(阻塞队列)详解

    千次阅读 2017-09-25 13:01:19
    前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了...
  • BlockingQueue详解

    2020-10-16 15:47:46
     在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了...
  • Java并发编程-阻塞队列(BlockingQueue)的实现原理

    万次阅读 多人点赞 2016-08-10 19:58:48
    阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,...
  • BlockingQueue阻塞队列的长度

    千次阅读 2018-09-04 16:06:00
    BlockingQueue阻塞队列的长度是多少 解决方法: BlockingQueue&lt;String&gt; list = new LinkedBlockingQueue&lt;String&gt;(); list 可以存放:...
  • BlockingQueue 使用详解

    万次阅读 2019-05-07 22:19:25
    阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,...
  • JUC之阻塞队列

    千次阅读 2019-12-17 11:18:14
    阻塞队列 1、队列+阻塞队列 队列----先到先得 阻塞队列-----首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如图 当阻塞队列是空的时候,从队列中获取元素的操作将会被阻塞 当阻塞队列是满的时候...
  • Android 之BlockingQueue

    千次阅读 2016-08-04 00:07:58
    BlockingQueueBlockingQueue 是新增的Concurrent包中的一种阻塞队列特点当BlockingQueue为空, 从队列取数据时会让线程等待状态,直到能取出非空的数据,线程会被唤醒。 当BlockingQueue是满的,存数据到队列时线程...
  • Java中BlockingQueue性能瓶颈问题

    千次阅读 2012-03-28 15:52:40
    最近调研一个数据分析模块中的性能问题,花费将近两周时间。期间做了许多Enhancement,而最后关于总线...在数据交互过程中,使用JDK中的阻塞队列ArrayBlockingQueue:生产者的提供Object存入总线中的BlockingQueue;总
  • Java BlockQueue

    千次阅读 2016-05-29 22:04:49
    基本原理特殊的队列:BlockingQueue如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒.同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被...
  • 阻塞队列BlockingQueue(JDK8)

    千次阅读 2017-06-10 18:32:02
    BlockingQueue(阻塞队列)是JDK5新增的线程安全的高效队列类,基于生产者-消费者模式。队列为空时,获取元素的操作等待队列变为非空;队列已满时插入操作等待队列空间可用。 BlockingQueue不接受null元素,会throw...
  • 使用BlockingQueue封装了根据条件阻塞线程的过程,而我们就不用关心繁琐的await/signal操作了。 下面是Jdk 1.7中ArrayBlockingQueue部分代码:   public ArrayBlockingQueue(int capacity...
  • C++11 线程安全的BlockingQueue实现

    千次阅读 2016-09-21 07:53:23
    C++11 BlockingQueue
  • BlockingQueue深入分析

    万次阅读 2014-03-10 23:50:13
    1.BlockingQueue定义的常用方法如下   抛出异常 特殊值 阻塞 超时 插入 add(e) offer(e) put(e) offer(e,time,unit) 移除 remove() poll() take() poll(time,unit) 检查 element() peek() 不可用 不可用
  • Java多线程——ArrayBlockingQueue的使用

    千次阅读 2018-04-20 16:35:29
    最近找实习,发现各大公司对Java的多线程爱的很深沉,遂决定开...一 位置知道它实现了抽象类BlockingQueue即可,和它一样的小伙伴很多二 定义我只截取了第一段,以后都不截图了,需要的自己下载一个j2se7.chm就行了...
1 2 3 4 5 ... 20
收藏数 43,983
精华内容 17,593
关键字:

blockingqueue