精华内容
下载资源
问答
  • java线程池实现原理
    2018-04-18 14:55:09

    出处:https://blog.csdn.net/hzw19920329/article/details/52372348

    问题一:线程池存在哪些状态,这些状态之间是如何进行切换的呢?

    查看ThreadPoolExecutor源码便知晓:

    [java]  view plain  copy
    1. // runState is stored in the high-order bits  
    2.    private static final int RUNNING    = -1 << COUNT_BITS;  
    3.    private static final int SHUTDOWN   =  0 << COUNT_BITS;  
    4.    private static final int STOP       =  1 << COUNT_BITS;  
    5.    private static final int TIDYING    =  2 << COUNT_BITS;  
    6.    private static final int TERMINATED =  3 << COUNT_BITS;  
            存在5种状态:

            <1>Running:可以接受新任务,同时也可以处理阻塞队列里面的任务;

            <2>Shutdown:不可以接受新任务,但是可以处理阻塞队列里面的任务;

            <3>Stop:不可以接受新任务,也不处理阻塞队列里面的任务,同时还中断正在处理的任务;

            <4>Tidying:属于过渡阶段,在这个阶段表示所有的任务已经执行结束了,当前线程池中是不存在有效的线程的,并且将要调用terminated方法;

            <5>Terminated:终止状态,这个状态是在调用完terminated方法之后所处的状态;

            那么这5种状态之间是如何进行转换的呢?查看ThreadPoolExecutor源码里面的注释便可以知道啦:

    [java]  view plain  copy
    1. * RUNNING -> SHUTDOWN  
    2.      *    On invocation of shutdown(), perhaps implicitly in finalize()  
    3.      * (RUNNING or SHUTDOWN) -> STOP  
    4.      *    On invocation of shutdownNow()  
    5.      * SHUTDOWN -> TIDYING  
    6.      *    When both queue and pool are empty  
    7.      * STOP -> TIDYING  
    8.      *    When pool is empty  
    9.      * TIDYING -> TERMINATED  
    10.      *    When the terminated() hook method has completed  

    从上面可以看到,在调用shutdown方法的时候,线程池状态会从Running转换成Shutdown;在调用shutdownNow方法的时候,线程池状态会从Running/Shutdown转换成Stop;在阻塞队列为空同时线程池为空的情况下,线程池状态会从Shutdown转换成Tidying;在线程池为空的情况下,线程池状态会从Stop转换成Tidying;当调用terminated方法之后,线程池状态会从Tidying转换成Terminate;

    问题二:线程池的种类有哪些?

     (1):CachedThreadPool:缓存线程池,该类线程池中线程的数量是不确定的,理论上可以达到Integer.MAX_VALUE个,这种线程池中的线程都是非核心线程,既然是非核心线程,那么就存在超时淘汰机制了,当里面的某个线程空闲时间超过了设定的超时时间的话,就会回收掉该线程;

    (2):FixedThreadPool:固定线程池,这类线程池中是只存在核心线程的,对于核心线程来说,如果我们不设置allowCoreThreadTimeOut属性的话是不存在超时淘汰机制的,这类线程池中的corePoolSize的大小是等于maximumPoolSize大小的,也就是说,如果线程池中的线程都处于活动状态的话,如果有新任务到来,他是不会开辟新的工作线程来处理这些任务的,只能将这些任务放到阻塞队列里面进行等到,直到有核心线程空闲为止;

            (3):ScheduledThreadPool:任务线程池,这种线程池中核心线程的数量是固定的,而对于非核心线程的数量是不限制的,同时对于非核心线程是存在超时淘汰机制的,主要适用于执行定时任务或者周期性任务的场景;

            (4):SingleThreadPool:单一线程池,线程池里面只有一个线程,同时也不存在非核心线程,感觉像是FixedThreadPool的特殊版本,他主要用于确保任务在同一线程中的顺序执行,有点类似于进行同步吧;

    问题三:创建线程池需要哪些参数,这些参数的具体含义是什么?

    同样查看ThreadPoolExecutor源码,查看创建线程池的构造函数:

    [java] 
    1. public ThreadPoolExecutor(int corePoolSize,  
    2.                               int maximumPoolSize,  
    3.                               long keepAliveTime,  
    4.                               TimeUnit unit,  
    5.                               BlockingQueue<Runnable> workQueue,  
    6.                               ThreadFactory threadFactory,  
    7.                               RejectedExecutionHandler handler)  
            不管你调用的是ThreadPoolExecutor的哪个构造函数,最终都会执行到这个构造函数的,这个构造函数有7个参数,正是由于对这7个参数值的赋值不同,造成生成不同类型的线程池,比如我们常见的CachedThreadPoolExecutor、FixedThreadPoolExecutor

    SingleThreadPoolExecutor、ScheduledThreadPoolExecutor,我们老看看这几个参数的具体含义:

            <1>corePoolSize:线程池中核心线程的数量;当提交一个任务到线程池的时候,线程池会创建一个线程来执行执行任务,即使有其他空闲的线程存在,直到线程数达到corePoolSize时不再创建,这时候会把提交的新任务放入到阻塞队列中,如果调用了线程池的preStartAllCoreThreads方法,则会在创建线程池的时候初始化出来核心线程;

            <2>maximumPoolSize:线程池允许创建的最大线程数;如果阻塞队列已经满了,同时已经创建的线程数小于最大线程数的话,那么会创建新的线程来处理阻塞队列中的任务;

            <3>keepAliveTime:线程活动保持时间,指的是工作线程空闲之后继续存活的时间,默认情况下,这个参数只有线程数大于corePoolSize的时候才会起作用,即当线程池中的线程数目大于corePoolSize的时候,如果某一个线程的空闲时间达到keepAliveTime,那么这个线程是会被终止的,直到线程池中的线程数目不大于corePoolSize;如果调用allowCoreThreadTimeOut的话,在线程池中线程数量不大于corePoolSize的时候,keepAliveTime参数也可以起作用的,知道线程数目为0为止;

            <4>unit:参数keepAliveTime的时间单位;

          <5>workQueue:阻塞队列;用于存储等待执行的任务,有四种阻塞队列类型,ArrayBlockingQueue(基于数组的有界阻塞队列)、LinkedBlockingQueue(基于链表结构的阻塞队列)、SynchronousQueue(不存储元素的阻塞队列)、PriorityBlockingQueue(具有优先级的阻塞队列);

            <6>threadFactory:用于创建线程的线程工厂;

            <7>handler:当阻塞队列满了,且没有空闲线程的情况下,也就是说这个时候,线程池中的线程数目已经达到了最大线程数量,处于饱和状态,那么必须采取一种策略来处理新提交的任务,我们可以自己定义处理策略,也可以使用系统已经提供给我们的策略,先来看看系统为我们提供的4种策略,AbortPolicy(直接抛出异常)、CallerRunsPolicy(只有调用者所在的线程来运行任务)、DiscardOldestPolicy(丢弃阻塞队列中最近的一个任务,并执行当前任务)、Discard(直接丢弃);

    问题四:将任务添加到线程池之后运行流程?

    我们可以调用submit或者execute方法,两者最大的区别在于,调用submit方法的话,我们可以传入一个实现Callable接口的对象,进而能在当前任务执行结束之后通过Future对象获得任务的返回值,submit内部实际上还是执行的execute方法;而调用execute方法的话,是不能获得任务执行结束之后的返回值的;此外,调用submit方法的话是可以抛出异常的,但是调用execute方法的话,异常在其内部得到了消化,也就是说异常在其内部得到了处理,不会向外传递的;

            因为submit方法最终也是会执行execute方法的,因此我们只需要了解execute方法就可以了:

            在execute方法内部会分三种情况来进行处理:

            <1>:首先判断当前线程池中的线程数量是否小于corePoolSize,如果小于的话,则直接通过addWorker方法创建一个新的Worker对象来执行我们当前的任务;

            <2>:如果说当前线程池中的线程数量大于corePoolSize的话,那么会尝试将当前任务添加到阻塞队列中,然后第二次检查线程池的状态,如果线程池不在Running状态的话,会将刚刚添加到阻塞队列中的任务移出,同时拒绝当前任务请求;如果第二次检查发现当前线程池处于Running状态的话,那么会查看当前线程池中的工作线程数量是否为0,如果为0的话,就会通过addWorker方法创建一个Worker对象出来处理阻塞队列中的任务;

            <3>:如果原先线程池就不处于Running状态或者我们刚刚将当前任务添加到阻塞队列的时候出现错误的话,那么会去尝试通过addWorker创建新的Worker来处理当前任务,如果添加失败的话,则拒绝当前任务请求;

            可以看到在上面的execute方法中,我们仅仅只是检查了当前线程池中的线程数量有没有超过corePoolSize的情况,那么当前线程池中的线程数量有没有超过maximumPoolSize是在哪里检测的呢?实际上是在addWorker方法里面了,我们可以看下addWorker里面的一段代码:

    [java]  view plain  copy
    1. if (wc >= CAPACITY ||  
    2.                     wc >= (core ? corePoolSize : maximumPoolSize))  
    3.                     return false;  
            如果当前线程数量超过maximumPoolSize的话,直接就会调用return方法,返回false;

            其实到这里我们很明显可以知道,一个线程池中线程的数量实际上就是这个线程池中Worker的数量,如果Worker的大小超过了corePoolSize,那么任务都在阻塞队列里面了,Worker是Java对我们任务的一个封装类,他的声明是酱紫的:

    [java]  view plain  copy
    1. private final class Worker  
    2.        extends AbstractQueuedSynchronizer  
    3.        implements Runnable  
            可以看到他实现了Runnable接口,他是在addWorker方法里面通过new Worker(firstTask)创建的,我们来看看他的构造函数就知道了:
    [java]  view plain  copy
    1. Worker(Runnable firstTask) {  
    2.             setState(-1); // inhibit interrupts until runWorker  
    3.             this.firstTask = firstTask;  
    4.             this.thread = getThreadFactory().newThread(this);  
    5.         }  
            而这里的firstTask其实就是我们调用execute或者submit的时候传入的那个参数罢了,一般来说这些参数是实现Callable或者Runnable接口的;

            在通过addWorker方法创建出来Worker对象之后,这个方法的最后会执行Worker内部thread属性的start方法,而这个thread属性实际上就是封装了Worker的Thread,执行他的start方法实际上执行的是Worker的run方法,因为Worker是实现了Runnable接口的,在run方法里面就会执行runWorker方法,而runWorker方法里面首先会判断当前我们传入的任务是否为空,不为空的话直接就会执行我们通过execute或者submit方法提交的任务啦,注意一点就是我们虽然会通过submit方法提交实现了Callable接口的对象,但是在调用submit方法的时候,其实是会将Callable对象封装成实现了Runnable接口对象的,不信我们看看submit方法源码是怎么实现的:

    [java]  view plain  copy
    1. public <T> Future<T> submit(Callable<T> task) {  
    2.     if (task == nullthrow new NullPointerException();  
    3.     RunnableFuture<T> ftask = newTaskFor(task);  
    4.     execute(ftask);  
    5.     return ftask;  
    6. }  
    看到没有呢,实际上在你传入实现了Callable接口对象的时候,在submit方法里面是会将其封装成RunnableFuture对象的,而RunnableFuture接口是继承了Runnable接口的;那么说白了其实就是直接执行我们提交任务的run方法了;如果为空的话,则会通过getTask方法从阻塞队列里面拿出一个任务去执行;在任务执行结束之后继续从阻塞队列里面拿任务,直到getTask的返回值为空则退出runWorker内部循环,那么什么情况下getTask返回为空呢?查看getTask方法的源码注释可以知道:在Worker必须需要退出的情况下getTask会返回空,具体什么情况下Worker会退出呢?(1):当Worker的数量超过maximumPoolSize的时候;(2):当线程池状态为Stop的时候;(3):当线程池状态为Shutdown并且阻塞队列为空的时候;(4):使用等待超时时间从阻塞队列中拿数据,但是超时之后仍然没有拿到数据;

            如果runWorker方法退出了它里面的循环,那么就说明当前阻塞队列里面是没有任务可以执行的了,你可以看到在runWorker方法内部的finally语句块中执行了processWorkerExit方法,用来对Worker对象进行回收操作,这个方法会传入一个参数表示需要删除的Worker对象;在进行Worker回收的时候会调用tryTerminate方法来尝试关闭线程池,在tryTerminate方法里面会检查是否有Worker在工作,检查线程池的状态,没问题的话就会将当前线程池的状态过渡到Tidying,之后调用terminated方法,将线程池状态更新到Terminated;

    从上面的分析中,我们可以看出线程池运行的4个阶段:

            (1):poolSize < corePoolSize,则直接创建新的线程(核心线程)来执行当前提交的任务;

            (2):poolSize = corePoolSize,并且此时阻塞队列没有满,那么会将当前任务添加到阻塞队列中,如果此时存在工作线程(非核心线程)的话,那么会由工作线程来处理该阻塞队列中的任务,如果此时工作线程数量为0的话,那么会创建一个工作线程(非核心线程)出来;

            (3):poolSize = corePoolSize,并且此时阻塞队列已经满了,那么会直接创建新的工作线程(非核心线程)来处理阻塞队列中的任务;

            (4):poolSize = maximumPoolSize,并且此时阻塞队列也满了的话,那么会触发拒绝机制,具体决绝策略采用的是什么就要看我们创建ThreadPoolExecutor的时候传入的RejectExecutionHandler参数了;

    问题五:线程池是怎么做到重用线程的呢?

            个人认为线程池里面重用线程的工作是在getTask里面实现的,在getTask里面是存在两个for死循环嵌套的,他会不断的从阻塞对列里面取出需要执行的任务,返回给我们的runWorker方法里面,而在runWorker方法里面只要getTask返回的任务不是空就会执行该任务的run方法来处理它,这样一直执行下去,直到getTask返回空为止,此时的情况就是阻塞队列里面没有任务了,这样一个线程处理完一个任务之后接着再处理阻塞队列中的另一个任务,当然在线程池中的不同线程是可以并发处理阻塞队列中的任务的,最后在阻塞队列内部不存在任务的时候会去判断是否需要回收Worker对象,其实Worker对象的个数就是线程池中线程的个数,至于什么情况才需要回收,上面已经说了,就是四种情况了;

    问题六:线程池是怎样被关闭的呢?

            涉及到线程池的关闭,需要用到两个方法,shutdown和shutdownNow,他们都是位于ThreadPoolExecutor里面的,对于shutdown的话,他会将线程池状态切换成Shutdown,此时是不会影响对阻塞队列中任务执行的,但是会拒绝执行新加进来的任务,同时会回收闲置的Worker;而shutdownNow方法会将线程池状态切换成Stop,此时既不会再去处理阻塞队列里面的任务,也不会去处理新加进来的任务,同时会回收所有Worker










    更多相关内容
  • Java线程池实现原理详解

    千次阅读 2022-04-10 22:00:08
    这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处: 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 提高响应速度:当任务到达时,任务可以不需要等待创建线程就能立即执行...

    热衷学习,热衷生活!😄

    沉淀、分享、成长,让自己和他人都能有所收获!😄

    一、为什么要使用线程池?

    线程池提供了一种限制和管理资源(线程、任务)的方式。

    这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处

    • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    • 提高响应速度:当任务到达时,任务可以不需要等待创建线程就能立即执行。
    • 提高线程的可管理性:线程是稀缺资源,如果无线的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    二、ThreadPoolExecutor类分析

    Java线程池主要由Executor框架实现,Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。

    线程池实现类ThreadPoolExecutorExecutor框架最核心的类,我们就从这个类的学习线程池的实现原理。

    核心属性

    public class ThreadPoolExecutor extends AbstractExecutorService {
        
        // 控制变量-存放状态和线程数 32位, 高3位存放状态, 低29位存放线程数
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        
        // 任务队列, 必须是阻塞队列
        private final BlockingQueue<Runnable> workQueue;
        
        // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
        private final HashSet<Worker> workers = new HashSet<>();
        
        // 全局锁
        private final ReentrantLock mainLock = new ReentrantLock();
        
        // awaitTermination方法使用的等待条件变量
        private final Condition termination = mainLock.newCondition();
        
        // 记录峰值线程数
        private int largestPoolSize;
        
        // 记录完成成功执行的任务数
        private long completedTaskCount;
        
        // 线程工厂, 用于创建新的线程实例
        private volatile ThreadFactory threadFactory;
        
        // 拒绝执行处理器, 对应不同的拒绝策略
        private volatile RejectedExecutionHandler handler;
        
        // 空闲线程等待任务的时间周期, 单位是纳秒
        private volatile long keepAliveTime;
        
        // 是否允许核心线程超时, 如果为true则keepAliveTime对核心线程也生效
        private volatile boolean allowCoreThreadTimeOut;
        
        // 核心线程数
        private volatile int corePoolSize;
        
        // 线程池容量
        private volatile int maximumPoolSize;
     	
        // 省略其他代码
    }
    

    构造方法

    参数最多的构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if(corePoolSize < 0 ||
           maximumPoolSize <= 0 ||
           maximumPoolSize < corePoolSize ||
           keepAliveTime < 0) 
            throw new IllegalArgumentException();
        if(workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    可以根据这个构造方法自定义线程数、线程池容量(最大线程数)、空闲线程等待任务周期、任务队列、线程工厂、拒绝策略。

    在《阿里巴巴Java开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池创建,不允许在应用中自行显示创建线程,这是因为使用线程池创建线程可以减少在创建和销毁线程上所消耗的时间以及系统资源的开销。

    《阿里巴巴Java开发手册》中还强制不能使用Executors去创建线程池,而是通过上面的ThreadPoolExecutor的构造方式创建,这样的处理方式可以让写的同学更加明确线程池的运行规则,避免资源耗尽的风险。

    下面简单分析一下每个参数的含义和作用:

    • corePoolSize:核心线程数量。
    • maximumPoolSize:最大线程数量,也就是线程池的容量。
    • keepAliveTime:线程空闲等待时间,也和工作线程的生命周期有关。
    • unit:线程空闲时间的单位,最终会转为成纳秒。
    • workQueue:等待队列或者叫任务队列。
    • ThreadFactory:创建线程的工厂,默认使用Executors.defaultThreadFactory()作为线程池工厂实例。
    • handler:线程池的执行执行处理器,更多的时候成为拒绝策略,拒绝策略执行的时机是当阻塞队列已满、没有空闲的线程(包含核心线程和非核心线程)并且继续提交任务。提供了4种拒绝策略实现:
      • AbortPolicy:直接拒绝策略,也就是不会执行任务,直接抛出RjectedExecutionExcetion错误,默认的拒绝策略。
      • DiscardPolicy:抛弃策略,也就是直接忽略提交的任务。
      • DiscardOldestPolicy:抛弃最老任务策略,也就是通过poll()方法取出任务队列头的任务抛弃,然后执行当前提交的任务。
      • CallerRunsPolicy:调用者执行策略,也就是当前调用Executor#execute()的线程直接调用任务Runnable#run()一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化成同步调用。

    状态控制

    状态控制主要围绕原子整数成员变量crl

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    // 通过ctl值获取运行状态
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    
    // 通过ctl值获取工作线程数
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }
    
    // 通过运行状态和工作线程数计算ctl的值,或运算
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    
    // CAS操作线程数增加1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    
    // CAS操作线程数减少1
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    
    // 线程数直接减少1
    private void decrementWorkerCount() {
        ctl.addAndGet(-1);
    }
    

    接下来分析一下线程池的状态变量,工作线程数量位的长度是COUNT_BITS,它的值是Integer.SIZE - 3,也就是正整数29:

    我们知道,整数包装类型Integer实例的大小是4byte,一共是32位,也就是一共有32位用于存放0和1。

    在ThreadPoolExecutor实现中,使用32位的整数包装类型存放工作线程数和线程状态,其中低29位用于存放工作线程数,而高3位用于存放线程池状态,所以线程池的状态最多只能有23种,工作线程上限数量为229 - 1,超过5亿,这个数量在短时间内不用考虑会超限。

    接着看工作线程上线数量掩码COUNT_MASK,它的值是(1 < COUNT_BITS - 1),也就是1左移29位,再减去1,如果补全32位,它的位示图如下:

    然后就是线程池的状态常量,比如RUNNING状态:

    // -1的补码为:111-11111111111111111111111111111
    // 左移29位:  111-00000000000000000000000000000
    // 10进制为:-536870912 
    // 高3位111的值就是表示线程池正在处于运行状态
    private final static int RUNNING = -1 << COUNT_BITS;
    

    线程池状态的运行状态常量:

    状态名称位图十进制值描述
    RUNNING111-0000...-536870912运行中状态,可以接受新的任务和执行任务队列中的任务。
    SHUTDOWN000-0000...0关闭状态,不再接收新的任务,但是会执行任务队列中的任务。
    STOP001-0000...536870912停止状态,不再接受新的任务,也不会执行任务队列中的任务,中断所有执行中的任务。
    TIDYING010-0000...1073741824整理中状态,所有任务都已经执行完毕,工作线程数为0,过渡到此状态的工作线程会调用钩子方法terminated()
    TERMINATED011-0000...1610612736终止状态,钩子方法terminated()执行完毕。

    这里还有一个比较特殊的技巧,由于运行状态值存放在高3位,所以直接可以通过十进制来比较判断线程池的状态:

    RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

    下面的3个方法就是使用这种技巧:

    // ctl和状态常量比较,判断是否小于
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    
    // ctl和状态常量比较,判断是否小于或等于
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    
    // ctl和状态常量SHUTDOWN比较,判断是否处于RUNNING状态
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    

    线程状态流转关系如下图:

    execute方法源码分析

    线程异步执行任务的方法实现是ThreadPoolExecutor#execute()方法,我们从源码的实现来学习,源码如下:

    public void execute(Runnable command) {
        // 判断任务对象非空
        if (command == null)
            throw new NullPointerExcetion();
        // 获取ctl值, 用于获取线程池状态、线程池线程数量
        int c = ctl.get();
        // 如果当前线程数小于核心线程数,则创建新的核心线程数并且执行传入的任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWoker(command, true))
                // 如果创建新的核心线程成功则直接返回
                return;
            // 这里说明创建新的核心线程失败,则更新ctl的临时变量c
            c = ctl.get();
        }
        // 走到这里说明创建核心线程失败,也就是当前工作的线程数大于等于核心线程数
        // 判断线程是否处于运行中状态,如果是运行状态尝试使用非阻塞方法向任务队列放入任务
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 这里向任务队列投放成功,对线程池的运行中状态做二次检查
            // 如果线程池二次检查状态是非运行中状态,则从任务队列移除当前的任务,调用拒绝策略处理
            if (!isRunning() && remove(command))
                // 调用拒绝策略处理任务
                reject(command);
            // 走到这里说明有以下的前提:
            // 1、待执行的任务已经成功加入任务队列
            // 2、线程池状态可能是RUNNING
            // 3、传入的任务可能从任务队列中移除失败(移除失败的唯一可能就是任务已经被执行了)
            // 如果当前工作线程数量为0,则创建一个非核心线程并且传入的对象为null
            // 也就是创建非核心线程不会马上运行,而是等待获取任务队列的任务再执行
            else if (workerCountOf(recheck) == 0)
                // 创建一个非核心线程并且传入的任务对象为null
                addWorker(null, false);
        }
        // 走到这里说明以下:
        // 1、线程池的工作线程总数已经大于等于核心线程数
        // 2、线程池可能不是RUNNING状态
        // 3、线程池可能是RUNNING状态同时任务队列已经满了
        // 如果任务队列投放任务失败,则会尝试创建非核心线程执行任务
        else if (!addWorker(command, false))
            // 如果创建非核心线程失败,执行拒绝策略
            reject(command);
    }
    

    上面代码的流程如下:

    1. 如果当前工作线程总数小于核心线程数corePoolSize,则直接创建核心线程去执行任务(任务实例会传入直接用于构造工作线程实例)。
    2. 如果当前工作线程总是大于等于核心线程数corePoolSize,判断线程状态是否是运行中状态,如果是运行中状态则会尝试用非阻塞方法(offer())向任务队列投放任务,如果投放成功会二次检查线程池运行状态,如果线程池是非运行中状态或者从任务队列移除当前的任务失败,则会调用拒绝策略,如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null
    3. 如果任务队列投放任务失败了(任务队列满了),则会创建创建非核心线程传入任务实例执行。
    4. 如果非核心线程创建失败,则会调用拒绝策略。

    这里有一个疑惑点:为什么要二次检查线程池的状态,当前工作线程数量为0,尝试创建一个非核心线程并且传入的任务对象为null?这个可以看API的解释:

    如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程,因为所有存活的工作线程有可能在最后一次检查之后就终结了或者执行当前任务的时候线程池是否已经shutdown了,所以我们需要二次检车线程池的状态,必须时要把任务从任务队列中移除或者在没有可用的工作线程的前提的下创建一个工作线程。

    execute()方法执行流程图如下:

    addWorker方法源码分析

    addWorker()方法用于添加工作线程,源码如下:

    pirvate boolean addWorker(Runnable firstTask, boolean core) {
        retry;
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            
            // 判断边界情况表,当线程池的状态是shutdown状态下,不会再接受新的任务,
            // 在此前提下如果状态已经到stop状态、或者传入任务不为空、或者任务队列为空
            // 都不需要添加新的任务
            if (rs >= SHUTDOWN &&
               !(rs == SHUTDOWN &&
                 firstTask == null &&
                 ! workQueue.isEmpty()))
                return false;
            
            for (;;) {
                // 获取工作线程总数
                int wc = workCountOf(c);
                // 如果工作线程总数大于等于容量或者大于等于核心线程数/最大线程数,
                // 则不需要添加新的任务
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 成功通过cas添加新的线程数wc,则break到最外层的循环
                if (compareAndOIncreamentWorkerCount(c))
                    breack retry;
                // 走到这里说明通过cas添加新的线程数wc失败,这个时候需要重新判断线程池的状态
                // 是否由RUNNABLE已经变成SHUTDOWN
                c = ctl.get();
                // 如果线程池状态已经由RUNNING已经变为SHUTDOWN,则重新跳出到外层循环继续执行
                if (runStateOf(c) != rs)
                    continue retry;
                // 如果线程池状态依然是RUNNING, CAS更新工作线程数wc失败说明有可能是并发更新导致的失败
                // 则在内层循环即可
            }
        }
        
        // 标记工作线程是否成功启动
        boolean workerStarted = false;
        // 标记工作线程是否创建成功
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 传入任务实例firstTask创建worker实例,Worker构造里面会通过线程工厂创建新的Thread对象
            w = new Worker(firstTask);
            // 获取worker的线程
            final Thread t = w.thread;
            if (t != null) {
                // 获取全局锁
                final ReentrantLock mainLock = this.mainLock;
                // 全局锁加锁,因为会改变一些指标值和非线程安全的集合
                mainLock.lock();
                try {
                    // 获取线程池状态
                    int rs = runStateOf(ctl.get());
                    
                    // 如果线程池状态不是RUNNING或者是SHUTDOWN同时传入的任务实例firstTask为null,
                    // 则判断线程是否存活,不存活抛异常
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 把创建的工作线程实例添加到工作线程集合
                        workers.add(w);
                        // 获取工作线程数量
                        int s = workers.size();
                        //  尝试更新线程池峰值容量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 标记工作线程添加成功,后面才会调用Thread#start()方法启动真实的线程实例
                        workerAdded = true;
                    } 
                } finally {
                    mainLock.unLock();
                }
                // 如果成功添加工作线程
                if (workerAdded) {
                    // 调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
                    t.start();
                    // 标记线程启动成功
                    workerStarted = true;
                }
            }
        } finally {
             // 线程启动失败,则需要从工作线程集合移除对应的Worker
             if (! workerStarted)
              	addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    // 添加Worker失败
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock();
        mainLock.lock();
        try {
            if (w != null)
                // 从工作线程移除
                workers.remove();
            // 工作线程数-1
            decrementWorkerCount();
            // 基于状态尝试终止线程池
            tryTerminate();
        } finally {
            mainLock.unLock();
        }
    }
    

    addWorker()方法是用来添加执行任务,这个流程可以分为两部分来看,第一部分是用于记录线程数量,第二部分是在独占锁里创建执行线程并启动。流程如下:

    • 首先判断当前线程池的状态是否是SHUTDOWNSTOPTIDYINGTERMINATED中的一个。并且当前状态为SHUTDOWN、且传入的任务为NULL、同时任务队列不为空。那么就返回false
    • 不满足上一点然后判断线程数是否超过核心线程数或者最大线程数(根据传入的core判断),如果超过则返回false
    • 然后通过CAS操作增加线程池数量,成功跳出循环体。
    • 线程池数量记录成功之后,创建工作实例,使用独占锁创建工作线程并加入到工作线程集合,并记录添加状态,添加成功则启动工作线程,记录启动状态,如果最后启动失败则调用addWorkerFailed()方法移除线程等操作。

    流程图如下:

    内部类Worker源码分析

    线程池中的每一个具体的工作线程被包装为内部类Worker实例,Worker继承与AQS,实现了Runnable接口,源码如下:

    private final class Worker extends AbstractQueuedSynchronized implements Runnable {
        
        private static final long serialVersionUID = 6138294804551838833L;
        
        // 保存ThreadFactory创建的线程实例,如果创建失败为null
        final Thread thread;
        
        // 保存传入的Runnable实例
        Runnable firstTask;
        
        // 记录线程完成的任务总数
        volatile long completedTasks;
        
        // 唯一构造方法,传入任务实例firstTask, 可以为null
        Worker(Runnable firstTask) {
            // 禁止线程中断,直到runWorker方法执行
            setState(-1);
            this.firstTask = firseTask;
            this.thread = getThreadFactory().newThread(this);
        }
        
        public void run() {
            // 调用外部的runWorker方法执行真正的任务
            runWorker(this);
        }
        
        // 是否持有独占锁,state = 0表示没有获取锁,state > 0表示获取锁
        protected boolean isHeldExclusively() {
                return getState() != 0;
        }
    
        // 尝试获取独占锁
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        // 尝试释放独占锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    	
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        // 启动后进行线程中断
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    

    Worker的构造函数里面的逻辑十分重要,通过ThreadFactory创建Thread实例同时引入Worker实例,因为Worker本身实现了Runnable,所以可以作为任务提交到线程中执行。只要Worker持有的线程实例w调用Thread#start()方法就能执行Worker#run()。简化一下逻辑如下:

    // addWorker()方法中构造
    Worker worker = createWorker();
    // 通过线程池构造时候传入
    ThreadFactory threadFactory = getThreadFactory();
    // Worker构造函数中
    Thread thread = threadFactory.newThread(worker);
    // addWorker()方法中启动
    thread.start();
    

    Worker继承AQS,这里使用了AQS的独占锁模式,这里有个技巧是构造Worker的时候,把AQS资源状态通过setState(-1)设置成-1,这是因为Wokrer实例刚创建时AQSstate的默认值是0,此时线程尚未启动,不能在这个时候进行线程中断,见Worker#interruptIfStarted()方法。

    runWorker方法源码分析

    final void runWorker(Worker w) {
        // 获取当前线程,实际上和Wokrer持有的线程实例是相同的
        Thread wt = Thread.currentThread();
        // 获取worker中持有的初始化时传入的任务对象,这里存放临时变量task
        Runnable task = w.firstTask;
        // 设置Worker中持有的初始化时传入的任务对象为null
        w.firstTask = null;
        // 构造方式的是state设置成-1,这里解锁state设成为0,允许线程中断
        w.unlock(); // allow interrupts
        // 记录线程是否因为用户异常终结
        boolean completedAbruptly = true;
        try {
            // 初始化任务对象不为空或者从任务队列获取到的任务对象不为空
            while (task != null || (task = getTask()) != null) {
                // 加锁
                w.lock();
                // 如果线程池正在停止状态,线程需要终止
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 任务执行之前
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 任务执行之后
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 任务对象设为null
                    task = null;
                    // 完成工作数量+1
                    w.completedTasks++;
                    // 解锁
                    w.unlock();
                }
            }
            // 正常完成任务
            completedAbruptly = false;
        } finally {
            // 处理线程退出,completedAbruptly为true说明由于用户异常导致线程非正常退出
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    runWorker方法的核心流程如下:

    1. Worker先执行解锁操作,允许线程中断。
    2. 通过while循环调用getTask()方法获取任务对象,首轮循环可能是外部传入的收个任务对象。
    3. 如果线程池状态变为STOP状态,则需要确保工作线程是中断状态并且进行中断处理,否则要保证工作线程不是中断状态。
    4. 执行任务实例Runnable#run()方法,任务执行之前和之后分别会调用beforeExecute()afterExecute()
    5. while循环跳出说明任务全部执行完毕,然后会调用processWorkerExit()方法处理工作线程退出后的工作。

    千言万语不如一图,流程图如下:

    getTask方法源码解析

    getTask()方法是工作线程在while死循环中获取任务队列中的任务对象的方法,源码如下:

    private Runnable getTask() {
        
        // 记录上一次从队列中获取的时候是否超时
        boolean timeOut = false;
        // 循环
        for(;;) {
            int c = ctl.get();
            // 获取线程池状态
            int rs = runStateOf(c);
            
            // 如果线程池状态至少为SHUTDOWN,如果线程池状态STOP或者任务队列为空
            // 则工作线程数量wc减1,直接返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())){
                decrementWorkerCount();
                return null;
            }
            
            // 获取工作线程数
            int wc = workerCountOf(c);
            
            // timed临时变量用于线程超时控制,决定是否需要通过poll()的非阻塞方法从任务队列获取任务
            // allowCoreThreadTimeOut默认为false,如果设置成true,则允许核心线程也能通过poll()方法从任务队列中拉取任务
            // 工作线程数大于核心线程数的时候,说明线程池中创建了额外的非核心线程,这些非核心线程一定是通过poll()方法从任务队列中拉取任务
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            // 1.工作线程数大于最大线程数或者timed && timedOut 说明线程命中了超时控制并且上一轮循环通过poll()方法从任务队列获取任务为null
            // 并且工作线程总数大于1或者任务队列为空,则通过CAS把线程数减去1,同时返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            
            try {
                // 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
                // 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
                Runnable r = timed ? 
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                	workQueue.take();
                if (r != null)
                    return r;
                timedCount  = true;
            } catch (InterruptedException retry) {
                    timedOut = false;
            }
        }
    }
    

    这个方法中,有两处十分复杂的if逻辑,先来看第一处,对于第一处if可能导致工作线程数量减去1直接返回null的场景有:

    • 线程池状态为SHUTDOWN,一般是调用了shutdown()方法,并且任务队列为空。
    • 线程池状态为STOP

    对于第二处if逻辑有点复杂,先拆解一下:

    // 工作线程总数大于最大线程数
    boolean b1 = wc > maximumPoolSize;
    // 允许线程超时同时上一轮通过poll()方法从任务队列中获取任务为null
    boolean b2 = timed && timedOut;
    // 作线程总数大于1
    boolean b3 = wc > 1;
    // 任务队列为空
    boolean b4 = workQueue.isEmpty();
    if (r) {
        if (compareAndDecrementWorkerCount(c)){
            return null;
        }else{
            continue;
        }
    }
    

    这段逻辑大多数情况下是针对非核心线程的。在execute()方法中,线程总数大于核心线程并且小于最大线程数时,会调用addWorker(task, false)方法添加非核心线程,而这里的逻辑恰好是想法的操作,用于减少非核心线程数,使得工作县城总数总是接近于核心线程数。如果对于核心线程,上一轮循环获取对象为null,这一轮循环很容易满足timed && timedOuttrue,这个时候getTask()返回null导致runWorker()方法跳出循环,最后执行processWorkerExit()方法处理工作,而该非核心线程对应的Worker则变成“游离对象”,等待被JVM回收。当allowCoreThreadTimeOut设置为true的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程,那么可以总结出keepAliveTime的意义:

    • 当允许核心线程超时,也就是allowCoreThreadTimeOut设置为true的时候,此时keepAliveTime表示空闲的工作线程存活周期。
    • 默认情况下不允许核心线程超时,此时keepAliveTime表示空闲的非核心线程存活周期。

    三、手写一个线程池

    通过上面对ThreadPoolExecutor的学习,我们可以手写一个简单的线程池,包含了线程的核心逻辑,包含了提交任务,添加任务,获取任务,执行任务核心逻辑。

    这个手写线程池的逻辑也非常简单,只体现核心流程,包括:

    1. 有n个一直执行的线程。
    2. 把线程提交给线程池运行。
    3. 如果线程池已满,则把线程放入队列中。
    4. 最后当有空闲时,则获取队列中线程进行运行。

    代码实现:

    public class ThreadPoolTrader implements Executor {
    
        private final AtomicInteger ctl = new AtomicInteger(0);
    
        private volatile int corePoolSize;
        private volatile int maximumPoolSize;
        private final BlockingQueue<Runnable> workQueue;
    
        public ThreadPoolTrader(int corePoolSize, int maximumPoolSize,
                                BlockingQueue<Runnable> workQueue) {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
        }
    
        @Override
        public void execute(Runnable command) {
            int c = ctl.get();
            if (c < corePoolSize) {
                if (!addWorker(command)) {
                    reject();
                }
                return;
            }
            if (!workQueue.offer(command)) {
                if (!addWorker(command)) {
                    reject();
                }
            }
        }
    
        private boolean addWorker(Runnable firstTask) {
            if (ctl.get() >= maximumPoolSize) return false;
            Worker worker = new Worker(firstTask);
            worker.thread.start();
            ctl.incrementAndGet();
            return true;
    
        }
    
        private final class Worker implements Runnable {
    
            final Thread thread;
            Runnable firstTask;
    
            Worker(Runnable firstTask) {
                this.thread = new Thread(this);
                this.firstTask = firstTask;
            }
    
            @Override
            public void run() {
                Runnable task = firstTask;
                try {
                    while (task != null || (task = getTask()) != null) {
                        System.out.println("当前执行任务的线程:" + Thread.currentThread().getName());
                        task.run();
                        if (ctl.get() > maximumPoolSize) {
                            break;
                        }
                        task = null;
                    }
                } finally {
                    ctl.decrementAndGet();
                }
            }
        }
    
        private Runnable getTask() {
            for (; ; ) {
                try {
                    System.out.println("workQueue.size:" + workQueue.size());
                    return workQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void reject() {
            throw new RuntimeException("Error!ctl.count:" + ctl.get() + " workQueue.size:" + workQueue.size());
        }
    
        public static void main(String[] args) {
            ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2, 2,
                    new ArrayBlockingQueue<Runnable>(10));
            for (int i = 0; i < 10; i++) {
                int finalI = i;
                threadPoolTrader.execute(() ->{
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务编号:" + finalI);
                });
            }
        }
    }
    

    上面的代码测试如下:

    当前执行任务的线程:Thread-1
    当前执行任务的线程:Thread-0
    任务编号:1
    任务编号:0
    workQueue.size:8
    workQueue.size:8
    当前执行任务的线程:Thread-0
    当前执行任务的线程:Thread-1
    任务编号:3
    任务编号:2
    workQueue.size:6
    当前执行任务的线程:Thread-1
    workQueue.size:6
    当前执行任务的线程:Thread-0
    任务编号:5
    workQueue.size:4
    当前执行任务的线程:Thread-0
    任务编号:4
    workQueue.size:3
    当前执行任务的线程:Thread-1
    任务编号:6
    workQueue.size:2
    当前执行任务的线程:Thread-0
    任务编号:7
    workQueue.size:1
    当前执行任务的线程:Thread-1
    任务编号:8
    任务编号:9
    workQueue.size:0
    workQueue.size:0
    

    四、创建线程池的四种方式

    Java 创建线程池的四种方式

    展开全文
  • 主要介绍了Java线程池FutureTask实现原理详解,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
  • Java线程池实现原理深度分析

    万次阅读 2020-06-05 09:05:45
    1.1 线程池是什么 线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。 线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了...

    一、写在前面

    1.1 线程池是什么

    线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。

    线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

    而本文描述线程池是JDK中提供的ThreadPoolExecutor类。

    当然,使用线程池可以带来一系列好处:

    • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
    • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
    • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
    • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

    1.2 线程池解决的问题是什么

    线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

    1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
    2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
    3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。

    为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

    Pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management.——wikipedia

    “池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。

    在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:

    1. 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
    2. 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
    3. 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。

    在了解完“是什么”和“为什么”之后,下面我们来一起深入一下线程池的内部实现原理。

     

    二、线程池核心设计与实现

    在前文中,我们了解到:线程池是一种通过“池化”思想,帮助我们管理线程而获取并发性的工具,在Java中的体现是ThreadPoolExecutor类。那么它的的详细设计与实现是什么样的呢?我们会在本章进行详细介绍。

    2.1 总体设计

    Java中的线程池核心实现类是ThreadPoolExecutor,本章基于JDK 1.8的源码来分析Java线程池的核心设计与实现。我们首先来看一下ThreadPoolExecutor的UML类图,了解下ThreadPoolExecutor的继承关系。

    ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

    ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务的呢?其运行机制如下图所示:

    线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

    接下来,我们会按照以下三个部分去详细讲解线程池运行机制:

    1. 线程池如何维护自身状态。
    2. 线程池如何管理任务。
    3. 线程池如何管理线程。

    2.2 生命周期管理

    线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    

    ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

    关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:

    private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
    private static int workerCountOf(int c)  { return c & CAPACITY; }  //计算当前线程数量
    private static int ctlOf(int rs, int wc) { return rs | wc; }   //通过状态和线程数生成ctl
    

    ThreadPoolExecutor的运行状态有5种,分别为:

    其生命周期转换如下入所示:

    2.3 任务执行机制

    2.3.1 任务调度

    任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。

    首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

    1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
    2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
    3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
    4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
    5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

    其执行流程如下图所示:

    2.3.2 任务缓冲

    任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

    下图中展示了线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素:

     

    使用不同的队列可以实现不一样的任务存取策略。在这里,我们可以再介绍下阻塞队列的成员:

     

    2.3.3 任务申请

    由上文的任务分配部分可知,任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。

    线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现,其执行流程如下图所示:

    获取任务流程图

     

    getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。

    2.3.4 任务拒绝

    任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。

    拒绝策略是一个接口,其设计如下:

    public interface RejectedExecutionHandler {
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    

    用户可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略,其特点如下:

    2.4 Worker线程管理

    2.4.1 Worker线程

    线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。我们来看一下它的部分代码:

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
        final Thread thread;//Worker持有的线程
        Runnable firstTask;//初始化的任务,可以为null
    }
    

    Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

    Worker执行任务的模型如下图所示:

    Worker执行任务

    线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。

    ​Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

    1.lock方法一旦获取了独占锁,表示当前线程正在执行任务中。 2.如果正在执行任务,则不应该中断线程。 3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。 4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

    在线程回收过程中就使用到了这种特性,回收过程如下图所示:

    线程池回收过程

     

    2.4.2 Worker线程增加

    增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,其执行流程如下图所示:

    申请线程执行流程图

     

    2.4.3 Worker线程回收

    线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

    try {
      while (task != null || (task = getTask()) != null) {
        //执行任务
      }
    } finally {
      processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
    }
    

    线程回收的工作是在processWorkerExit方法完成的。

    线程销毁流程

     

    事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。

    2.4.4 Worker线程执行任务

    在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

    1.while循环不断地通过getTask()方法获取任务。 2.getTask()方法从阻塞队列中取任务。 3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。 4.执行任务。 5.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

    执行流程如下图所示:

    执行任务流程

     

     

    展开全文
  • Java线程池实现原理与源码解析(jdk1.8)

    万次阅读 多人点赞 2018-04-03 10:09:49
    为什么需要线程池线程池能够对线程进行...Java是如何实现和管理线程池的? 从JDK 5开始,把工作单元与执行机制分离开来,工作单元包括Runnable和Callable,而执行机制有Executor框架提供。 newScheduled...

    为什么需要线程池?
    线程池能够对线程进行统一分配,调优和监控:
    - 降低资源消耗(线程无限制地创建,然后使用完毕后销毁)
    - 提高响应速度(无须创建线程)
    - 提高线程的可管理性

    Java是如何实现和管理线程池的?
    从JDK 5开始,把工作单元与执行机制分离开来,工作单元包括RunnableCallable,而执行机制有Executor框架提供。

    这里写图片描述

    ScheduledThreadPool
    初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。

    1. 工作原理

    当一个任务提交至线程池之后,
    1. 线程池首先判断核心线程池里的线程是否已经满了。如果不是,则创建一个新的工作线程来执行任务。否则进入2.
    2. 判断工作队列是否已经满了,倘若还没有满,将线程放入工作队列。否则进入3.
    3. 判断线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行。如果线程池满了,则交给饱和策略来处理任务。

    ThreadPoolExecutor执行execute()流程:
    当一个任务提交至线程池之后,
    1. 线程池首先当前运行的线程数量是否少于corePoolSize。如果是,则创建一个新的工作线程来执行任务。如果都在执行任务,则进入2.
    2. 判断BlockingQueue是否已经满了,倘若还没有满,则将线程放入BlockingQueue。否则进入3.
    3. 如果创建一个新的工作线程将使当前运行的线程数量超过maximumPoolSize,则交给RejectedExecutionHandler来处理任务。

    当ThreadPoolExecutor创建新线程时,通过CAS来更新线程池的状态ctl.

    2. ThreadPoolExecutor

    3. 创建线程池

    3.1 参数

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler)
    • corePoolSize
      线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize, 即使有其他空闲线程能够执行新来的任务, 也会继续创建线程;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
    • workQueue
      用来保存等待被执行的任务的阻塞队列. 在JDK中提供了如下阻塞队列:
      (1) ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
      (2) LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
      (3) SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
      (4) priorityBlockingQuene:具有优先级的无界阻塞队列;

      LinkedBlockingQueue比ArrayBlockingQueue在插入删除节点性能方面更优,但是二者在put(), take()任务的时均需要加锁,SynchronousQueue使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是Transfer.transfer().

    • maximumPoolSize
      线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;当阻塞队列是无界队列, 则maximumPoolSize则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入workQueue.

    • keepAliveTime
      线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用, 超过这个时间的空闲线程将被终止;
    • unit
      keepAliveTime的单位
    • threadFactory
      创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactory
    • handler
      handler
      线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
      • AbortPolicy:直接抛出异常,默认策略;
      • CallerRunsPolicy:用调用者所在的线程来执行任务;
      • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
      • DiscardPolicy:直接丢弃任务;
        当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

    3.2 三种类型

    3.2.1 newFixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    线程池的线程数量达corePoolSize后,即使线程池没有可执行任务时,也不会释放线程。

    FixedThreadPool的工作队列为无界队列LinkedBlockingQueue(队列容量为Integer.MAX_VALUE), 这会导致以下问题:
    - 线程池里的线程数量不超过corePoolSize,这导致了maximumPoolSizekeepAliveTime将会是个无用参数
    - 由于使用了无界队列, 所以FixedThreadPool永远不会拒绝, 即饱和策略失效

    3.2.2 newSingleThreadExecutor
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

    初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行.

    由于使用了无界队列, 所以SingleThreadPool永远不会拒绝, 即饱和策略失效

    3.2.3 newCachedThreadPool
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    1. 线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;
    2. 和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;

    执行过程与前两种稍微不同:
    (1) 主线程调用SynchronousQueue的offer()方法放入task, 倘若此时线程池中有空闲的线程尝试读取 SynchronousQueue的task, 即调用了SynchronousQueue的poll(), 那么主线程将该task交给空闲线程. 否则执行(2)
    (2) 当线程池为空或者没有空闲的线程, 则创建新的线程执行任务.
    (3) 执行完任务的线程倘若在60s内仍空闲, 则会被终止. 因此长时间空闲的CachedThreadPool不会持有任何线程资源.

    3.3 关闭线程池

    3.3.1 原理

    遍历线程池中的所有线程,然后逐个调用线程的interrupt方法来中断线程.

    3.3.2 关闭方式

    • shutdown
      将线程池里的线程状态设置成SHUTDOWN状态, 然后中断所有没有正在执行任务的线程.
    • shutdownNow
      将线程池里的线程状态设置成STOP状态, 然后停止所有正在执行或暂停任务的线程.

    只要调用这两个关闭方法中的任意一个, isShutDown() 返回true.
    当所有任务都成功关闭了, isTerminated()返回true.

    4. ThreadPoolExecutor源码

    基于JDK1.8

    4.1 内部状态

        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }

    其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
    1、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
    2、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
    3、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
    4、TIDYING : 2 << COUNT_BITS,即高3位为010, 所有的任务都已经终止;
    5、TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated()方法已经执行完成
    这里写图片描述

    4.2 任务的执行

    execute –> addWorker –>runworker (getTask)
    线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程。
    从Woker类的构造方法实现可以发现:线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。
    firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

    4.2.1 execute()方法

    ThreadPoolExecutor.execute(task)实现了Executor.execute(task)

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {  
        //workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建新线程执行command任务
           if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // double check: c, recheck
        // 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // recheck and if necessary 回滚到入队操作前,即倘若线程池shutdown状态,就remove(command)
            //如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //线程池处于running状态,但是没有线程,则创建线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 往线程池中创建新的线程失败,则reject任务
        else if (!addWorker(command, false))
            reject(command);
    }

    为什么需要double check线程池的状态?
    在多线程环境下,线程池的状态时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将command加入workque是线程池之前的状态。倘若没有double check,万一线程池处于非running状态(在多线程环境下很有可能发生),那么command永远不会执行。

    4.2.2 addWorker方法

    从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务
    线程池创建新线程执行任务时,需要 获取全局锁:

    private final ReentrantLock mainLock = new ReentrantLock();
    private boolean addWorker(Runnable firstTask, boolean core) {
           // CAS更新线程池数量
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    // 线程池重入锁
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();  // 线程启动,执行任务(Worker.thread(firstTask).start());
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

    4.2.3 Worker类的runworker方法

     private final class Worker
             extends AbstractQueuedSynchronizer
             implements Runnable{
         Worker(Runnable firstTask) {
             setState(-1); // inhibit interrupts until runWorker
             this.firstTask = firstTask;
             this.thread = getThreadFactory().newThread(this); // 创建线程
         }
         /** Delegates main run loop to outer runWorker  */
         public void run() {
             runWorker(this);
         }
         // ...
     }
    • 继承了AQS类,可以方便的实现工作线程的中止操作;
    • 实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
    • 当前提交的任务firstTask作为参数传入Worker的构造方法;

    runWorker方法是线程池的核心:
    1. 线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行可中断;
    2. Worker执行firstTask或从workQueue中获取任务:
    2.1. 进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)
    2.2. 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
    2.3. 执行beforeExecute
    2.4 执行任务的run方法
    2.5 执行afterExecute方法
    2.6 解锁操作

    通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                // 先执行firstTask,再从workerQueue中取task(getTask())
    
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }

    4.2.4 getTask方法

    getTask方法从阻塞队列中获取等待的任务

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    注意这里一段代码是keepAliveTime起作用的关键:

    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

    allowCoreThreadTimeOut为false,线程即使空闲也不会被销毁;倘若为ture,在keepAliveTime内仍空闲则会被销毁。
    如果线程允许空闲等待而不被销毁timed == falseworkQueue.take任务:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
    如果线程不允许无休止空闲timed == true, workQueue.poll任务:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;

    4.3 任务的提交

    这里写图片描述
    1. submit任务,等待线程池execute
    1. 执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中, 并阻塞等待运行结果;
    2. FutureTask任务执行完成后,通过UNSAFE设置waiters相应的waitNode为null,并通过LockSupport类unpark方法唤醒主线程;

    public class Test{
    
        public static void main(String[] args) {
    
            ExecutorService es = Executors.newCachedThreadPool();
            Future<String> future = es.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "future result";
                }
            });
            try {
                String result = future.get();
                System.out.println(result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。
    1. Callable接口类似于Runnable,只是Runnable没有返回值。
    2. Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;
    3. Future.get方法会导致主线程阻塞,直到Callable任务执行完成;

    4.3.1 submit方法

    AbstractExecutorService.submit()实现了ExecutorService.submit()
    可以获取执行完的返回值, 而ThreadPoolExecutorAbstractExecutorService.submit()的子类,所以submit方法也是ThreadPoolExecutor`的方法。

    // submit()在ExecutorService中的定义
    <T> Future<T> submit(Callable<T> task);
    
    <T> Future<T> submit(Runnable task, T result);
    
    Future<?> submit(Runnable task);
    // submit方法在AbstractExecutorService中的实现
    public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            // 通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }

    通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;

    4.3.2 FutureTask对象

    public class FutureTask<V> implements RunnableFuture<V> 可以将FutureTask提交至线程池中等待被执行(通过FutureTask的run方法来执行)

    内部状态
        /* The run state of this task, initially NEW. 
         * ...
         * Possible state transitions:
         * NEW -> COMPLETING -> NORMAL
         * NEW -> COMPLETING -> EXCEPTIONAL
         * NEW -> CANCELLED
         * NEW -> INTERRUPTING -> INTERRUPTED
         */
        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;

    内部状态的修改通过sun.misc.Unsafe修改

    get方法
    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        } 

    内部通过awaitDone方法对主线程进行阻塞,具体实现如下:

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    1. 如果主线程被中断,则抛出中断异常;
    2. 判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成,则直接返回;
    3. 如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL;
    4. 通过WaitNode类封装当前线程,并通过UNSAFE添加到waiters链表;
    5. 最终通过LockSupport的park或parkNanos挂起线程;
    run方法
    public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

    FutureTask.run方法是在线程池中被执行的,而非主线程
    1. 通过执行Callable任务的call方法;
    2. 如果call执行成功,则通过set方法保存结果;
    3. 如果call执行有异常,则通过setException保存异常;

    5. 配置线程池需要考虑哪些因素

    从任务的优先级,任务的执行时间长短,任务的性质(CPU密集/ IO密集),任务的依赖关系这四个角度来分析。并且近可能地使用有界的工作队列

    性质不同的任务可用使用不同规模的线程池分开处理:
    - CPU密集型:尽可能少的线程,Ncpu+1
    - IO密集型:尽可能多的线程, Ncpu*2,比如数据库连接池
    - 混合型:CPU密集型的任务与IO密集型任务的执行时间差别较小,拆分为两个线程池;否则没有必要拆分。

    6. 如何监控线程池的状态

    可以使用ThreadPoolExecutor以下方法:
    getTaskCount() Returns the approximate total number of tasks that have ever been scheduled for execution.
    getCompletedTaskCount() Returns the approximate total number of tasks that have completed execution. 返回结果少于getTaskCount()
    getLargestPoolSize() Returns the largest number of threads that have ever simultaneously been in the pool. 返回结果小于等于maximumPoolSize
    getPoolSize() Returns the current number of threads in the pool.
    getActiveCount() Returns the approximate number of threads that are actively executing tasks.

    Reference

    https://www.jianshu.com/p/87bff5cc8d8c
    《Java并发编程艺术》

    展开全文
  • Java线程池实现原理

    2022-03-17 13:36:52
    参考:Java线程池实现原理及其在美团业务中的实践 - 美团技术团队 (meituan.com) 一、线程池是什么 线程池是一种对线程进行池化管理的思想和工具,广泛应用于多线程服务器中 线程的创建和销毁都会带来很多额外开销...
  • 可以看到,Executor是顶层的一个接口,线程池实现类是ScheduledThreadPoolExecutor和ThreadPoolExecutor,而Executors是一个工具类,用于创建实际的线程池,接下来看看源码。 点进源码查看(是 JDK1.8 版本加入...
  • jdk在java5版本中增加了内置线程池实现ThreadPoolExecutor,本文通过ThreadPoolExecutor的源码分析jdk中线程池的实现原理。 线程池由两个核心数据结构组成: 1)线程集合(workers):存放执行任务的线程,是一个...
  • Java线程池实现原理及其在美团业务中的实践https://mp.weixin.qq.com/s/tIWAocevZThfbrfWoJGa9w 思考总结 关键提升在降低了线程池参数修改的成本 风险:线程池大小一般不能随意调整,不合理的调整可能带来系统间...
  • java线程池 实现原理

    2019-09-15 20:25:12
    java线程池 实现原理 线程池的好处: 降低资源消耗 提高响应速度 提高线程的可管理性 线程池的实现原理 下图是提交一个新任务到线程时,线程池的处理流程 ! ThreadPoolExecutor执行execute方法分下面4种情况 如果...
  • 详述Java线程池实现原理

    千次阅读 多人点赞 2021-06-03 21:40:19
    1.1 线程池是什么 线程池(Thread Pool) 是一种池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。 线程过多会带来额外的开销,其中包括创建销毁线程的开销,操作系统调度线程的开销等等,同时也降低了...
  • Java线程池实现原理之自我见解

    千次阅读 2020-03-18 22:24:57
    Java线程池的执行流程图: Java线程池有哪几种: 1. newSingleThreadPool 单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务, LinkedBlockingQueue 阻塞队列 2. newFixedThreadPool 固定...
  • Java线程池实现原理及其在美团业务中的实践

    千次阅读 多人点赞 2020-04-02 19:58:26
    总第387篇2020年 第10篇随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流。使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器。J.U.C提供的线程池Thr...
  • 线程池实现原理
  • Java线程池工作原理

    万次阅读 多人点赞 2018-11-10 01:08:13
    Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池,所以我们就要认识并弄懂线程池,以便于更好的为我们业务场景服务。 一、线程池的好处 在开发过程中,合理地...
  • 本文开篇简述线程池概念和用途,接着结合线程池的源码,帮助读者领略线程池的设计思路,最后回归实践,通过案例讲述使用线程池遇到的问题,并给出了一种动态化线程池解决方案。 一、前言 1.1 线程池是什么 线程池...
  • 期待与你,一起进步随着cpu核数越来越多,不可避免的利用多线程技术以充分...在Java用有一个Executors工具类,可以为我们创建一个线程池,其本质就是new了一个ThreadPoolExecutor对象。线程池几乎也是面试必考问题...
  • https://juejin.im/post/5e86e13ee51d4546e07b5f29
  • Java线程池实现原理和源码分析 文章目录Java线程池实现原理和源码分析前言外观线程池继承关系构造函数成员变量创建线程池任务阻塞队列SynchronousQueueArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue...
  • Java面试题之:线程池原理一、简介二、线程复用三、线程池的组成四、拒绝策略五、Java 线程池工作过程 一、简介   线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动...
  • Java线程池实现原理---美团技术栈 随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流。使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器。J.U.C提供的线程池ThreadPoolExecutor类,帮助...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 101,941
精华内容 40,776
关键字:

java线程池实现原理

java 订阅
友情链接: CC2530KEYTest.rar