精华内容
下载资源
问答
  • 线程池原理

    万次阅读 多人点赞 2017-07-27 17:49:50
    线程池中,线程数量一般是固定的,所以产生线程总数不会超过线程池中线程的数目或者上限,而如果服务器不利用线程池来处理这些请求则线程总数为100000。一般线程池尺寸是远小于100000。所以利用线程池的服务器程序...

    友情推荐:

    1. 多线程中断机制
    2. 深入Thread.sleep
    3. head first Thread.join()

    面向对象编程中,对象创建和销毁是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是对一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些”池化资源”技术产生的原因。比如大家所熟悉的数据库连接池就是遵循这一思想而产生的,下面将介绍的线程池技术同样符合这一思想。

    多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。但如果对多线程应用不当,会增加对单个任务的处理时间。可以举一个简单的例子:
    假设一台服务器完成一项任务的时间为T

     T1 创建线程的时间
     T2 在线程中执行任务的时间,包括线程间同步所需时间
     T3 线程销毁的时间
    

    显然T = T1+T2+T3。注意这是一个极度简化的假设。
    可以看出T1,T3是多线程本身附加的开销,用户希望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到这一点,所以在应用程序中频繁的创建或销毁线程,这导致T1和T3在T中占有非常大的比例。

    线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1、T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1、T3的开销了,线程池不仅调整T1、T3产生的时间,而且它还显著减少了创建线程的数目。在看一个例子:

    假设一台服务器每天大约要处理100000个请求,并且每个请求需要一个单独的线程完成,这是一个很常用的场景。在线程池中,线程数量一般是固定的,所以产生线程总数不会超过线程池中线程的数目或者上限,而如果服务器不利用线程池来处理这些请求则线程总数为100000。一般线程池尺寸是远小于100000。所以利用线程池的服务器程序不会为了创建100000而在处理请求时浪费时间,从而提高效率。

    线程池是一种多线程处理方法,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程,每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程处于空闲状态,则线程池将会调度一个任务给它,如果所有线程都始终保持繁忙,但将任务放入到一个队列中,则线程池将在一段时间后创建另一个辅助线程,但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

    线程池主要有如下几个应用范围:

    1. 需要大量的线程来完成任务,且完成任务的时间比较短,如WEB服务器完成网页请求这样的任务。因为单个任务小,而任务数量巨大,比如一个热门网站的点击次数。 但对于长时间的任务,比如一个ftp连接请求,线程池的优点就不明显了。因为ftp会话时间相对于线程的创建时间长多了。

    2. 对性能要求苛刻的应用,比如要求服务器迅速相应客户请求。

    3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限。

    下面将讨论线程池的简单实现,以说明线程技术优点及应用领域。

    线程池的简单实现

    一般一个简单线程池至少包含下列组成部分。

    1. 线程池管理器(ThreadPoolManager):用于创建并管理线程池。
    2. 工作线程(WorkThread): 线程池中线程。
    3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
    4. 任务队列:用于存放没有处理的任务。提供一种缓冲机制。

    线程池管理器至少有下列功能:创建线程池、销毁线程池、添加新任务

    创建线程池的部分代码如下:

    public class ThreadPoolManager {
        private int threadCount; //启动的线程数
        private WorkThread[] handlers; //线程数组
        private ArrayList<Runnable> taskVector = new ArrayList<Runnable>(); //任务队列
    
        ThreadPoolManager(int threadCount) {
           this.threadCount = threadCount;
           for (int i = 0; i < threadCount; i++) {
               handlers[i] = new WorkThread();
               handlers[i].start();
           }
        }
    
        void shutdown() {   
           synchronized (taskVector) {
               while (!taskVector.isEmpty())
                  taskVector.remove(0); //清空任务队列
           }
           for (int i = 0; i < threadCount; i++) {
               handlers[i] = new WorkThread();
               handlers[i].interrupt(); //结束线程
           }
        }
    
        void execute(Runnable newTask) { //增加新任务
           synchronized (taskVector) {
               taskVector.add(newTask);
               taskVector.notifyAll();
           }
    
        }
    
        private class WorkThread extends Thread {
           public void run() {
               Runnable task = null;
               for (;;) {
                  synchronized (taskVector) {//获取一个新任务
                      if (taskVector.isEmpty())
                         try {
                             taskVector.wait();
                             task = taskVector.remove(0);
                         } catch (InterruptedException e) {
                             break;
                         }
                  }
                  task.run();
               }
           }
        }
    }

    ThreadPoolManager构造函数允许用户设置启动的线程数量,并且需要的创建线程。Shutdown函数主要关闭打开的线程和清空还没有执行的任务,execute函数将任务加入到工作队列,并且唤醒等待的线程。WorkThread就是实际的工作线程,工作线程是一个可以循环执行任务的线程,在没有任务时将等待,当有任务时,会被唤醒。任务接口是为所有任务提供统一的接口,以便工作线程处理,在这里我们采用java定义的Runnable接口,用户可以实现这个接口来完成想要的事务。实现一个线程池需要了解线程的同步机制,这部分将在后面介绍。

    Java自带线程池

    自从Java1.5之后,Java 提供了自己的线程池ThreadPoolExecutor和ScheduledThreadPoolExecutor,我们先看类之间的结构图。

    线程池

    关于线程池的主要类有如下几部分:
    接口:Executor、ExecutorService、ScheduledExecutorService
    类:Executors、AbstractExecutorService、ThreadPoolExecutor、ScheduledThreadPoolExecutor。

    ThreadPoolExecutor

    首先看看ThreadPoolExecutor的构造函数,ThreadPoolExecutor提供了几个构造函数,我们先来参数最全构造函数的含义。

        public ThreadPoolExecutor(int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue,
                                     ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler)
    • corePoolSize:线程池维护线程的最少数量
    • maximumPoolSize:线程池维护线程的最大数量
    • keepAliveTime:线程池维护线程所允许的空闲时间 ,所以如果任务很多,并且每个任务执行的时间比较短,可以适当调大这个参数来提高线程的利用率。
    • unit: keepAliveTime 参数的单位,可选的单位:天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS)和纳秒(NANOSECONDS)
    • workQueue:任务队列,用来存放我们所定义的任务处理线程,BlockingQueue是一种带锁的阻塞队列,我们将在后面专门讲解这种数据结构,BlockingQueue有四种选择:
      (1)ArrayBlockingQueue,是一种基于数组的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行操作;
      (2)LinkedBlockingQueue,是一个基于链表的阻塞队列,此队列也按FIFO (先进先出)对元素进行操作,吞吐量通常要高于ArrayBlockingQueue, Executors.newFixedThreadPool()使用了这种队列;
      (3)SynchronousQueue;是一种不存储元素的阻塞队列,每个插入操作必须等另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,Executors.newCachedThreadPool使用了这个队列;
      (4)PriorityBlockingQueue,是一种具有优先权的阻塞队列,优先级大的任务可以先执行,用户由此可以控制任务的执行顺序。这四种阻塞队列都有自己的使用场景,用户可以根据需要自己决定使用。
    • threadFactory:创建新线程时使用的工厂,threadFactory有两种选择:
      (1)DefaultThreadFactory,将创建一个同线程组且默认优先级的线程;
      (2)PrivilegedThreadFactory,使用访问权限创建一个权限控制的线程。ThreadPoolExecutor默认采用DefaultThreadFactory。
    • handler 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理策略,handler有四个选择:
      (1)ThreadPoolExecutor.AbortPolicy(),将抛出RejectedExecutionException异常;
      (2)ThreadPoolExecutor.CallerRunsPolicy(),将重试添加当前的任务,重复调用execute()方法;
      (3)ThreadPoolExecutor.DiscardOldestPolicy(),将抛弃旧任务;
      (4)ThreadPoolExecutor.DiscardPolicy,将直接抛弃任务。ThreadPoolExecutor默认采用AbortPolicy。

    一个任务通过execute(Runnable)方法被添加到线程池,任务必须是一个 Runnable类型的对象,任务的执行方法就是调用Runnable类型对象的run()方法。当一个任务通过execute(Runnable)方法欲添加到线程池时,会做一下几步:

    1. 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
    2. 如果此时线程池中的数量大于等于corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
    3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理添加的任务。
    4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
    5. 当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

    读者可以参考下面的源代码,分析execute函数执行的流程:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
                if (runState == RUNNING && workQueue.offer(command)) {
                    if (runState != RUNNING || poolSize == 0)
                        ensureQueuedTaskHandled(command);
                }
                else if (!addIfUnderMaximumPoolSize(command))
                    reject(command); // 执行handler策略
            }
        }
      当数量少于corePoolSize时的主要流程:
        private boolean addIfUnderCorePoolSize(Runnable firstTask) {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (poolSize < corePoolSize && runState == RUNNING)
                    t = addThread(firstTask); //创建新线程
            } finally {
                mainLock.unlock();
            }
            if (t == null)
                return false;
            t.start();
            return true;
        }
    当数量大于corePoolSize,小于maximumPoolSize,且阻塞队列不能存储任务时,执行的主要流程:
        private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (poolSize < maximumPoolSize && runState == RUNNING)
                    t = addThread(firstTask);
            } finally {
                mainLock.unlock();
            }
            if (t == null)
                return false;
            t.start();
            return true;
        }

    如果想在多线程环境中定期执行去执行任务,或者做一些其他事情,用户可以通过Timer来实现,但是Timer有几种缺陷:

    1. Timer是基于绝对时间的,容易受系统时钟的影响;
    2. Timer只新建了一个线程来执行所有的TimeTask,所有TimeTask可能会相关影响;
    3. Timer不会捕获TimerTask的异常,只是简单地停止。

    这样势必会影响其他TimeTask的执行。JDK提供了一种定时功能的线程池:ScheduledThreadPoolExecutor,它继承了ThreadPoolExecutor,并且实现了ScheduledExecutorService接口,此接口有如下几个方法:

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    创建并执行在给定延迟后启用的一次性操作:

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                       long delay, TimeUnit unit);

    创建并执行在给定延迟后启用的一次性操作:

        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                           long initialDelay,
                           long period,
                         TimeUnit unit);

    创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是经过period 后开始执行,即在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。如果任务的任何一个执行遇到异常,则后续执行都会被取消。否则,只能通过执行程序的取消或终止方法来终止该任务。如果此任务的任何一个执行要花费比其周期更长的时间,则将推迟后续执行,但不会同时执行。

        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                              long initialDelay,
                              long delay,
                            TimeUnit unit);

    创建并执行一个在给定初始延迟后首次启用的定期操作,随后在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。否则,只能通过执行程序的取消或终止方法来终止该任务。

    ScheduledThreadPoolExecutor也提供了几个构造函数,下面列出的是其中最简单的一个,只有corePoolSize一个参数。ScheduledThreadPoolExecutor的构造函数仅做的一件事就是调用ThreadPoolExecutor的构造函数,它使用一种带有延时标记的等待队列DelayedWorkQueue。DelayedWorkQueue内部使用concurrent包里的DelayQueue,DelayQueue是一个无界的BlockingQueue,用于放置延时Delayed接口的对象,对象只能在其到期时才能从队列中取走,我们将在专门讲解这种数据结构。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
                  new DelayedWorkQueue());
      }

    要配置一个线程池相对比较复杂,需要了解相关的参数,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池:

    public static ExecutorService newSingleThreadExecutor()

    创建仅有一个线程工作的线程池,相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么将创建有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    public static ExecutorService newCachedThreadPool()

    创建一个缓存线程池,如果线程池的大小超过了任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又动态添加新线程来处理任务。此线程池没有对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)所能够创建的最大线程大小。

    public static ExecutorService newFixedThreadPool(int nThreads)

    创建指定大小的线程池。每次提交一个任务就创建一个线程,直到线程数量达到线程池的最大值。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    类似于newCachedThreadPool,创建一个缓存线程池,此线程池还支持定时以及周期性执行任务。

    public static ScheduledExecutorService newSingleThreadScheduledExecutor()

    类似于newSingleThreadExecutor,创建一个单线程的线程池,此线程池还支持定时以及周期性执行任务。

    下面用两个例子介绍线程池的使用方法,第一个例子会创建一个固定大小的线程池,第二个例子会创建基于时间线程池。

    第一个例子

        ExecutorService pool = Executors.newFixedThreadPool(2);
           //创建四个任务
           Thread t1 = new Task1();
           Thread t2 = new Task2();
           Thread t3 = new Task3();
           Thread t4 = new Task4();
           //放入线程池
           pool.execute(t1);
           pool.execute(t2);
           pool.execute(t3);
           pool.execute(t4);
           pool.shutdown(); //关闭线程池

    第二个例子:

           ExecutorService pool = Executors.newScheduledThreadPool(4);
           Thread t = new Task();
           pool.scheduleAtFixedRate(t,1, 5, TimeUnit.SECONDS);

    总结:

    1. FixedThreadPool是一个典型且优秀的线程池,它具有线程池的高效率和节省创建线程时所耗的开销的优点。但是在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
    2. CachedThreadPool的特点就是在线程池空闲时,即线程池中没有可运行任务时,它会释放工作线程,从而释放工作线程所占用的资源。但是当出现新任务时,又要创建新的工作线程,这会带来一定的系统开销。并且在使用CachedThreadPool时,一定要注意控制任务的数量,否则大量线程同时运行,可能会造成系统瘫痪。

    微信扫我^_^

    这里写图片描述

    展开全文
  • Java concurrency线程池之线程池原理(一)_动力节点Java学院整理,动力节点口口相传的Java黄埔军校
  • 这篇文章主要介绍了Python定时器线程池原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 定时器执行循环任务: 知识储备 Timer(interval, function, ...
  • 图解Java线程池原理

    2019-08-21 10:11:49
    什么是线程池? 为了避免频繁重复的创建和销毁线程,我们可以让这些线程进行复用,在线程池中,总会有活跃的线程在占用,但是线程池中也会存在没有占用的线程,这些线程处于空闲状态,当有任务的时候会从池子里面拿...

    什么是线程池?

    为了避免频繁重复的创建和销毁线程,我们可以让这些线程进行复用,在线程池中,总会有活跃的线程在占用,但是线程池中也会存在没有占用的线程,这些线程处于空闲状态,当有任务的时候会从池子里面拿去一个线程来进行使用,当完成工作后,并没有销毁线程,而是将将线程放回到池子中去。

    线程池主要解决两个问题:
    一是当执行大量异步任务时线程池能够提供很好的性能。
    二是线程池提供了一种资源限制和管理的手段,比如可以限制现成的个数,动态新增线程等。
    ​ -《Java并发编程之美》

    上面内容出自《Java并发编程之美》这本书,第一个问题上面已经提到过,线程的频繁创建和销毁是很损耗性能的,但是线程池中的线程是可以复用的,可以较好的提升性能问题,线程池内部是采用了阻塞队列来维护Runnable对象。

    (想自学习编程的小伙伴请搜索圈T社区,更多行业相关资讯更有行业相关免费视频教程。完全免费哦!)

    原理分析

    JDK为我们封装了一套操作多线程的框架Executors,帮助我们可以更好的控制线程池,Executors下提供了一些线程池的工厂方法:

    • newFixedThreadPool:返回固定长度的线程池,线程池中的线程数量是固定的。
    • newCacheThreadPool:该方法返回一个根据实际情况来进行调整线程数量的线程池,空余线程存活时间是60s
    • newSingleThreadExecutor:该方法返回一个只有一个线程的线程池。
    • newSingleThreadScheduledExecutor:该方法返回一个SchemeExecutorService对象,线程池大小为1,SchemeExecutorService接口在ThreadPoolExecutor类和 ExecutorService接口之上的扩展,在给定时间执行某任务。
    • newSchemeThreadPool:该方法返回一个SchemeExecutorService对象,可指定线程池线程数量。

    对于核心的线程池来说,它内部都是使用了ThreadPoolExecutor对象来实现的,只不过内部参数信息不一样,我们先来看两个例子:nexFixedThreadPoolnewSingleThreadExecutor如下所示:

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

    由上面的线程池的创建过程可以看到它们都是ThreadPoolExecutor的封装,接下来我们来看一下ThreadPoolExecutor的参数说明:

    参数名称参数描述
    corePoolSize指定线程池线程的数量
    maximumPoolSize指定线程池中线程的最大数量
    keepAliveTime当线程池线程的数量超过corePoolSize的时候,多余的空闲线程存活的时间,如果超过了corePoolSize,在keepAliveTime的时间之后,销毁线程
    unitkeepAliveTime的单位
    workQueue工作队列,将被提交但尚未执行的任务缓存起来
    threadFactory线程工厂,用于创建线程,不指定为默认线程工厂DefaultThreadFactory
    handler拒绝策略

    其中workQueue代表的是提交但未执行的队列,它是BlockingQueue接口的对象,用于存放Runable对象,主要分为以下几种类型:

    • 直接提交的队列:SynchronousQueue队列,它是一个没有容量的队列,前面我有对其进行讲解,当线程池进行入队offer操作的时候,本身是无容量的,所以直接返回false,并没有保存下来,而是直接提交给线程来进行执行,如果没有空余的线程则执行拒绝策略。

    • 有界的任务队列:可以使用ArrayBlockingQueue队列,因为它内部是基于数组来进行实现的,初始化时必须指定容量参数,当使用有界任务队列时,当有任务进行提交时,线程池的线程数量小于corePoolSize则创建新的线程来执行任务,当线程池的线程数量大于corePoolSize的时候,则将提交的任务放入到队列中,当提交的任务塞满队列后,如果线程池的线程数量没有超过maximumPoolSize,则创建新的线程执行任务,如果超过了maximumPoolSize则执行拒绝策略。

    • 无界的任务队列:可以使用LinkedBlockingQueue队列,它内部是基于链表的形式,默认队列的长度是Integer.MAX_VALUE,也可以指定队列的长度,当队列满时进行阻塞操作,当然线程池中采用的是offer方法并不会阻塞线程,当队列满时则返回false,入队成功则则返回true,当使用LinkedBlockingQueue队列时,有任务提交到线程池时,如果线程池的数量小于corePoolSize,线程池会产生新的线程来执行任务,当线程池的线程数量大于corePoolSize时,则将提交的任务放入到队列中,等待执行任务的线程执行完之后进行消费队列中的任务,若后续仍有新的任务提交,而没有空闲的线程时,它会不断往队列中入队提交的任务,直到资源耗尽。

    • 优先任务队列:t有限任务队列是带有执行优先级的队列,他可以使用PriorityBlockingQueue队列,可以控制任务的执行先后顺序,它是一个无界队列,该队列可以根据任务自身的优先级顺序先后执行,在确保性能的同时,也能有很好的质量保证。

    上面讲解了关于线程池内部都是通过ThreadPoolExecutor来进行实现的,那么下面我以一个例子来进行源码分析:

    public class ThreadPoolDemo1 {
    
        public static void main(String[] args) {
            ExecutorService executorService = new ThreadPoolExecutor(5,
                    10,
                    60L,
                    TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(5), new CustomThreadFactory());
            for (int i = 0; i < 15; i++) {
                executorService.execute(() -> {
                    try {
                        Thread.sleep(50000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("由线程:" + Thread.currentThread().getName() + "执行任务完成");
                });
            }
        }
    }
    

    上面定义了一个线程池,线程池初始化的corePoolSize为5,也就是线程池中线程的数量为5,最大线程maximumThreadPoolSize为10,空余的线程存活的时间是60s,使用LinkedBlockingQueue来作为阻塞队列,这里还发现我自定义了ThreadFactory线程池工厂,这里我真是针对线程创建的时候输出线程池的名称,源码如下所示:

    /**
     * 自定义的线程池构造工厂
     */
    public class CustomThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        public CustomThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }
    
        @Override
        public Thread newThread(Runnable r) {
            String name = namePrefix + threadNumber.getAndIncrement();
            Thread t = new Thread(group, r,
                    name,
                    0);
            System.out.println("线程池创建,线程名称为:" + name);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    
    }
    

    代码和DefaultThreadFactory一样,只是在newThread新建线程的动作的时候输出了线程池的名称,方便查看线程创建的时机,上面main方法中提交了15个任务,调用了execute方法来进行提交任务,在分析execute方法之前我们先了解一下线程的状态:

    //假设Integer类型是32位的二进制表示。
    //高3位代表线程池的状态,低29位代表的是线程池的数量
    //默认是RUNNING状态,线程池的数量为0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //线程个数位数,表示的Integer中除去最高的3位之后剩下的位数表示线程池的个数
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //线程池的线程的最大数量
    //这里举例是32为机器,表示为00011111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    //线程池的状态
    // runState is stored in the high-order bits
    //11100000000000000000000000000000
    //接受新任务并且处理阻塞队列里面任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    //00000000000000000000000000000000
    //拒绝新任务但是处理阻塞队列的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //00100000000000000000000000000000
    //拒接新任务并且抛弃阻塞队列里面的任务,同时会中断正在处理的任务
    private static final int STOP       =  1 << COUNT_BITS;
    //01000000000000000000000000000000
    //所有任务都执行完(包括阻塞队列中的任务)后当线程池活动线程数为0,将要调用terminated方法。
    private static final int TIDYING    =  2 << COUNT_BITS;
    //01100000000000000000000000000000
    //终止状态,terminated方法调用完成以后的状态
    private static final int TERMINATED =  3 << COUNT_BITS;
    

    通过上面内容可以看到ctl其实存放的是线程池的状态和线程数量的变量,默认是RUNNING,也就是11100000000000000000000000000000,这里我们来假设运行的机器上的Integer的是32位的,因为有些机器上可能Integer并不是32位,下面COUNT_BITS来控制位数,也就是先获取Integer在该平台上的位数,比如说是32位,然后32位-3位=29位,也就是低29位代表的是现成的数量,高3位代表线程的状态,可以清晰看到下面的线程池的状态都是通过低位来进行向左位移的操作的,除了上面的变量,还提供了操作线程池状态的方法:

    // 操作ctl变量,主要是进行分解或组合线程数量和线程池状态。
    // 获取高3位,获取线程池状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取低29位,获取线程池中线程的数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 组合ctl变量,rs=runStatue代表的是线程池的状态,wc=workCount代表的是线程池线程的数量
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */
    //指定的线程池状态c小于状态s
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    //指定的线程池状态c至少是状态s
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    
    // 判断线程池是否运行状态
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    
    /**
     * CAS增加线程池线程数量.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    
    /**
     * CAS减少线程池线程数量
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    
    /**
     * 将线程池的线程数量进行较少操作,如果竞争失败直到竞争成功为止。
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
    

    下来我们看一下ThreadPoolExecutor对象下的execute方法:

    public void execute(Runnable command) {
      	// 判断提交的任务是不是为空,如果为空则抛出NullPointException异常
        if (command == null)
            throw new NullPointerException();
      	// 获取线程池的状态和线程池的数量
        int c = ctl.get();
      	// 如果线程池的数量小于corePoolSize,则进行添加线程执行任务
        if (workerCountOf(c) < corePoolSize) {
          	//添加线程修改线程数量并且将command作为第一个任务进行处理
            if (addWorker(command, true))
                return;
          	// 获取最新的状态
            c = ctl.get();
        }
      	// 如果线程池的状态是RUNNING,将命令添加到队列中
        if (isRunning(c) && workQueue.offer(command)) {
          	//二次检查线程池状态和线程数量
            int recheck = ctl.get();
          	//线程不是RUNNING状态,从队列中移除当前任务,并且执行拒绝策略。
          	//这里说明一点,只有RUNNING状态的线程池才会接受新的任务,其余状态全部拒绝。
            if (! isRunning(recheck) && remove(command))
                reject(command);
          	//如果线程池的线程数量为空时,代表线程池是空的,添加一个新的线程。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
      	//如果队列是满的,或者是SynchronousQueue队列时,则直接添加新的线程执行任务,如果添加失败则进行拒绝
      	//可能线程池的线程数量大于maximumPoolSize则采取拒绝策略。
        else if (!addWorker(command, false))
            reject(command);
    }
    

    通过分析execute方法总结以下几点:

    1. 当线程池中线程的数量小于corePoolSize时,直接添加线程到线程池并且将当前任务做为第一个任务执行。
    2. 如果线程池的状态的是RUNNING,则可以接受任务,将任务放入到阻塞队列中,内部进行二次检查,有可能在运行下面内容时线程池状态已经发生了变化,在这个时候如果线程池状态变成不是RUNNING,则将当前任务从队列中移除,并且进行拒绝策略。
    3. 如果阻塞队列已经满了或者SynchronousQueue这种特殊队列无空间的时候,直接添加新的线程执行任务,当线程池的线程数量大于maximumPoolSize时相应拒绝策略。
    4. 入队操作用的是offer方法,该方法不会阻塞队列,如果队列已经满时或超时导致入队失败,返回false,如果入队成功返回true。

    针对上面例子源码我们来做一下分析,我们源码中阻塞队列采用的是ArrayBlockingQueue队列,并且指定队列的长度是5,我们看下面提交的线程池的任务是15个,而且corePoolSize设置的是5个核心线程,最大线程数(maximumPoolSzie)是10个(包括核心线程数),假设所有任务都同时提交到了线程池中,其中有5个任务会被提交到线程中作为第一个任务进行执行,会有5个任务被添加到阻塞队列中,还有5个任务提交到到线程池中的时候发现阻塞队列已经满了,这时候会直接提交任务,发现当前线程数是5小于最大线程数,可以进行新建线程来执行任务。
    在这里插入图片描述
    这里我们只是假设任务全部提交,因为我们在任务中添加了Thread.sleep睡眠一会,在for循环结束提交任务之后可能才会结束掉任务的睡眠执行任务后面内容,所以可以看做是全部提交任务,但是没有任务完成,如果有任务完成的话,可能就不会是触发最大的线程数,有可能就是一个任务完成后从队列取出来,然后另一个任务来的时候可以添加到队列中,上图中可以看到,有5个核心core线程在执行任务,任务队列中有5个任务在等待空余线程执行,而还有5个正在执行的线程,核心线程是指在corePoolSize范围的线程,而非核心线程指的是大于corePoolSize但是小于等于MaximumPoolSize的线程,就是这些非核心线程并不是一直存活的线程,它会跟随线程池指定的参数来进行销毁,我们这里指定了60s后如果没有任务提交,则会进行销毁操作,当然工作线程并不指定那些线程必须回收那些线程就必须保留,是根据从队列中获取任务来决定,如果线程获取任务时发现线程池中的线程数量大于corePoolSize,并且阻塞队列中为空时,则阻塞队列会阻塞60s后如果还有没有任务就返回false,这时候会释放线程,调用processWorkerExit来处理线程的退出,接下来我们来分析下addWorker都做了什么内容:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
          	//获取线程池的状态和线程池线程的数量
            int c = ctl.get();
          	//单独获取线程池的状态
            int rs = runStateOf(c);
    
            //检查队列是否只在必要时为空
            if (rs >= SHUTDOWN &&						//线程池的状态是SHUTDOWN、STOP、TIDYING、TERMINATED
                ! (rs == SHUTDOWN &&				//可以看做是rs!=SHUTDOWN,线程池状态为STOP、TIDYING、TERMINATED
                   firstTask == null &&			//可以看做firstTask!=null,并且rs=SHUTDOWN
                   ! workQueue.isEmpty()))	//可以看做rs=SHUTDOWN,并且workQueue.isEmpty()队列为空
                return false;
    				//循环CAS增加线程池中线程的个数
            for (;;) {
              	//获取线程池中线程个数
                int wc = workerCountOf(c);
              	//如果线程池线程数量超过最大线程池数量,则直接返回
                if (wc >= CAPACITY ||
                    //如果指定使用corePoolSize作为限制则使用corePoolSize,反之使用maximumPoolSize,最为工作线程最大线程线程数量,如果工作线程大于相应的线程数量则直接返回。
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
              	//CAS增加线程池中线程的数量
                if (compareAndIncrementWorkerCount(c))
                  	//跳出增加线程池数量。
                    break retry;
              	//如果修改失败,则重新获取线程池的状态和线程数量
                c = ctl.get();  // Re-read ctl
              	//如果最新的线程池状态和原有县城出状态不一样时,则跳转到外层retry中,否则在内层循环重新进行CAS
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    		
      	//工作线程是否开始启动标志
        boolean workerStarted = false;
      	//工作线程添加到线程池成功与否标志
        boolean workerAdded = false;
        Worker w = null;
        try {
          	//创建一个Worker对象
            w = new Worker(firstTask);
          	//获取worker中的线程,这里线程是通过ThreadFactory线程工厂创建出来的,详细看下面源码信息。
            final Thread t = w.thread;
          	//判断线程是否为空
            if (t != null) {
              	//添加独占锁,为添加worker进行同步操作,防止其他线程同时进行execute方法。
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //获取线程池的状态
                    int rs = runStateOf(ctl.get());
    								//如果线程池状态为RUNNING或者是线程池状态为SHUTDOWN并且第一个任务为空时,当线程池状态为SHUTDOWN时,是不允许添加新任务的,所以他会从队列中获取任务。
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                      	//添加worker到集合中
                        workers.add(w);
                        int s = workers.size();
                      	//跟踪最大的线程池数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                      	//添加worker成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
              	//如果添加worker成功就启动任务
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
          	//如果没有启动,w不为空就已出worker,并且线程池数量进行减少。
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    通过上面addWorker方法可以分为两个部分来进行讲解,第一部分是对线程池中线程数量的通过CAS的方式进行增加,其中第一部分中上面有个if语句,这个地方着重分析下:

    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;
    

    可以看成下面的样子,将!放到括号里面,变成下面的样子:

    if (rs >= SHUTDOWN &&
         (rs != SHUTDOWN ||
           firstTask != null ||
           workQueue.isEmpty()))
        return false;
    
    • 线程池的状态是SHUTDOWN、STOP、TIDYING、TERMINATED

    • 当线程池状态是STOP、TIDYING、TERMINATED时,这些状态的时候不需要进行线程的添加和启动操作,因- 为如果是上面的状态,其实线程池的线程正在进行销毁操作,意味着线程调用了shutdownNow等方

    • 如果线程池状态为SHUTDOWN并且第一个任务不为空时,不接受新的任务,直接返回false,也就是说SHUTDOWN的状态,不会接受新任务,只会针对队列中未完成的任务进行操作。

    • 当线线程池状态为SHUTDOWN并且队列为空时,直接返回不进行任务添加。

    上半部分分为内外两个循环,外循环对线程池状态的判断,用于判断是否需要添加工作任务线程,通过上面讲的内容进行判断,后面内循环则是通过CAS操作增加线程数,如果指定了core参数为true,代表线程池中线程的数量没有超过corePoolSize,当指定为false时,代表线程池中线程数量达到了corePoolSize,并且队列已经满了,或者是SynchronousQueue这种无空间的队列,但是还没有达到最大的线程池maximumPoolSize,所以它内部会根据指定的core参数来判断是否已经超过了最大的限制,如果超过了就不能进行添加线程了,并且进行拒绝策略,如果没有超过就增加线程数量。

    第二部分主要是把任务添加到worker中,并启动线程,这里我们先来看一下Worker对象。

    // 这里发现它是实现了AQS,是一个不可重入的独占锁模式
    // 并且它还集成了Runable接口,实现了run方法。
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** 执行任务的线程,通过ThreadFactory创建 */
        final Thread thread;
        /** 初始化第一个任务*/
        Runnable firstTask;
        /** 每个线程完成任务的数量 */
        volatile long completedTasks;
    
        /**
         * 首先现将state值设置为-1,因为在AQS中state=0代表的是锁没有被占用,而且在线程池中shutdown方法会判断能否争抢到锁,如果可以获得锁则对线程进行中断操作,如果调用了shutdownNow它会判断state>=0会被中断。
         * firstTask第一个任务,如果为空则会从队列中获取任务,后面runWorker中。
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** 委托调用外部的runWorker方法 */
        public void run() {
            runWorker(this);
        }
    
    		//是否独占锁
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    		
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    		//这里就是上面shutdownNow中调用的线程中断的方法,getState()>=0
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    

    可以看到Worker是一个实现了AQS的锁,它是一个不可重入的独占锁,并且他也实现了Runnable接口,实现了run方法,在构造函数中将AQS的state设置为-1,为了避免线程还没有进入runWorker方法前,就调用了shutdown或shutdownNow方法,会被中断,设置为-1则不会被中断。后面我们看到run方法,它调用的是ThreadPoolExecutor的runWorker方法,我们这里回想一下,在addWorker方法中,添加worker到HashSet中后,他会将workerAdded设置为true,代表添加worker成功,后面有调用了下面代码:

    if (workerAdded) {
        t.start();
        workerStarted = true;
    }
    

    这个t代表的就是在Worker构造函数中的使用ThreadFactory创建的线程,并且将自己(Worker自己)传递了当前线程,创建的线程就是任务线程,任务线程启动的时候会调用Worker下的run方法,run方法内部又委托给外部方法runWorker来进行操作,它的参数传递的是调用者自己,Worker中的run方法如下所示:

    public void run() {
        runWorker(this); 			//this指Worker对象本身
    }
    

    这里简单画一张图来表示下调用的逻辑。
    在这里插入图片描述
    整体的逻辑是先进行创建线程,线程将Worker设置为执行程序,并将线程塞到Worker中,然后再addWorker中将Worker中的线程取出来,进行启动操作,启动后他会调用Worker中的run方法,然后run方法中将调用ThreadPoolExecutor的runWorker,然后runWorker又会调用Worker中的任务firstTask,这个fistTask是要真正执行的任务,也是用户自己实现的代码逻辑。

    接下来我们就要看一下runWorker方法里面具体内容:

    final void runWorker(Worker w) {
      	//调用者也就是Worker中的线程
        Thread wt = Thread.currentThread();
      	//获取Worker中的第一个任务
        Runnable task = w.firstTask;
      	//将Worker中的任务清除代表执行了第一个任务了,后面如果再有任务就从队列中获取。
        w.firstTask = null;
      	//这里还记的我们在new Worker的时候将AQS的state状态设置为-1,这里先进行解锁操作,将state设置为0
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
          	//循环进行获取任务,如果第一个任务不为空,或者是如果第一个任务为空,从任务队列中获取任务,如果有任务则返回获取的任务信息,如果没有任务可以获取则进行阻塞,阻塞也分两种第一种是阻塞直到任务队列中有内容,第二种是阻塞队列一定时间之后还是没有任务就直接返回null。
            while (task != null || (task = getTask()) != null) {
              	//先获取worker的独占锁,防止其他线程调用了shutdown方法。
                w.lock();
                // 如果线程池正在停止,确保线程是被中断的,如果没有则确保线程不被中断操作。
                if ((runStateAtLeast(ctl.get(), STOP) || //如果线程池状态为STOP、TIDYING、TERMINATED直接拒绝任务中断当前线程
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                  	//执行任务之前做一些操作,可进行自定义
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                      	//运行任务在这里喽。
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
    	                  //执行任务之后做一些操作,可进行自定义
                        afterExecute(task, thrown);
                    }
                } finally {
                  	//将任务清空为了下次任务获取
                    task = null;
                  	//统计当前Worker完成了多少任务
                    w.completedTasks++;
                  	//独占锁释放
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
          	//处理Worker的退出操作,执行清理工作。
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    我们看到如果Worker是第一次被启动,它会从Worker中获取firstTask任务来执行,然后执行成功后,它会getTask()来从队列中获取任务,这个地方比较有意思,它是分情况进行获取任务的,我们都直到BlockingQueue中提供了几种从队列中获取的方法,这个getTask中使用了两种方式,第一种是使用poll进行获取队列中的信息,它采用的是过一点时间如果队列中仍没有任务时直接返回null,然后还有一个就是take方法,take方法是如果队列中没有任务则将当前线程进行阻塞,等待队列中有任务后,会通知等待的队列线程进行消费任务,让我们看一下getTask方法:

    private Runnable getTask() {
        boolean timedOut = false; //poll获取超时
    
        for (;;) {
          	//获取线程池的状态和线程数量
            int c = ctl.get();
          	//获取线程池的状态
            int rs = runStateOf(c);
    
            //线程池状态大于等于SHUTDOWN
          	//1.线程池如果是大于STOP的话减少工作线程池数量
          	//2.如果线程池状态为SHUTDOW并且队列为空时,代表队列任务已经执行完,返回null,线程数量减少1
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    				//获取线程池数量。
            int wc = workerCountOf(c);
    
            //如果allowCoreThreadTimeOut为true,则空闲线程在一定时间未获得任务会清除
          	//或者如果线程数量大于corePoolSize的时候会进行清除空闲线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    				//1.如果线程池数量大于最大的线程池数量或者对(空余线程进行清除操作并且poll超时了,意思是队列中没有内容了,导致poll间隔一段时间后没有获取内容超时了。
          	//2.如果线程池的数量大于1或者是队列已经是空的
          	//总之意思就是当线程池的线程池数量大于corePoolSize,或指定了allowCoreThreadTimeOut为true,当队列中没有数据或者线程池数量大于1的情况下,尝试对线程池的数量进行减少操作,然后返回null,用于上一个方法进行清除操作。
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
              	//如果timed代表的是清除空闲线程的意思
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :	//等待一段时间如果没有获取到返回null。
                    workQueue.take();					//阻塞当前线程
              	//如果队列中获取到内容则返回
                if (r != null)
                    return r;
    						//如果没有获取到超时了则设置timeOut状态
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    
    1. 工作线程调用getTask从队列中进行获取任务。
    2. 如果指定了allowCoreThreadTimeOut或线程池线程数量大于corePoolSize则进行清除空闲多余的线程,调用阻塞队列的poll方法,在指定时间内如果没有获取到任务直接返回false。
    3. 如果线程池中线程池数量小于corePoolSize或者allowCoreThreadTimeOut为false默认值,则进行阻塞线程从队列中获取任务,直到队列有任务唤醒线程。

    我们还记得第一张图中有标记出来是core线程和普通线程,其实这样标记不是很准确,准确的意思是如果线程池的数量超过了corePoolSize并且没有特别指定allowCoreThreadTimeOut的情况下,它会清除掉大于corePoolSize并且小于等于maximumPoolSize的一些线程,标记出core线程的意思是有corePoolSize不会被清除,但是会清除大于corePoolSize的线程,也就是线程池中的线程对获取任务的时候进行判断,也就是getTask中进行判断,如果当前线程池的线程数量大于corePoolSize就使用poll方式获取队列中的任务,当过一段时间还没有任务就会返回null,返回null之后设置timeOut=true,并且获取getTask也会返回null,到此会跳到调用者runWorker方法中,一直在while (task != null || (task = getTask()) != null)此时的getTask返回null跳出while循环语句,设置completedAbruptly = false,表示不是突然完成的而是正常完成,退出后它会执行finally的processWorkerExit(w, completedAbruptly),执行清理工作。我们来看下源码:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) 				// 如果突然完成则调整线程数量
            decrementWorkerCount();		// 减少线程数量1
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();														//获取锁,同时只有一个线程获得锁
        try {
            completedTaskCount += w.completedTasks;	//统计整个线程池完成的数量
            workers.remove(w);											//将完成任务的worker从HashSet中移除
        } finally {
            mainLock.unlock();											//释放锁
        }
    		//尝试设置线程池状态为TERMINATED
      	//1.如果线程池状态为SHUTDOWN并且线程池线程数量与工作队列为空时,修改状态。
      	//2.如果线程池状态为STOP并且线程池线程数量为空时,修改状态。
        tryTerminate();								
    		
      	// 获取线程池的状态和线程池的数量
        int c = ctl.get();
      	// 如果线程池的状态小于STOP,也就是SHUTDOWN或RUNNING状态
        if (runStateLessThan(c, STOP)) {
          	//如果不是突然完成,也就是正常结束
            if (!completedAbruptly) {
              	//如果指定allowCoreThreadTimeOut=true(默认false)则代表线程池中有空余线程时需要进行清理操作,否则线程池中的线程应该保持corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
              	//这里判断如果线程池中队列为空并且线程数量最小为0时,将最小值调整为1,因为队列中还有任务没有完成需要增加队列,所以这里增加了一个线程。
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
          	//如果当前线程数效益核心个数,就增加一个Worker
            addWorker(null, false);
        }
    

    通过上面的源码可以得出,如果线程数超过核心线程数后,在runWorker中就不会等待队列中的消息,而是会进行清除操作,上面的清除代码首先是先对线程池的数量进行较少操作,其次是统计整个线程池中完成任务的数量,然后就是尝试修改线程池的状态由SHUTDOWN->TIDYING->TERMINATED或者是由STOP->TIDYING->TERMINATED,修改线程池状态为TERMINATED,需要有两个条件:

    1. 当线程池线程数量和工作队列为空,并且线程池的状态为SHUTDOWN时,才会将状态进行修改,修改的过程是SHUTDOWN->TIDYING->TERMINATED

    2. 当线程池的状态为STOP并且线程池数量为空时,才会尝试修改状态,修改过程是STOP->TIDYING->TERMINATED

    如果设置为TERMINATED状态,还需要调用条件变量termination的signalAll()方法来唤醒所有因为调用awaitTermination方法而被阻塞的线程,换句话说当调用awaitTermination后,只有线程池状态变成TERMINATED才会被唤醒。

    接下来我们就来分析一下这个tryTerminate方法,看一下他到底符不符合我们上述说的内容:

    final void tryTerminate() {
        for (;;) {
          	// 获取线程池的状态和线程池的数量组合状态
            int c = ctl.get();
          	//这里单独下面进行分析,这里说明两个问题,需要反向来想这个问题。
          	//1.如果线程池状态STOP则不进入if语句
          	//2.如果线程池状态为SHUTDOWN并且工作队列为空时,不进入if语句
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
          	//如果线程池数量不为空时,进行中断操作。
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
              	//修改状态为TIDYING,并且将线程池的数量进行清空
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                      	//执行一些逻辑,默认是空的
                        terminated();
                    } finally {
                      	//修改状态为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                      	//唤醒调用awaitTermination方法的线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    

    我们单独将上面的if语句摘出来进行分析,将上面的第一个if判断进行修改如下,可以看到return在else里面,这时候内部if判断进行转换,转换成如下所示:

    if (!isRunning(c) &&			
        !runStateAtLeast(c, TIDYING) && //只能是SHUTDOWN和STOP
        (runStateOf(c) != SHUTDOWN ||  workQueue.isEmpty())){
        //这里执行逻辑
    }else {
    		return;
    }
    

    逐一分析分析内容如下:

    • !isRunning©代表不是RUNNING,则可能的是SHUTDOWN,STOP,TIDYING,TERMINATED这四种状态

    • 中间的连接符是并且的意思,跟着runStateAtLeast(c, TIDYING)这句话的意思是至少是TIDYING,TERMINATED这两个,反过来就是可能是RUNNING,SHUTDOWN,STOP,但是前面已经判断了不能是RUNINNG状态,所以前面两个连在一起就是只能是状态为SHUTDOWN,STOP

    • runStateOf© != SHUTDOWN || workQueue.isEmpty()当前面的状态是SHUTDOWN时,则会出发workQueue.isEmpty(),连在一起就是状态是SHUTDOWN并工作队列为空,当线程池状态为STOP时,则会进入到runStateOf© != SHUTDOWN,直接返回true,就代表线程池状态为STOP

    后面还有一个语句一个if语句将其转换一下逻辑就是下面的内容:

    if (workerCountOf(c) == 0) { 
     		//执行下面的逻辑   
    }else{
      	interruptIdleWorkers(ONLY_ONE);
        return;
    }
    

    这里我们也进行转换下,就可以看出来当线程池的数量为空时,才会进行下面的逻辑,下面的逻辑就是修改线程池状态为TERMINATED,两个连在一起就是上面分析的修改状态为TERMINATED的条件,这里画一张图来表示线程池状态的信息:
    在这里插入图片描述
    其实上面图中我们介绍了关于从SHUTDOWN或STOP到TERMINATED的变化,没有讲解关于如何从RUNNING状态转变成SHUTDOWN或STOP状态,其实是调用了shutdown()或shutdownNow方法对其进行状态的变换,下面来看一下shutdown方法源码:

    public void shutdown() {
      	//获取全局锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          	//权限检查
            checkShutdownAccess();
          	//设置线程池状态为SHUTDOWN,如果状态已经是大于等于SHUTDOWN则直接返回
            advanceRunState(SHUTDOWN);
          	//如果线程没有设置中断标识并且线程没有运行则设置中断标识
            interruptIdleWorkers();
          	//空的可以实现的内容
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
      	//尝试修改线程池状态为TERMINATED
        tryTerminate();
    }
    
    1. 首先对当前线程进行权限检测,查看是否设置了安全管理器,如果设置了则要看当前调用shutdown的线程有没有权限都关闭线程的权限,如果有权限还要看是否有中断工作现成的权限,如果没有权限则抛出SecurityException或NullPointException异常。

    2. 设置线程池状态为SHUTDOWN,如果状态已经是大于等于SHUTDOWN则直接返回

    3. 如果线程没有设置中断标识并且线程没有运行则设置中断标识

    4. 尝试修改线程池状态为TERMINATED

    接下来我们来看一下advanceRunState内容如下所示:

    private void advanceRunState(int targetState) {
        for (;;) {
          	//获取线程池状态和线程池的线程数量
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||		//如果线程池的状态>=SHUTDOWN
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))	//设置线程池状态为SHUTDOWN
              	//返回
                break;										
        }
    }
    
    1. 当线程池的状态>=SHUTDOWN,直接返回
    2. 如果线程池状态为RUNNING,设置线程池状态为SHUTDOWN,设置成功则返回

    interruptIdleWorkers代码如下所示:

    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    
    private void interruptIdleWorkers(boolean onlyOne) {
      	//获取全局锁,同时只能有一个线程能够调用shutdown方法
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          	//遍历工作线程
            for (Worker w : workers) {
                Thread t = w.thread;
              	//如果当前线程没有设置中断标志并且可以获取Worker自己的锁
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                      	//设置中断标志
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
              	//执行一次,清理空闲线程。
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    

    我们看到当我们调用shutdown方法的时候,只是将空闲的线程给设置了中断标识,也就是活跃正在执行任务的线程并没有设置中断标识,直到将任务全部执行完后才会逐步清理线程操作,我们还记的在getTask中的方法里面有这样一段代码:

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }
    

    判断是否是状态>=SHUTDOWN,并且队列为空时,将线程池数量进行减少操作,内部进行CAS操作,直到CAS操作成功为止,并且返回null,返回null后,会调用processWorkerExit(w, false);清理Workers线程信息,并且尝试将线程设置为TERMINATED状态,上面是对所有shutdown方法的分析,下面来看一下shutdownNow方法并且比较两个之间的区别:

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          	//权限检查
            checkShutdownAccess();
          	//设置线程池状态为STOP,如果状态已经是大于等于STOP则直接返回
            advanceRunState(STOP);
          	//这里是和SHUTDOWN区别的地方,这里是强制进行中断操作
            interruptWorkers();
          	//将为完成任务复制到list集合中
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
      	//尝试修改线程池状态为TERMINATED
        tryTerminate();
        return tasks;
    }
    

    shutdownNow方法返回了未完成的任务信息列表tasks = drainQueue();,其实该方法和shutdown方法主要的区别在于一下几点内容:

    1. shutdownNow方法将线程池状态设置为STOP,而shutdown则将状态修改为SHUTDOWN
    2. shutdownNow方法将工作任务进行中断操作,也就是说如果工作线程在工作也会被中断,而shutdown则是先尝试获取锁如果获得锁成功则进行中断标志设置,也就是中断操作,如果没有获取到锁则等待进行完成后自动退出。
    3. shutdownNow方法返回未完成的任务列表。

    下面代码是shutDownNow的interruptWorkers方法:

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
              	//直接进行中断操作。
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    

    内部调用了Worker的interruptIfStarted方法,方法内部是针对线程进行中断操作,但是中断的前提条件是AQS的state状态必须大于等于0,如果状态为-1的则不会被中断,但是如果任务运行起来的时候在runWorker中则不会执行任务,因为线程池状态为STOP,如果线程池状态为STOP则会中断线程,下面代码是Worker中的interruptIfStarted:

    void interruptIfStarted() {
        Thread t;
      	//当前Worker锁状态大于等于0并且线程没有被中断
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
    

    拒绝策略

    JDK内置的拒绝策略如下:

    1. AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作
    2. CallerRunsPolicy策略:只要线程池没有关闭线程池状态是RUNNING状态,该略略直接调用线程中运行当前被丢弃的任务
    3. DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的第一个任务,并尝试再次提交任务
    4. DiscardPolicy策略:该策略默默丢弃无法处理的任务,不予任何处理。

    在这里插入图片描述

    总结

    首先先上一张图,针对这张图来进行总结:

    在这里插入图片描述

    1. 主线程进行线程池的调用,线程池执行execute方法
    2. 线程池通过addWorker进行创建线程,并将线程放入到线程池中,这里我们看到第二步是将线程添加到核心线程中,其实线程池内部不分核心线程和非核心线程,只是根据corePoolSize和maximumPoolSize设置的大小来进行区分,因为超过corePoolSize的线程会被回收,至于回收那些线程,是根据线程获取任务的时候进行判断,当前线程池数量大于corePoolSize,或者指定了allowCoreThreadTimeOut为true,则他等待一定时间后会返回,不会一直等待
    3. 当线程池的数量达到corePoolSize时,线程池首先会将任务添加到队列中
    4. 当队列中任务也达到了队列设置的最大值时,它会创建新的线程,注意的是此时的线程数量已经超过了corePoolSize,但是没有达到maximumPoolSize最大值。
    5. 当线程池的线程数量达到了maximumPoolSize,则会相应拒绝策略。
    展开全文
  • 线程池原理 简单来说就是把宝贵的资源管理起来,每次用的时候再去取,用完放回,让其他人也可以复用。 那在Java中我们该怎么实现呢? 在Java中线程池的核心类是ThreadPoolExecutor,并在此类的基础上封装了几种常用...

    程序员成长路上有着不同的阶段,只要你翻过了当时那个阶段,那么你将会有了不一样的收获。很多时候,我们在刚开始面对它们的时候,还看不清,看不透,云里雾里,让人觉得它们很高深。等我们正在的了解它们了之后就觉一切都是那么简单、自然。

     

    再努力一下,一切将会不一样!—— 鲁迅

    前言

    多线程开发就是这样的一座山,需要我们去克服。说到多线程,大部分新手(作者自己),在面试中谈到多线程就慌了,因为自己在实际工作中真的很少碰到,而且我们多数时候都是在做传统的单体项目开发,说真的,很少会碰到用到多线程的,用都没用过面试的时候让我们怎么说。为了让大家对多线程有个大致了解,现在让作者我跟大家瞎扯几句,作者很少写文章,写得不太通顺的地方,大家多(wang)多(si)谅(li)解(pen) 。

     

    既然要讲多线程,就不得不说下线程、线程池了~

     

     

    线程

    在Java中你会怎么创建一个线程吗?

    很简单啊,比如说:

    Thread t = new Thread(new Runnable() {
               @Override
               public void run() {
                  // 处理逻辑代码
                  process();
              }
          });
    t.start();

    恩,没错,这样确实可以简单的创建出一个线程对象。

    我们知道一个线程的创建于销毁是需要消耗资源的,大家来思考一个问题:假设我们项目中要经常用到多个线程去处理业务的话,每次都是用完就销毁,这也未免太浪费了些吧,毕竟线程只是帮我们执行相应的任务,完全可以继续复用它呀,让它接着处理其他任务。那么在Java中有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

    如果之前大家有看过《阿里巴巴 Java 手册》的话,那么应该知道有那么一条:

    可见线程池就是我们的答案!

    线程池的作用

    简单来说使用线程池有以下几个目的:

    • 线程是稀缺资源,不能频繁的创建。

    • 解耦作用,线程的创建于执行完全分开,方便维护。

    • 应当将其放入一个池子中,可以给其他任务进行复用。

    线程池原理

    简单来说就是把宝贵的资源管理起来,每次用的时候再去取,用完放回,让其他人也可以复用。

    那在Java中我们该怎么实现呢?

    在Java中线程池的核心类是ThreadPoolExecutor,并在此类的基础上封装了几种常用线程池:

    • Executors.newCachedThreadPool():无限线程池。

    • Executors.newFixedThreadPool(nThreads):创建固定大小的线程池。

    • Executors.newSingleThreadExecutor():创建单个线程的线程池。

    我们看下他们是怎么实现的:

    // 无限线程池
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
       return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                     60L, TimeUnit.SECONDS,
                                     new SynchronousQueue<Runnable>(),
                                     threadFactory);
    }
    // 创建固定大小的线程池
    public static ExecutorService newFixedThreadPool(int nThreads) {
           return new ThreadPoolExecutor(nThreads, nThreads,
                                         0L, TimeUnit.MILLISECONDS,
                                         new LinkedBlockingQueue<Runnable>());
    }
    // 创建单个线程的线程池
    public static ExecutorService newSingleThreadExecutor() {
           return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                       0L, TimeUnit.MILLISECONDS,
                                       new LinkedBlockingQueue<Runnable>()));
    }

    看上面源代码,我们知道它们都是基于ThreadPoolExecutor 实现的,那我们就来看看它们是给ThreadPoolExecutor传了什么参数才导致它们可以实现不同功能的线程池的呢?

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

    由上面的参数解释,我们可以知道,线程池是通过设置corePoolSize(最小线程数)和maximumPoolSize(最大线程数)来确定线程池的大小范围,让线程池在我们定义范围内扩容、减容。线程池的扩容是要判断当前所需的线程数是否超过核心线程数阻塞队列也满了并且当前线程数小于最大线程数,都符合了,才会扩容。我们可以看下面的流程图来理解:

    • corePoolSize 核心线程数,为线程池的基本大小。

    • maximumPoolSize 为线程池最大线程大小。

    • keepAliveTimeunit 则是线程空闲后的存活时间。

    • workQueue 用于存放任务的阻塞队列。

    • threadFactory 线程工厂,主要用来创建线程。

    • handler 当队列和最大线程池都满了之后的饱和策略。

      • ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常

      • ThreadPoolExecutor.DiscardPolicy 也是丢弃任务,但是不抛出异常

      • ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

      • ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务

     

     

    虽然,Executors给我们封装好了上面几个构建线程池的方法,但是,并不建议直接使用

    为什么呢?

    让我们来看看阿里巴巴的开发手册:

    让我们看下源码是不是真的这样:

    • Executors 源码

    // Executors.class
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
                    (new ThreadPoolExecutor(1, 1,
                          0L, TimeUnit.MILLISECONDS,
                          new LinkedBlockingQueue<Runnable>()));// 这里采用了无参构造方法
    }
    • LinkedBlockingQueue源码

    // LinkedBlockingQueue.class
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);  // 无参构造方法,直接设置为Integer.MAX_VALUE大小
    }
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    果然如此~

    所以还是按照我们自己业务上需求自定义配置属于自己的线程池吧!

    好了,暂时先讲到这啦~

    大家觉得不错帮忙点个在看哈~

     

    参考:

    https://blog.csdn.net/pange1991/article/details/53860651

    《阿里巴巴Java开发手册》

     

    展开全文
  • Java线程池原理及使用

    万次阅读 多人点赞 2018-09-27 17:10:08
    java中的线程池是运用场景最多...1.1、线程池ThreadPoolExecutor工作原理 讲解之前,我们先看一张原理图 ThreadPoolExecutor执行execute方法有4种情况: 1)如果当前运行的线程少于corePoolSize,则创建新的线程来...

    java中的线程池是运用场景最多的并发框架。在开发过程中,合理的使用线程池能够带来下面的一些好处:
    1、降低资源的消耗。
    2、提高响应速度。
    3、提高线程的可管理型。

    1.1、线程池ThreadPoolExecutor工作原理

    讲解之前,我们先看一张原理图
    20160221172500424.png

    ThreadPoolExecutor执行execute方法有4种情况:
    1)如果当前运行的线程少于corePoolSize,则创建新的线程来执行任务。
    2)如果运行的线程等于或者多余corePoolSize,则将任务加入BlockingQueue中,在等待队列中,等待有新的线程可以运行。
    3)如果BlockingQueue队列满了,且没有超过maxPoolSize,则创建新的线程来处理任务。
    4)如果创建的线程超过maxPoolSize,任务会拒绝,并调用RejectExecutionHandler.rejectedExecution()方法。

    1.2、线程池的使用

    1.2.1、线程池的创建

    一般我们可以通过ThreadPoolExecutor来创建一个线程池。
    在ThreadPoolExecutor类中提供了四个构造方法:

    public class ThreadPoolExecutor extends AbstractExecutorService {
        .....
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
        ...
    }
    

    我们通过

    new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
    

    创建一个新的线程池。

    下面我们介绍一下需要输入的几个参数的意义:

    1)corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

    2) maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

    3)keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

    • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒</pre>
    

    4) workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    • ArrayBlockingQueue:一个基于数组结构的有界阻塞队列。
    • LinkedBlockingQueue:一个基于链表的阻塞队列,吞吐量要高于ArrayBlockingQueue。
    • SynchronousQueue:一个不存储元素的阻塞队列。每次插入操作必须等到另外一个线程调用移除操作,否则一直处于阻塞状态。吞吐量要高于LinkedBlockingQueue。
    • PriorityBlockingQueue:一个具有优先级的无线阻塞队列。

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

    5)threadFactory:线程工厂,主要用来创建线程;
    6)RejectedExecutionHandler:当队列和线程池都满了,将会执行下面的策略,jdk1.5中提供有以下四种策略:

    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    1.2.2、如何向线程池提交任务

    向线程池提交任务,提供了两种方法,分别是execute()和submit()方法。

    1)execute()方法

    execute方法用于提交不需要返回值的任务,所以也就意味着无法判断是否执行成功。

    pool.execute(new Runnable(){
    
    			@Override
    			public void run() {
    				System.out.println("使用execute提交任务.");
    			}
    			
    });
    

    2)submit方法

    submit方法可以用于提交需要有返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判读是否执行成功,并且还可以通过get()方法来获取返回值。

    Runnable task = null;
    		Future<Object> future = (Future<Object>) pool.submit(task);
            try {
    			future.get();//获取返回值
    		} catch (InterruptedException e) {//中断异常处理
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} catch (ExecutionException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} finally {
    			//关闭线程池
    			pool.shutdown();
    		}
    

    1.2.3、关闭线程池

    在上一节中,我们在异常的处理后面,我们就使用到了shutdown()方法来关闭线程池。

    在关闭线程池的时候,这里有两个方法可以调用,分别是shutdownshutdownNow方法。

    1.3、线程池使用实例

    1.3.1、线程池的使用实例

    这个实例我们使用自定义的拒绝策略,因为jdk的策略并不是很完美

    public class MyRejected implements RejectedExecutionHandler{
    
    	
    	public MyRejected(){
    	}
    	
    	@Override
    	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    		System.out.println("自定义处理..");
    		System.out.println("当前被拒绝任务为:" + r.toString());
    		
    
    	}
    
    }
    

    然后我们定义一个任务类

    public class MyTask implements Runnable {
    
    	private int taskId;
    	private String taskName;
    	
    	public MyTask(int taskId, String taskName){
    		this.taskId = taskId;
    		this.taskName = taskName;
    	}
    	
    	public int getTaskId() {
    		return taskId;
    	}
    
    	public void setTaskId(int taskId) {
    		this.taskId = taskId;
    	}
    
    	public String getTaskName() {
    		return taskName;
    	}
    
    	public void setTaskName(String taskName) {
    		this.taskName = taskName;
    	}
    
    	@Override
    	public void run() {
    		try {
    			System.out.println("run taskId =" + this.taskId);
    			Thread.sleep(5*1000);
    			//System.out.println("end taskId =" + this.taskId);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}		
    	}
    	
    	public String toString(){
    		return Integer.toString(this.taskId);
    	}
    
    }
    
    

    最后,我们看一下任务执行

    public class UseThreadPoolExecutor1 {
    
    
    	public static void main(String[] args) {
    		/**
    		 * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
    		 * 若大于corePoolSize,则会将任务加入队列,
    		 * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
    		 * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
    		 * 
    		 */	
    		ThreadPoolExecutor pool = new ThreadPoolExecutor(
    				1, 				//coreSize
    				2, 				//MaxSize
    				60, 			//60
    				TimeUnit.SECONDS, 
    				new ArrayBlockingQueue<Runnable>(3)			//指定一种队列 (有界队列)
    				//new LinkedBlockingQueue<Runnable>()
    				, new MyRejected()
    				//, new DiscardOldestPolicy()
    				);
    		
    		MyTask mt1 = new MyTask(1, "任务1");
    		MyTask mt2 = new MyTask(2, "任务2");
    		MyTask mt3 = new MyTask(3, "任务3");
    		MyTask mt4 = new MyTask(4, "任务4");
    		MyTask mt5 = new MyTask(5, "任务5");
    		MyTask mt6 = new MyTask(6, "任务6");
    		
    		pool.execute(mt1);
    		pool.execute(mt2);
    		pool.execute(mt3);
    		
    		
    		
    		/*pool.execute(mt4);
    		pool.execute(mt5);
    		pool.execute(mt6);*/
    		
    		pool.shutdown();
    //		pool.shutdownNow();
    		
    	}
    }
    

    执行结果:
    1)当运行<5个时,可以正常运行:
    搜狗截图20180927165203.jpg
    2)当>5时,因为大于了最大值,所以执行了异常策略:
    搜狗截图20180927165223.jpg

    1.3.2、线程池的监控参数或者其他api使用

    当我们需要对线程池进行监控时,我们可以使用线程池提供的参数进行监控,可以使用下面的一些属性。

    • taskCount:线程池需要执行的任务数量。
    • completedTaskCount:线程池在运行过程中已完成的数量。
    • largestPoolSize:线程池里曾经创建过的最大的线程数量。
    • poolSize:线程池的线程数量。
    • ActiveCount:获取活动的线程数量。
    System.out.println(pool.getTaskCount());
    System.out.println(pool.getCompletedTaskCount());
    System.out.println(pool.getLargestPoolSize());
    System.out.println(pool.getPoolSize());
    System.out.println(pool.getActiveCount());
    

    运行结果:
    搜狗截图20180927170209.jpg

    1.4、如何合理的配置线程池的大小

    一般需要根据任务的类型来配置线程池大小:

    • 如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

    • 如果是IO密集型任务,参考值可以设置为2*NCPU

    • 建议使用有界队列。因为有界队列能够增加系统的稳定性和预警的能力,我们可以想象一下,当我们使用无界队列的时候,当我们系统的后台的线程池的队列和线程池会越来越多,这样当达到一定的程度的时候,有可能会撑满内存,导致系统出现问题。当我们是有界队列的时候,当我们系统的后台的线程池的队列和线程池满了之后,会不断的抛出异常的任务,我们可以通过异常信息做一些事情。

    当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

    展开全文
  • 1. 线程池原理 在传统服务器结构中,常用一个总的监听线程监听新用户连接,当有一个新用户进入时,服务器就开启一个新的线程,用于处理这个用户的数据收发,这个线程只服务于这个用户,当用户与服务器端连接关闭...
  • 先说一下线程池底层的数据结构 队列:一种线性表,它的特性是先进先出,插入在一端,删除在另一端。 队列又分为阻塞队列BlockingQueue和非阻塞队列ConcurrentLinkedQueue   生产者生产元素插入队列,消费者消费...
  • Android 线程池原理及使用

    千次阅读 2019-06-21 10:53:09
    目录1 线程池简介2 ThreadPoolExecutor3 线程池的使用3.1 线程池使用步骤3.2 使用线程池的原因3.3 线程池管理线程的优点3.4 newFixedThreadPool3.5 newSingleThreadExecutor3.6 newCachedThreadPool3.7 ...
  • 线程池原理,优缺点,组成模块,各模块之间的逻辑关系分析
  • 线程池原理(讲的非常棒)

    万次阅读 多人点赞 2018-07-30 21:29:36
    Java并发编程:线程池的使用  在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:  如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,...
  • 主要介绍了Java常用线程池原理及使用方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • Tomcat线程池原理

    2019-08-18 00:00:00
    Tomcat线程池扩充了java的ThreadPoolExecutor tomcat定制了自己的Queue(重写了LinkedBlockingQueue)和ThreadFactory 最大线程数(maximumPoolSize)线程池大小有控制,可以通过参数设置 处理流程: 前...
  • 线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值 线程池的关闭 调用shutdown或shutdownNow 它们的原理是遍历线程池中的工作线程,然后...
  • 主要介绍了Java 线程池原理深入分析的相关资料,需要的朋友可以参考下
  • c++线程池原理和应用

    2020-04-08 01:35:00
    线程池2.线程池作用及应用场合3. 线程池实例 1.线程池 线程池是在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当...
  • 主要介绍了MySQL的线程池原理学习教程,包括线程池的函数调用与关键接口等重要知识,非常推荐!需要的朋友可以参考下
  • 线程池原理(面试)

    2019-10-16 16:19:03
    1. 什么是线程池? 顾名思义,存放线程的一个池子。 2.怎么设计一个线程池 (1) 新建一个数组,创建一堆线程存放进去; (2) 线程池中的线程来处理任务,处理完成后回收线程而不是销毁线程; (3) 设计等待...
  • 首先介绍如何使用,后面再介绍原理: 第一种: Java1.5以后自带的线程池 public class App { public static void main(String[] args) throws Exception {   ExecutorService executorService = new ...
  • python线程池及其原理和使用 系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用...
  • 只有掌握了线程池的实现原理,才能更好的运用它来优雅的并发编程。 本 Chat 主要带领大家从 JDK 源码的角度一起学习 ScheduledThreadPoolExecutor 的实现原理。当然与之相辅相成的 Future 设计模式也会进行细致的...
  • 线程池原理以及适用场景
  • 1.什么是线程池 Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序 都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。 第一:降低资源消耗。 通过重复利用已创建的...
  • 主要介绍了JAVA线程池原理,结合实例形式详细分析了java线程池概念、原理、创建、使用方法及相关注意事项,需要的朋友可以参考下

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 122,952
精华内容 49,180
关键字:

线程池原理