精华内容
下载资源
问答
  • BlockQueue学习笔记

    2021-10-02 14:19:14
    (3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续. 2. 获取数据 (1)poll(time):取走BlockingQueue里排在首位的对象,若不能...

    在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。

    • 认识BlockingQueue
      阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:


      从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
      常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种)
        先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。
        后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

    多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒),下面两幅图演示了BlockingQueue的两个常见阻塞场景:

           如上图所示:当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。

     


       如上图所示:当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

      这也是我们在多线程环境下,为什么需要BlockingQueue的原因。作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。既然BlockingQueue如此神通广大,让我们一起来见识下它的常用方法:

    三. BlockingQueue的核心方法

      1.放入数据

        (1)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法

     的线程);       
           (2)offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

        (3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

      2. 获取数据

        (1)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;

        (2)poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间

    超时还没有数据可取,返回失败。

        (3)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; 

        (4)drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

    四. 常见BlockingQueue

      在了解了BlockingQueue的基本功能后,让我们来看看BlockingQueue家庭大致有哪些成员?

      1. ArrayBlockingQueue

      基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

      ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

      2.LinkedBlockingQueue

      基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

      作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

      ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

    展开全文
  • 从源码看ThreadPoolExecutor execute和BlockQueue 文章目录 从源码看ThreadPoolExecutor execute和BlockQueue JDK版本 需要用到的知识点 位运算 线程池原理 线程池的五种状态 RUNNING SHUTDOWN STOP TIDYING ...

    从源码看ThreadPoolExecutor execute和BlockQueue

    JDK版本

    此文章基于jdk 1.8.0_191

    需要用到的知识点

    位运算

    操作符描述例子
    如果相对应位都是1,则结果为1,否则为0(A&B),得到12,即0000 1100
    |如果相对应位都是0,则结果为0,否则为1(A | B)得到61,即 0011 1101
    ^如果相对应位值相同,则结果为0,否则为1(A ^ B)得到49,即 0011 0001
    按位取反运算符翻转操作数的每一位,即0变成1,1变成0。(〜A)得到-61,即1100 0011
    <<按位左移运算符。左操作数按位左移右操作数指定的位数。A << 2得到240,即 1111 0000
    >>按位右移运算符。左操作数按位右移右操作数指定的位数。A >> 2得到15即 1111
    >>>按位右移补零操作符。左操作数的值按右操作数指定的位数右移,移动得到的空位以零填充。A>>>2得到15即0000 1111

    线程池原理

    摘自 汪文君. Java高并发编程详解:多线程与架构设计 (Java核心技术系列) (Kindle 位置 2508-2512). 北京华章图文信息有限公司. Kindle 版本.

    所谓线程池通俗的理解就是有一个池子,里面存放着已经创建好的线程,当有任务提交给线程池执行时,池子中的 某个线程会主动执行该任务。如果池子中的线程数量不够应付数量众多的任务时,则需要自动扩充新的线程到池子 中,但是该数量是有限的,就好比池塘的水界线一样。当任务比较少的时候,池子中的线程能够自动回收,释放 资源。为了能够异步地提交任务和缓存未被处理的任务,需要有一个任务队列。

    [外链图片转存失败(img-s8rOi6Nr-1563702945152)(https://raw.githubusercontent.com/baofeidyz/images/master/img/20190721102700.png)]

    一个完整的线程池应该具备如下要素:

    1. 任务队列:用于缓存提交的任务
    2. 线程数量管理功能:一个线程池必须能够很好地管理和控制线程数量,可通过如下三个参数来实现,比如创建 线程池时初始的线程数量 init;线程池自动扩充时最大的线程数量max;在线程池空闲时需要释放线程但是也要维护一定数量的活跃数量或者核心数量core。有了这三个参数,就能够很好地控制线程池中的线程数量,将其维护在一个合理的范围之内,三者之间的关系是 init<= core<= max
    3. 任务拒绝策略:如果线程数量已达到上限且任务队列已满,则需要有相应的拒绝策略来通知任务提交者
    4. 线程工厂:主要用于个性化定制线程,比如线程设置为守护线程以及设置线程名称等
    5. QueueSize:任务队列主要存放提交的Runnable,但是为了防止内存溢出,需要有limit数量对其进行控制
    6. Keepedalive 时间:该时间主要决定线程各个重要参数自动维护的时间间隔

    线程池的五种状态

    线程池状态示意图以及五种状态的说明摘自CSDN一只逗比的程序猿

    一共有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED

    线程池状态切换示意图

    线程池状态切换示意图

    RUNNING

    状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理

    状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0

    SHUTDOWN

    状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务

    状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN

    注:虽然状态已经不是RUNNING了,但是如果任务队列中还有任务的时候,线程池仍然会继续执行,具体分析请见ThreadPoolExecutor.execute()方法解析

    STOP

    状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务

    状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP

    TIDYING

    状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现

    状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
    当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING

    TERMINATED

    状态说明:线程池彻底终止,就变成TERMINATED状态

    状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED

    线程池五种状态的二进制表示

    线程池状态二进制
    RUNNING111
    SHUTDOWN000
    STOP001
    TIDYING010
    TERMINATED011
    COUNT_BITS :29
    RUNNING    :11100000 00000000 00000000 00000000
    SHUTDOWN   :00000000 00000000 00000000 00000000
    STOP       :00100000 00000000 00000000 00000000
    TIDYING    :01000000 00000000 00000000 00000000
    TERMINATED :01100000 00000000 00000000 00000000
    RUNNING    :-536870912
    SHUTDOWN   :0
    STOP       :536870912
    TIDYING    :1073741824
    TERMINATED :1610612736
    

    ThreadPoolExecutor解读

    构造函数解读

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                null :
            AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    先看参数

    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler
    

    对应含义关系如下

    参数名类型备注
    corePoolSizeint核心线程数(如果allowCoreThreadTimeOut为true,核心线程将一直存活)
    maximumPoolSizeint允许创建的最大线程数
    (如果使用了无界队列LinkedBlockingQueue,这个值会失效,原因在讲解execute方法中提及)
    keepAliveTimelong非核心线程闲置时的超时时长(如果allowCoreThreadTimeOut为true,这个时长也会用于核心线程)
    unitTimeUnit参数keepAliveTime的单位
    workQueueBlockingQueue<Runnable>任务队列,可选的子类
    ArrayBlockingQueue
    DelayQueue
    LinkedBlockingDeque
    LinkedBlockingQueue
    LinkedTransferQueue
    PriorityBlockingQueue
    SynchronousQueue
    threadFactoryThreadFactory线程工厂,为线程池提供创建新线程的功能(其他构造函数中默认传Executors.defaultThreadFactory()
    handlerRejectedExecutionHandler拒绝策略,当队列和线程池都满了就才会根据这个策略进行处理。(默认为AbortPolicy,直接抛出异常),可选子类ThreadPoolExecutor.AbortPolicy
    ThreadPoolExecutor.CallerRunsPolicy
    ThreadPoolExecutor.DiscardOldestPolicy
    ThreadPoolExecutor.DiscardPolicy

    执行方法解读

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
                 * Proceed in 3 steps:
                 *
                 * 1. If fewer than corePoolSize threads are running, try to
                 * start a new thread with the given command as its first
                 * task.  The call to addWorker atomically checks runState and
                 * workerCount, and so prevents false alarms that would add
                 * threads when it shouldn't, by returning false.
                 *
                 * 2. If a task can be successfully queued, then we still need
                 * to double-check whether we should have added a thread
                 * (because existing ones died since last checking) or that
                 * the pool shut down since entry into this method. So we
                 * recheck state and if necessary roll back the enqueuing if
                 * stopped, or start a new thread if there are none.
                 *
                 * 3. If we cannot queue task, then we try to add a new
                 * thread.  If it fails, we know we are shut down or saturated
                 * and so reject the task.
                 */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

    这里涉及到runStateworkerCount两个概念,线程池中是利用32位的int变量来表示。

    因为线程池的状态总共有五种,2^2 = 4, 2^3 = 8,所以需要占用三位,实际采用的就是高三位表示,具体可见线程池五种状态的二进制表示

    剩下的部分全部都用于记录有效线程数,所以代码中也就规定有效线程数不可大于29位,也就是最大为2^29-1,详见execute()方法解析。

    然后我们再来一起看看execute(Runnable command)方法是如何运行的。

    execute()整体逻辑概览

    1. 最开始是一个基本的判空逻辑,如果传入的任务是空的则抛出异常
    2. 第一个判断:检查当前核心线程数,如果当前线程数小于核心线程则调用addWorker()方法创建线程,创建成功返回true
    3. 第二个判断:当核心线程池中的所有线程都在运行,此时将线程放到任务中
    4. 第三个判断:如果核心线程数已经满了,队列也添加失败了,那么这里就会调用上面提到的拒绝策略,如果我们没有在创建线程池时给出特定的拒绝策略,那么默认的实现就是抛出异常。
    throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
    

    细节实现

    第一个判断:检查当前核心线程数

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    
    获取workerCount
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    这里的ctl变量是一个初始值为RUNNINGAtomicInteger对象,拿到的变量c中既存储了当前线程池的状态,又保存了当前线程池中的有效线程数量。

    画外音:这里的AtomicInteger对象为什么是线程安全的,是因为使用了CAS,具体不谈

    判断当前有效线程数量是否大于核心线程数量

    然后我们再看workerCountOf(c)方法的实现:

        private static int workerCountOf(int c)  { 
            return c & CAPACITY; 
        }
    

    CAPACITY的值为:

    00011111 11111111 11111111 11111111
    

    结合位运算法则,这里c & CAPACITY的结果集就是实际的有效线程数量。

    创建核心线程数量

    当有效线程数量小于核心线程数量的时候,我们需要调用addWorker(command, true)的方法去创建线程,这里的参数ture就用于标识当前想要创建的线程是核心线程。

    然后我们再来看看addWorker(command, true)方法的具体实现逻辑。

    addWork实现逻辑
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    rs为什么可以表示runstate?
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    

    ~CAPACITY的值为:

    11100000 00000000 00000000 00000000
    

    所以后面的29位不论怎么样都会变成0,也就是最后的结果集中只会有高三位用于表示runstate的参数

    什么时候才会创建线程?
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    

    哇~这个判断真的是够绕的,我看了老半天,一起来缕一缕

    rs >= SHUTDOW
    

    代表当前runstate是SHUTDOWN STOP TIDYING TERMINATED 任意一个,关于五种状态的值可见线程池五种状态的二进制表示

    ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
    

    A:在rs >= SHUTDOW成立的前提下,如果是rs != SHUTDOWN,则整个判断成立。

    B:在rs >= SHUTDOW成立的前提下,如果是firstTask != null,则整个判断成立。

    C:在rs >= SHUTDOW成立的前提下,如果是workQueue.isEmpty(),则整个判断成立。

    并且A B C三个是有先后顺序的

    再总结一下就是在第一个判断成立的前提下,第二个判断中,只要有一个不成立就会返回false,线程创建失败。

    转成白话文就是

    A 当线程池状态为STOP TIDYING TERMINATED时,不会创建线程

    B 当线程池为SHUTDOWN时,不允许新建任务

    C 当线程池为SHUTDOWN时,且没有新的任务,此时如果任务队列也经没有任务,同样不会创建线程

    此时再回头看看线程池的五种状态


    接着往下看

                for (;;) {
                    // 第一步,拿到当前的有效线程数
                    int wc = workerCountOf(c);
                    // 如果当前有效线程数已经大于等于允许的最大线程数,不允许创建
                    // 如果正准备创建的线程是core的线程且大于线程池初始化时设定的线程数,不允许创建
                    // 如果正准备创建的线程不是core但大于线程池初始化时设定的最大线程数,不允许创建
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 利用CAS做自增操作,如果成功了,就跳出循环体开始下一步操作,如果失败则重试
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    // 重新读取当前的runstate,如果当前的runstate和循环体中的runstate有变化,则重新去判断是否需要创建线程
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
    
    线程到底是怎么创建的呢?

    经历了层层校验逻辑,我们总算是要准备创建线程了。

        // 标记线程是否启动
    	boolean workerStarted = false;
    	// 标记线程是否被添加
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 获取锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    				// 再检查一下当前线程池的状态
                    // rs < SHUTDOWN 表示线程池的状态为RUNNING就直接创建线程
                    // rs == SHUTDOWN && firstTask == null 当前线程已经处于SHUTDOWN且当前没有新建的任务(也就是为了把任务队列执行完)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果正准备创建的线程已经处于alive状态,则抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 否则放到workers中(包含线程池中所有的工作线程,将新构造的工作线程加入到工作线程集合中)
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
    
    总结
    创建线程的条件:

    当前线程池状态为RUNNING,当前线程数小于核心线程数或当前线程线程数小于最大线程数。其中最大线程数又分为创建线程池时给定的数量以及程序允许的最大值。

    不创建线程的条件:

    当线程池状态不为RUNNING时,不会接受新的任务,此时如果任务队列还有任务,会把这部分处理完。

    第二个判断:当核心线程池中的所有线程都在运行,此时将线程放到任务中

        // 检查当前线程池状态,如果为RUNNING状态,则尝试将任务放到任务队列中	
    	if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 放置成功以后,再次检查当前的线程池状态,如果当前线程池状态非RUNNING,则尝试将刚刚放入的任务从任务队列中移除
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果当前线程数状态为RUNNING,但是workerCount的值又等于0则传入空任务结束此次创建
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
    

    第三个判断:什么时候执行拒绝策略?

        else if (!addWorker(command, false))
            reject(command);
    

    当第二个判断不成立,也就是当前线程池状态非RUNNING状态或尝试将任务放到任务队列中失败时,尝试再次创建一个非核心线程,此时线程数需要小于创建线程池时给定的最大值。

    总结

    结合第二个判断和第三个判断,就可以明白为什么当任务队列是无界的时候,最大线程数不会产生作用了。

    工作线程的执行

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread(); // 得到当前线程
        Runnable task = w.firstTask; // 得到Worker中的任务task,也就是用户传入的task
        w.firstTask = null; // 将Worker中的任务置空
        w.unlock(); // allow interrupts。 
        boolean completedAbruptly = true; // 标识当前Worker异常结束,默认是异常结束
        try {
            // 如果worker中的任务不为空,执行执行任务
            // 否则使用getTask获得任务。一直循环,除非得到的任务为空才退出
            while (task != null || (task = getTask()) != null) {
                // 如果拿到了任务,给自己上锁,表示当前Worker已经要开始执行任务了,
                // 已经不是处于闲置Worker(闲置Worker的解释请看下面的线程池关闭)
                w.lock();  
                // 在执行任务之前先做一些处理。 
                // 1. 如果线程池已经处于STOP状态并且当前线程没有被中断,中断线程 
                // 2. 如果线程池还处于RUNNING或SHUTDOWN状态,并且当前线程已经被中断了,
                // 重新检查一下线程池状态,如果处于STOP状态并且没有被中断,那么中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 任务执行前需要做什么,ThreadPoolExecutor是个空实现,子类可以自行扩展
                    beforeExecute(wt, task); 
                    Throwable thrown = null;
                    try {
                        // 真正的开始执行任务,这里run的时候可能会被中断,比如线程池调用了shutdownNow方法
                        task.run(); 
                    } catch (RuntimeException x) { // 任务执行发生的异常全部抛出,不在runWorker中处理
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 任务执行结束需要做什么,ThreadPoolExecutor是个空实现,子类可以自行扩展
                        afterExecute(task, thrown); 
                    }
                } finally {
                    task = null;
                    w.completedTasks++; // 记录执行任务的个数
                    w.unlock(); // 执行完任务之后,解锁,Worker变成闲置Worker,等待执行下一个任务
                }
            }
            completedAbruptly = false; // 正常结束
        } finally {
            processWorkerExit(w, completedAbruptly); // Worker退出时执行
        }
    }
    

    BlockQueue

    三个添加元素的方法:

    • add:把e加到队列里,添加成功返回true,容量如果满了添加失败会抛出IllegalStateException异常
    • offer:表示如果可能的话,将e加到队列里,成功返回true,否则返回false
    • put:把e加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

    三个删除元素的方法:

    • poll:取走队列头部的对象,若不能立即取出,则可以等待timeout参数规定的时间,取不到时返回null
    • remove:基于对象找到对应的元素并删除,删除成功返回true,否则返回false
    • take:取走队列中排在首位的对象,若队列为空,一直阻塞到队列有元素并删除

    其中:

    队列不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。

    null 被用作指示poll 操作失败的警戒值。

    抛出异常特殊值阻塞超时
    插入add(e)offer(e)put(e)offer(e, time, unit)
    移除remove(Object o)poll()take()poll(time, unit)
    检查element()peek()不可用不可用
    类型含义
    抛出异常如果试图的操作无法立即执行,抛一个异常
    特殊值如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)
    阻塞如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行
    超时如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,
    但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)

    常见的四个实现:

    ArrayBlockingQueue

    一个由数组支持的有界阻塞队列。此队列按 **FIFO(先进先出)**原则对元素进行排序。队列的头部是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。

    ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)

    ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改

    属性

      // 存储队列元素的数组,是个循环数组
      final Object[] items;
    
      // 拿数据的索引,用于take,poll,peek,remove方法
      int takeIndex;
    
      // 放数据的索引,用于put,offer,add方法
      int putIndex;
    
      // 元素个数
      int count;
    
      // 可重入锁
      final ReentrantLock lock;
      // notEmpty条件对象,由lock创建
      private final Condition notEmpty;
      // notFull条件对象,由lock创建
      private final Condition notFull;
    

    创建

    	// 构造函数要求指定队列大小capacity
    	/**
    	* capacity和fair,capacity同第一个构造方法,代表队列大小。fair代表该队列的访问策略是否公平。如果为 		
    	* true,则按照 FIFO 顺序访问插入或移除时受阻塞线程的队列;如果为 false,则访问顺序是不确定的。这里fair参
    	* 数被设置为ReentrantLock的入参,就可以通过ReentrantLock来保证线程访问是否公平。而此构造方法创建了两个
    	* Condition,也就是条件,分别是notEmpty和notFull,Condition可以调用wait()和signal()来控制当前现
    	* 成等待或者唤醒
    	*/
    	// 默认fair为false
    	public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            this(capacity, fair);
            final ReentrantLock lock = this.lock;
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                count = i;
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }
    

    内部调用的this(capacity, fair)方法

        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    

    数据的添加

    add

    ArrayBlockingQueue自己并没有实现add方法,而直接调用父类AbstractQueue的add方法

      public boolean add(E e) {
        return super.add(e);
      }
    

    内部调用的super.add

        public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw  IllegalStateException("Queue full");
        }
    

    实际上最后调用的就是ArrayBlockingQueue自己的offer方法,但是如果offer方法返回结果为false,则抛出IllegalStateException

    offer
    	public boolean offer(E e) {
        		// 不允许元素为空,为空会抛出NullPointerException异常
            checkNotNull(e);
        		// 获取锁
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 元素个数和当前存储队列元素的数组大小相等,就不会再加了,所以会返回false
                if (count == items.length)
                    return false;
                else {
                  	// 入队操作
                    enqueue(e);
                    return true;
                }
            } finally {
              	// 释放锁
                lock.unlock();
            }
        }
    

    内部调用的enqueue方法

    	private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
        		// 放数据索引+1,如果数据索引已经与存储队列的元素数量相同则变为0
            if (++putIndex == items.length)
                putIndex = 0;
        		// 元素个数+1
            count++;
        		// 使用条件对象notEmpty通知,比如使用take方法的时,队列中没有数据被阻塞。这个时候队列中新增了一条数据,需要调用signal通知
            notEmpty.signal();
        }
    
    put
    	public void put(E e) throws InterruptedException {
        		// 元素为空抛出NPE异常
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
        		// 加锁,保证调用put方法的时候只有一个线程
            lock.lockInterruptibly();
            try {
              	// 如果队列满了,阻塞当前线程并加入到条件对象notFull的等待队列里
                while (count == items.length)
                  	// 线程阻塞并被挂起,同时释放锁
                    notFull.await();
              	// 入队
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    

    疑问点,之前的入队操作的写法是

    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal(); 
    }
    

    为什么改了?改之前和改之后有什么区别?

    总结

    ArrayBlockingQueue总共有三个添加数据的方法,分别是add、put、offer

    • add方法:内部实际调用的是offer方法,如果队列已满则抛出IllegalStateException一场,否则返回true

    • offer方法:如果队列满了返回false,成功返回true

    • put方法:如果队列满了会阻塞线程,直到有线程消费了队列中的元素

    三个方法内部都使用可重入锁保证原子性

    数据的删除

    poll
        public E poll() {
          	// 加锁,保证调用poll方法的时候只有一个线程
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
              	// 如果count=0,也就是队列中没有元素了,这时候会返回null,否则调用dequeue取元素
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    

    内部调用的dequeue方法

        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
          	// takeIndex是用于拿数据的索引
            E x = (E) items[takeIndex];
          	// 取出数据以后,这个元素位置为null
            items[takeIndex] = null;
          	// 如果当前拿数据的索引大小已经和元素数组大小相等则置为0,这里的takeIndex之所以自增是因为FIFO原则
            if (++takeIndex == items.length)
                takeIndex = 0;
          	// 一个数据被取出,此时元素个数-1
            count--;
          	// TODO
            if (itrs != null)
                itrs.elementDequeued();
          	// 使用对象notFull通知,如使用put方法放置数据的时候队列满了,被阻塞,这个时候dequeue取出一条数据,队列没满,则可以继续放入
            notFull.signal();
            return x;
        }
    
    take
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
              	// 如果队列为空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里
                while (count == 0)
                  	// 线程阻塞并被挂起,同时释放锁
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
    remove
        public boolean remove(Object o) {
            if (o == null) return false;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count > 0) {
                  	// 放数据索引
                    final int putIndex = this.putIndex;
                  	// 取数据索引
                    int i = takeIndex;
                  	// 通过元素类型遍历,找到类型相同的索引位置,i<=元素总大小
                    do {
                        if (o.equals(items[i])) {
                            removeAt(i);
                            return true;
                        }
                        if (++i == items.length)
                            i = 0;
                    	// 因为putIndex就是最后一个放数据的位置,所以拿数据的索引不能等于它
                      // 但是有没有想过为什么不可以写<=呢?
                    } while (i != putIndex);
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
    

    内部调用的removeAt方法

        void removeAt(final int removeIndex) {
            // assert lock.getHoldCount() == 1;
            // assert items[removeIndex] != null;
            // assert removeIndex >= 0 && removeIndex < items.length;
            final Object[] items = this.items;
          	// 如果要删除数据的索引位置就是拿数据索引位置,直接将takeIndex索引位置上的数据,然后takeIndex+1
            if (removeIndex == takeIndex) {
                // removing front item; just advance
                items[takeIndex] = null;
                if (++takeIndex == items.length)
                    takeIndex = 0;
                count--;
                if (itrs != null)
                    itrs.elementDequeued();
            } else {
                // an "interior" remove
    
                // slide over all others up through putIndex.
              	// 如果要删除数据的索引位置不是takeIndex,则需要移动元素位置,更新putIndex
                final int putIndex = this.putIndex;
                for (int i = removeIndex;;) {
                    int next = i + 1;
                    if (next == items.length)
                        next = 0;
                    if (next != putIndex) {
                        items[i] = items[next];
                        i = next;
                    } else {
                        items[i] = null;
                        this.putIndex = i;
                        break;
                    }
                }
                count--;
                if (itrs != null)
                    itrs.removedAt(removeIndex);
            }
          	// 删除以后通知阻塞线程,比如put方法
            notFull.signal();
        }
    

    总结

    三个删除方法,分别是poll,take,remove

    • poll方法对于队列为空的情况,返回null,否则返回队列头部元素
    • remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false
    • poll方法和remove方法不会阻塞线程
    • take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中
    • 三个删除方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程

    LinkedBlockingQueue

    [外链图片转存失败(img-TQTAYdEw-1563702945157)(https://raw.githubusercontent.com/baofeidyz/images/master/img/20190721160443.png)]

    内部以一个链式结构(链接节点)对其元素进行存储,链表是单向链表,满足FIFO(先进先出)原则。

    属性

        /** The capacity bound, or Integer.MAX_VALUE if none */
    		// 容量大小
        private final int capacity;
    
        /** Current number of elements */
    		// 元素个数
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         * Head of linked list.
         * Invariant: head.item == null
         */
    		// 头节点
        transient Node<E> head;
    
        /**
         * Tail of linked list.
         * Invariant: last.next == null
         */
    		// 尾节点
        private transient Node<E> last;
    
        /** Lock held by take, poll, etc */
    		// 读锁
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** Wait queue for waiting takes */
    		// 读锁的条件对象
        private final Condition notEmpty = takeLock.newCondition();
    
        /** Lock held by put, offer, etc */
    		// 写锁
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** Wait queue for waiting puts */
    		// 写锁的条件对象
        private final Condition notFull = putLock.newCondition();
    

    ArrayBlockingQueue只有1个锁,添加数据和删除数据的时候只有1个可以被执行,不允许并行操作

    但LinkedBlockingQueue有2个锁,读和写各有一把,添加数据和删除数据两个操作可以并行。

    创建

        public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock putLock = this.putLock;
            putLock.lock(); // Never contended, but necessary for visibility
            try {
                int n = 0;
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    enqueue(new Node<E>(e));
                    ++n;
                }
                count.set(n);
            } finally {
                putLock.unlock();
            }
        }
    

    内部调用的this和enqueue方法

        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
          	// last和head节点都是null
            last = head = new Node<E>(null);
        }
    
        private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }
        static class Node<E> {
            E item;
    
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head.next
             * - null, meaning there is no successor (this is the last node)
             */
            Node<E> next;
    
            Node(E x) { item = x; }
        }
    

    enqueue的class方法

    	private void enqueue(Node<E> paramNode){
          this.last = (this.last.next = paramNode);
        }
    

    另外还画了一个图

    数据的添加

    三个方法,分别是add offer put

    add

    LinkedBlockingQueue和ArrayBlockingQueue一样,同样没有实现add方法,所以会直接调用父类AbstractQueue的add方法

    		public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    

    实际上也就是调用LinkedBlockingQueue的offer方法,失败则抛出异常

    offer

    关于这里使用到的锁实际上涉及到java的monitor MESA模型,这里就不展开讲了,有兴趣的小伙伴戳极客时间-管程:并发编程的万能钥匙,有二十个小伙伴可以免费读哦~打不开的话复制地址用微信打开哦?

    可重入锁指的是线程可以重复获取同一把锁,ReentrantLock有一个带布尔值fair的构造函数,true表示公平锁,反之则是非公平锁。指的是条件变量的等待队列唤醒策略,公平锁是唤醒等待时间最长的,非公平锁则不会保证

        public boolean offer(E e) {
          	// 不允许空元素
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
          	// 如果满了就返回false,but这个大小可是2^31-1=2147483647
            if (count.get() == capacity)
                return false;
            int c = -1;
            Node<E> node = new Node<E>(e);
          	// 拿到写锁
            final ReentrantLock putLock = this.putLock;
          	// 对写操作加锁
            putLock.lock();
            try {
                if (count.get() < capacity) {
                    enqueue(node);
                  	// 元素个数+1
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                      	// 如果容量还没满,在对象notFull唤醒正在等待的线程,表示可以再往队列里加数据了
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
          // todo
            return c >= 0;
        }
    
    put
        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
          	// 拿到写锁
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
          	// 对写操作加锁
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
              	// 如果当前容量已经满了,则阻塞并挂起当前线程
                while (count.get() == capacity) {
                    notFull.await();
                }
              	// 入队操作
                enqueue(node);
              	// 元素+1
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                  	// 如果容量还没满,在放锁的条件对象notFull唤醒正在等待的线程
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    

    总结

    add offer put方法与ArrayBlockingQueue特性一致,只是底层实现不同

    • add方法:内部实际调用的是offer方法,如果队列已满则抛出IllegalStateException一场,否则返回true

    • offer方法:如果队列已满则返回false,成功返回true

    • put方法:如果队列满了会阻塞线程,直到有线程消费了队列中的元素

    ArrayBlockingQueue中放入数据阻塞的时候,需要消费数据才能唤醒,而LinkedBlockingQueue中放入数据阻塞的时候,因为它内部有2个锁,可以并行执行放入数据和消费数据,不仅在消费数据的时候进行唤醒插入阻塞的线程,同时在插入的时候如果容量还没满,也会唤醒插入阻塞的线程

    数据的删除

    poll
        public E poll() {
            final AtomicInteger count = this.count;
            if (count.get() == 0)
                return null;
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                if (count.get() > 0) {
                    x = dequeue();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
          	// 成功就会返回实际的元素,否则返回null
            return x;
        }
    

    内部调用的dequeue方法

        private E dequeue() {
            // assert takeLock.isHeldByCurrentThread();
            // assert head.item == null;
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h; // help GC
            head = first;
            E x = first.item;
            first.item = null;
            return x;
        }
    

    不是特别好理解的话,就看图吧

    [外链图片转存失败(img-4JzcQZkW-1563702945159)(https://raw.githubusercontent.com/baofeidyz/images/master/img/20190721160516.png)]

    take
        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                  	// 阻塞等待
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
    remove
        public boolean remove(Object o) {
            if (o == null) return false;
          	// remove操作的位置不固定,所以需要对两个锁都进行加锁
            fullyLock();
            try {
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                  	// 判断是否找到对象
                    if (o.equals(p.item)) {
                      	// 修改节点的链接信息,同时调用notFull的signal方法
                        unlink(p, trail);
                        return true;
                    }
                }
                return false;
            } finally {
                fullyUnlock();
            }
        }
    

    内部调用的fullyLock方法 unlink方法以及fullyUnlock方法

      void fullyLock() {
          putLock.lock();
          takeLock.lock();
      }
      void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
          last = trail;
        // 判断当前元素总数是否等于最大值
        if (count.getAndDecrement() == capacity)
          notFull.signal();
      }
      void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
      }
    

    总结

    take poll方法的特性与ArrayBlockingQueue一致,唯一不同的是remove方法中会同时对取 拿两个锁进行加锁

    ArrayBlockingQueue因为内部实现是通过数组实现,所以其在初始化的时候必须要指定大小,且不可变,在删除操作的时候会移动元素。

    LinkedBlockingQueue内部实现是通过单向链表实现,本身没有边界,但默认最大值为2^31-1。可同时操作读和写,在删除操作时需要给读写同时加锁。

    DelayQueue

    Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队 列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待

    SynchronousQueue

    SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

    拒绝策略

    主要有四种

    ThreadPoolExecutor.AbortPolicy

    默认的ThreadPoolExecutor.AbortPolicy 处理程序遭到拒绝将抛出运行时RejectedExecutionException

        public static class AbortPolicy implements RejectedExecutionHandler {
            /**
             * Creates an {@code AbortPolicy}.
             */
            public AbortPolicy() { }
    
            /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    

    ThreadPoolExecutor.CallerRunsPolicy

    线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度

    	public static class CallerRunsPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code CallerRunsPolicy}.
             */
            public CallerRunsPolicy() { }
    
            /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    

    ThreadPoolExecutor.DiscardPolicy

    不能执行的任务将被删除

        public static class DiscardPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardPolicy}.
             */
            public DiscardPolicy() { }
    
            /**
             * Does nothing, which has the effect of discarding task r.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    

    ThreadPoolExecutor.DiscardOldestPolicy

    如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

    	public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardOldestPolicy} for the given executor.
             */
            public DiscardOldestPolicy() { }
    
            /**
             * Obtains and ignores the next task that the executor
             * would otherwise execute, if one is immediately available,
             * and then retries execution of task r, unless the executor
             * is shut down, in which case task r is instead discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }
    

    参考资料

    极客时间-Java并发编程实战

    《Java高并发编程详解:多线程与架构设计 (Java核心技术系列) 》

    Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析

    LinkedBlockingQueue源码解析

    展开全文
  • BlockQueue 解析

    2019-04-02 16:23:00
    BlockQueue 解析 生产者、消费者模式 https://www.jianshu.com/p/024a36b83099 posted @ 2019-04-02 16:23 诸葛子房 阅读(...) 评论(...) 编辑 收藏 ...

    BlockQueue 解析

    生产者、消费者模式

    https://www.jianshu.com/p/024a36b83099

    posted @ 2019-04-02 16:23 诸葛子房 阅读( ...) 评论( ...) 编辑 收藏
    展开全文
  • BlockQueue 是什么?

    千次阅读 2019-12-25 21:48:16
    下面是一个简单的BlockQueue的实现: public class BlockingQueue { private List queue = new LinkedList ( ) ; private int limit = 10 ; public BlockingQueue ( int limit ) { ...

    本文内容如有错误、不足之处,欢迎技术爱好者们一同探讨,在本文下面讨论区留言,感谢。

    简述

    阻塞队列:当线程队列是空时,从队列中获取元素的操作将会被阻塞;当线程队列是满时,往队列里添加元素的操作将会被阻塞。

    Java 5 开始出现存在 java.util.concurrent 包下,阻塞队列是一个队列,当尝试从队列中出队并且队列为空时,或者尝试将项目入队并且队列已满时,它将阻塞。尝试从空队列中出队的线程被阻止,直到其他线程将一个项目插入队列中为止。尝试使一个项目进入完整队列的线程被阻塞,直到某个其他线程在队列中腾出空间为止,方法是使一个或多个项目出队或完全清除队列。

    原理

    原理示意图片:
    图片地址:http://tutorials.jenkov.com/java-concurrency/blocking-queues.html

    在这里插入图片描述
    下面是一个简单的BlockQueue的实现:

    public class BlockingQueue {
      private List queue = new LinkedList();
      private int  limit = 10;
      public BlockingQueue(int limit){
        this.limit = limit;
      }
      
      public synchronized void enqueue(Object item)throws InterruptedException  {
        while(this.queue.size() == this.limit) {
          wait();
        }
        if(this.queue.size() == 0) {
          notifyAll();
        }
        this.queue.add(item);
      }
      
      public synchronized Object dequeue() throws InterruptedException{
        while(this.queue.size() == 0){
          wait();
        }
        if(this.queue.size() == this.limit){
          notifyAll();
        }
        return this.queue.remove(0);
      }
    }
    

    操作代码:

    // 调用 BlockingQueue 方法的Java程序演示
      
    import java.util.concurrent.*; 
    import java.util.*; 
      
    public class GFG { 
        public static void main(String[] args) 
            throws InterruptedException 
        { 
      
            // ArrayBlockingQueue 的边界大小
            int capacity = 5; 
      
            // 创建 ArrayBlockingQueue 
            ArrayBlockingQueue<String>  queue = new ArrayBlockingQueue<String>(capacity); 
      
            // 使用put()方法添加元素 
            queue.put("StarWars"); 
            queue.put("SuperMan"); 
            queue.put("Flash"); 
            queue.put("BatMan"); 
            queue.put("Avengers"); 
      
            // 打印队列
            System.out.println("queue contains "+ queue); 
      
            // 移除一些元素
            queue.remove(); 
            queue.remove(); 
    		queue.put("CaptainAmerica"); 
            queue.put("Thor"); 
      
            System.out.println("queue contains " + queue); 
        } 
    } 
    

    输出结果:

    queue contains [StarWars, SuperMan, Flash, BatMan, Avengers]
    queue contains [Flash, BatMan, Avengers, CaptainAmerica, Thor]
    

    操作

    BlockingQueue方法有四种形式,它们以不同的方式处理操作,这些操作可能无法满足开发需求,但将来可能会满足:第一种抛出异常,第二种返回特殊值( null或false,具体取决于操作),第三个块将无限期地阻塞当前线程,直到操作成功为止;第四个块仅在给定的最大时间限制内超时。

    下表总结了这些方法:

    方法类型抛出异常特殊值阻塞超时
    插入add(e)offer(e)put(e)offer(e,time,unit)
    移除remove()poll()take()poll(time,unit)
    检查element()peek()

    这4种不同的行为集意味着:

    • 引发异常:如果无法立即进行尝试的操作,则会引发异常。
    • 特殊值:如果无法立即尝试操作,则会返回一个特殊值(通常为true / false)。
    • 阻塞:如果无法立即进行尝试的操作,则该方法调用将一直阻塞直到可行为止。
    • 超时:如果无法立即进行尝试的操作,则该方法调用将一直阻塞直到成功,但等待时间不得长于给定的超时。返回一个特殊值,告诉操作是否成功(通常为true / false)。

    下面是一个生产者消费者的案例使用BlockQueue:

    class Producer implements Runnable {
       private final BlockingQueue queue;
       Producer(BlockingQueue q) { queue = q; }
       public void run() {
         try {
           while (true) { queue.put(produce()); }
         } catch (InterruptedException ex) { ... handle ...}
       }
       Object produce() { ... }
     }
    
     class Consumer implements Runnable {
       private final BlockingQueue queue;
       Consumer(BlockingQueue q) { queue = q; }
       public void run() {
         try {
           while (true) { consume(queue.take()); }
         } catch (InterruptedException ex) { ... handle ...}
       }
       void consume(Object x) { ... }
     }
    
     class Setup {
       void main() {
         BlockingQueue q = new SomeQueueImplementation();
         Producer p = new Producer(q);
         Consumer c1 = new Consumer(q);
         Consumer c2 = new Consumer(q);
         new Thread(p).start();
         new Thread(c1).start();
         new Thread(c2).start();
       }
     }
    

    参考资料

    Blocking Queues

    Interface BlockingQueue

    Java BlockingQueue

    BlockingQueue Interface in Java

    展开全文
  • BlockQueue的使用

    千次阅读 2016-12-06 15:03:30
     put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断  直到BlockingQueue里面有空间再继续.  获取数据:  poll(time):取走BlockingQueue里排在首位的对象,...
  • public boolean add(E e) 方法将抛出IllegalStateException异常,说明队列已满。 public boolean offer(E e) 方法则不会抛异常,只会返回boolean值,告诉你添加成功与否,队列已满,当然返回false。...
  • BlockQueue

    2018-01-15 14:51:30
    首先BlockQueue是线程安全的,其内部有一个ReentrantLock,由ReentrantLock产生两个Condition,其中一个Condition是调用take方法阻塞的线程集合,另外一个是调用put方法阻塞线程的集合,我们知道ReentrantLock是基于...
  • 引言 之前自己的面试经历老被问到手写阻塞队列,当然大概率情况下面试官不会很直白的就让你实现一个阻塞队列,这个问题有很多的变种,但是万变不离其宗,知道了怎么去实现阻塞,也就会实现阻塞队列了。...
  • Java中对BlockQueue的理解

    千次阅读 2017-11-10 16:52:42
    在学习多线程的知识时,意外碰见了BlockQueue这个类,从字面理解是个队列块,带着好奇,查看了官方文档 文档如下: BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。因此,举例来...
  • 阻塞队列BlockQueue

    2019-11-09 08:41:40
    先上BlockQueue的源码: public interface BlockingQueue<E> extends Queue<E> { //增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常 boolean add(E e); //添加一个元素并返回...
  • 手写blockqueue队列

    2019-12-03 15:30:18
    package ... import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BlockQueue<...
  • 实战Concurrent-BlockQueue

    2019-03-21 02:04:53
    NULL 博文链接:https://cfeng-feng.iteye.com/blog/963451
  • BlockQueue练习

    2014-11-11 15:48:04
    BlockQueue练习代码,基本实现。小Demo
  • BlockQueue 1. BlockingQueue 简介 1.1 队列类型 1.2 队列数据结构 1.3 基本操作 2. 常用的 BlockingQueue 2.1 ArrayBlockingQueue 2.2 LinkedBlockingQueue 2.3 DelayQueue 2.4 PriorityBlockingQueue 1. ...
  • blockQueue 作为线程容器、阻塞队列,多用于生产者、消费者的关系模式中,保障并发编程线程同步,线程池中被用于当作存储任务的队列,还可以保证线程执行的有序性。 一、常用方法 1.生产 add(obj):往队列里面增加...
  • 在学习完java的同步队列、Lock和等待通知机制之后,再来看阻塞队列会觉得阻塞队列更加容易理解。阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入:当队列满时,队列会阻塞插入元素的线程,...
  •  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断 直到BlockingQueue里面有空间再继续.  获取数据:  poll(time):取走BlockingQueue里排在首位的对象,若不能立即...
  • import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;...
  • 常见阻塞队列包含ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等, 阻塞队列常用方法 add offer put take remove poll element peek 对比...
  • 阻塞队列BlockQueue的一些方法

    千次阅读 2014-07-06 17:49:55
    文章来源:http://blog.csdn.net/shixing_11/article/details/7109471
  • * 1: 线程安全 阻塞队列 BlockQueue * 1-1 同一时刻,出队或者入队 只能有一个线程在操作,这样保证了 出对和 入队的线程安全性 * 1-2 但是同一时刻,可以有 分别执行出队和人对的操作的两个线程在操作, * 但是为了...
  • 二、BlockQueue  功能介绍:阻塞队列(主要用于生产者消费者模型)  案例使用:   package com.gpdi.security; import java.util.concurrent.ArrayBlockingQueue; /** * description:生产者...
  • Java BlockQueue

    千次阅读 2016-05-29 22:02:00
    基本原理特殊的队列:BlockingQueue如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒.同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被...
  • package testFuture; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTest { public static void main(String[] args) { ...
  • package ... import ch.qos.logback.core.util.TimeUtil; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util...
  • BlockQueue队列<转>

    2016-12-30 10:57:33
     put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断  直到BlockingQueue里面有空间再继续. 获取数据:  poll(time):取走BlockingQueue里排在首位的对象,若不...
  •  最近在研究blockqueue的源码,从今天开始,和大家分享一下我看源码的一些心得体会  (1)LinkedBlockingQueue源码解析  (2)ArrayBlockingQueue源码解析    LinkedBlockingQueue实现了BlockingQueue接口...
  • volatile/CAS/atomicInteger/BlockQueue/线程交互/原子引用 */ public class ProdConsumer_BlockQueueDemo { public static void main(String[] args) throws Exception{ MyResource myResource = new ...
  • 开篇先解释一下队列: 数据结构分为线性数据和非先性数据 这里的线性数据结构指的是内存地址的线性存储,而下面要说的数组实现的队列就是线程地址存储结构中的一种,我们往下看 队列为先进先出的数据结构(FIFO)...
  • 数据 package cn.lonecloud.procum; /** * @author lonecloud * @version v1.0 * @date 上午11:00 2018/5/7 */ public class Data { private String data; public String getData() { ......

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,480
精华内容 1,392
关键字:

blockqueue