精华内容
下载资源
问答
  • 文章目录如何设计一个线程池?三个步骤线程池是什么?为什么要用线程池?需要考虑的点线程池状态状态有哪些?如何维护状态?线程相关线程怎么封装?线程放在哪个池子里?线程怎么取得任务?线程有哪些状态?线程的...

    以前,我总觉得的买一件东西,做一件事,或者从某一个时间节点开始,我的生命就会发生转折,一切就会无比顺利,立马变厉害。但是,事实上并不是如此。我不可能马上变厉害,也不可能一口吃成一个胖子。看一篇文章也不能让你从此走上人生巅峰,越来越相信,这是一个长期的过程,只有量变引起质变,纵使缓慢,驰而不息。

    如何设计一个线程池?

    三个步骤

    这是一个常见的问题,如果在比较熟悉线程池运作原理的情况下,这个问题并不难。设计实现一个东西,三步走:是什么?为什么?怎么做?

    线程池是什么?

    线程池使用了池化技术,将线程存储起来放在一个 “池子”(容器)里面,来了任务可以用已有的空闲的线程进行处理, 处理完成之后,归还到容器,可以复用。如果线程不够,还可以根据规则动态增加,线程多余的时候,亦可以让多余的线程死亡。

    为什么要用线程池?

    实现线程池有什么好处呢?

    • 降低资源消耗:池化技术可以重复利用已经创建的线程,降低线程创建和销毁的损耗。
    • 提高响应速度:利用已经存在的线程进行处理,少去了创建线程的时间
    • 管理线程可控:线程是稀缺资源,不能无限创建,线程池可以做到统一分配和监控
    • 拓展其他功能:比如定时线程池,可以定时执行任务

    需要考虑的点

    那线程池设计需要考虑的点:

    • 线程池状态:

      • 有哪些状态?如何维护状态?
    • 线程

      • 线程怎么封装?线程放在哪个池子里?
      • 线程怎么取得任务?
      • 线程有哪些状态?
      • 线程的数量怎么限制?动态变化?自动伸缩?
      • 线程怎么消亡?如何重复利用?
    • 任务

      • 任务少可以直接处理,多的时候,放在哪里?
      • 任务队列满了,怎么办?
      • 用什么队列?

    如果从任务的阶段来看,分为以下几个阶段:

    • 如何存任务?
    • 如何取任务?
    • 如何执行任务?
    • 如何拒绝任务?

    线程池状态

    状态有哪些?如何维护状态?

    状态可以设置为以下几种:

    • RUNNING:运行状态,可以接受任务,也可以处理任务
    • SHUTDOWN:不可以接受任务,但是可以处理任务
    • STOP:不可以接受任务,也不可以处理任务,中断当前任务
    • TIDYING:所有线程停止
    • TERMINATED:线程池的最后状态

    各种状态之间是不一样的,他们的状态之间变化如下:

    而维护状态的话,可以用一个变量单独存储,并且需要保证修改时的原子性,在底层操作系统中,对int的修改是原子的,而在32位的操作系统里面,对double,long这种64位数值的操作不是原子的。除此之外,实际上JDK里面实现的状态和线程池的线程数是同一个变量,高3位表示线程池的状态,而低29位则表示线程的数量。

    这样设计的好处是节省空间,并且同时更新的时候有优势。

    线程相关

    线程怎么封装?线程放在哪个池子里?

    线程,即是实现了Runnable接口,执行的时候,调用的是start()方法,但是start()方法内部编译后调用的是 run() 方法,这个方法只能调用一次,调用多次会报错。因此线程池里面的线程跑起来之后,不可能终止再启动,只能一直运行着。既然不可以停止,那么执行完任务之后,没有任务过来,只能是轮询取出任务的过程

    线程可以运行任务,因此封装线程的时候,假设封装成为 Worker, Worker里面必定是包含一个 Thread,表示当前线程,除了当前线程之外,封装的线程类还应该持有任务,初始化可能直接给予任务,当前的任务是null的时候才需要去获取任务。

    可以考虑使用 HashSet 来存储线程,也就是充当线程池的角色,当然,HashSet 会有线程安全的问题需要考虑,那么我们可以考虑使用一个可重入锁比如 ReentrantLock,凡是增删线程池的线程,都需要锁住。

        private final ReentrantLock mainLock = new ReentrantLock();
    

    线程怎么取得任务?

    (1)初始化线程的时候可以直接指定任务,譬如Runnable firstTask,将任务封装到 worker 中,然后获取 worker 里面的 threadthread.run()的时候,其实就是 跑的是 worker 本身的 run() 方法,因为 worker 本身就是实现了 Runnable 接口,里面的线程其实就是其本身。因此也可以实现对 ThreadFactory 线程工厂的定制化。

        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            final Thread thread;
            Runnable firstTask;
    
            ...
    
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                // 从线程池创建线程,传入的是其本身
                this.thread = getThreadFactory().newThread(this);
            }
        }
    

    (2)运行完任务的线程,应该继续取任务,取任务肯定需要从任务队列里面取,要是任务队列里面没有任务,由于是阻塞队列,那么可以等待,如果等待若干时间后,仍没有任务,倘若该线程池的线程数已经超过核心线程数,并且允许线程消亡的话,应该将该线程从线程池中移除,并结束掉该线程。

    取任务和执行任务,对于线程池里面的线程而言,就是一个周而复始的工作,除非它会消亡。

    线程有哪些状态?

    现在我们所说的是Java中的线程Thread,一个线程在一个给定的时间点,只能处于一种状态,这些状态都是虚拟机的状态,不能反映任何操作系统的线程状态,一共有六种/七种状态:

    • NEW:创建了线程对象,但是还没有调用Start()方法,还没有启动的线程处于这种状态。

    • Running:运行状态,其实包含了两种状态,但是Java线程将就绪和运行中统称为可运行

      • Runnable:就绪状态:创建对象后,调用了start()方法,该状态的线程还位于可运行线程池中,等待调度,获取CPU的使用权
        • 只是有资格执行,不一定会执行
        • start()之后进入就绪状态,sleep()结束或者join()结束,线程获得对象锁等都会进入该状态。
        • CPU时间片结束或者主动调用yield()方法,也会进入该状态
      • Running :获取到CPU的使用权(获得CPU时间片),变成运行中
    • BLOCKED :阻塞,线程阻塞于锁,等待监视器锁,一般是Synchronize关键字修饰的方法或者代码块

    • WAITING :进入该状态,需要等待其他线程通知(notify)或者中断,一个线程无限期地等待另一个线程。

    • TIMED_WAITING :超时等待,在指定时间后自动唤醒,返回,不会一直等待

    • TERMINATED :线程执行完毕,已经退出。如果已终止再调用start(),将会抛出java.lang.IllegalThreadStateException异常。

    image-20210509224848865

    线程的数量怎么限制?动态变化?自动伸缩?

    线程池本身,就是为了限制和充分使用线程资的,因此有了两个概念:核心线程数,最大线程数。

    要想让线程数根据任务数量动态变化,那么我们可以考虑以下设计(假设不断有任务):

    • 来一个任务创建一个线程处理,直到线程数达到核心线程数。
    • 达到核心线程数之后且没有空闲线程,来了任务直接放到任务队列。
    • 任务队列如果是无界的,会被撑爆。
    • 任务队列如果是有界的,任务队列满了之后,还有任务过来,会继续创建线程处理,此时线程数大于核心线程数,直到线程数等于最大线程数。
    • 达到最大线程数之后,还有任务不断过来,会触发拒绝策略,根据不同策略进行处理。
    • 如果任务不断处理完成,任务队列空了,线程空闲没任务,会在一定时间内,销毁,让线程数保持在核心线程数即可。

    由上面可以看出,主要控制伸缩的参数是核心线程数最大线程数,任务队列,拒绝策略

    线程怎么消亡?如何重复利用?

    线程不能被重新调用多次start(),因此只能调用一次,也就是线程不可能停下来,再启动。那么就说明线程复用只是在不断的循环罢了。

    消亡只是结束了它的run()方法,当线程池数量需要自动缩容的,就会让一部分空闲的线程结束。

    而重复利用,其实是执行完任务之后,再去去任务队列取任务,取不到任务会等待,任务队列是一个阻塞队列,这是一个不断循环的过程。

    任务相关

    任务少可以直接处理,多的时候,放在哪里?

    任务少的时候,来了直接创建,赋予线程初始化任务,就可开始执行,任务多的时候,把它放进队列里面,先进先出。

    任务队列满了,怎么办?

    任务队列满了,会继续增加线程,直到达到最大的线程数。

    用什么队列?

    一般的队列,只是一个有限长度的缓冲区,要是满了,就不能保存当前的任务,阻塞队列可以通过阻塞,保留出当前需要入队的任务,只是会阻塞等待。同样的,阻塞队列也可以保证任务队列没有任务的时候,阻塞当前获取任务的线程,让它进入wait状态,释放cpu的资源。因此在线程池的场景下,阻塞队列其实是比较有必要的。

    【作者简介】
    秦怀,公众号【秦怀杂货店】作者,技术之路不在一时,山高水长,纵使缓慢,驰而不息。这个世界希望一切都很快,更快,但是我希望自己能走好每一步,写好每一篇文章,期待和你们一起交流。如果有帮助,顺手点个赞,对我,是莫大的鼓励和认可。

    展开全文
  • 线程池就是创建若干个可执行的线程放入一个池(容器)中,有任务需要处理时,会提交到线程池中的任务队列,处理完之后线程并不会被销毁,而是仍然在线程池中等待下一个任务。 一个线程池包括以下四个基本组成部分: ...

    线程池就是创建若干个可执行的线程放入一个池(容器)中,有任务需要处理时,会提交到线程池中的任务队列,处理完之后线程并不会被销毁,而是仍然在线程池中等待下一个任务。

    一个线程池包括以下四个基本组成部分:
    线程管理器 (ThreadPool): 用于创建并管理线程池,包括创建线程,销毁线程池,添加新任务;
    工作线程 (PoolWorker): 线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
    任务接口 (Task): 每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
    任务队列 (TaskQueue): 用于存放没有处理的任务。提供一种缓冲机制;

    所包含的方法

    //创建线程池
    private ThreadPool()
    //获得一个默认线程个数的线程池
    public static ThreadPool getThreadPool()
    //执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器决定
    public void execute(Runnable task)
    //批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器决定
    public void execute(Runnable[] task)
    //销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
    public void destroy()
    //返回工作线程的个数
    public int getWorkThreadNumber()
    //返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成
    public int getFinishedTasknumber()
    //在保证线程池中所有线程正在执行,并且要执行线程的个数大于某一值时。增加线程池中线程的个数
    public void addThread()
    //在保证线程池中有很大一部分线程处于空闲状态,并且空闲状态的线程在小于某一值时,减少线程池中线程的个数
    public void reduceThread()

    展开全文
  • 线程池(Java面试题)

    2021-03-02 11:16:55
    创建了太多线程,内存溢出2、如果一个任务要添加到线程池,处理流程是什么样的?先判断任务阻塞队列是否满了,如果满了就看是实现了哪种拒绝策略,如果没满,则添加到任务阻塞队列中,线程集合去判断是否任务数量...

    1、CachedThreadPool为什么会发生OOM异常?

    创建了太多线程,内存溢出

    2、如果一个任务要添加到线程池,处理流程是什么样的?

    先判断任务阻塞队列是否满了,如果满了就看是实现了哪种拒绝策略,如果没满,则添加到任务阻塞队列中,线程集

    合去判断是否任务数量大于核心线程数,大于就创建队列,最大不超过最大线程数,小于且线程超过空闲时间就减少

    线程数量到核心线程数。

    3、说说线程池的拒绝策略

    第一种:直接抛弃新任务

    第二种:直接抛弃新任务,抛出异常

    第三种:抛弃最老的任务,添加新任务

    第四种:直接使用当前线程执行任务

    4、线程池有些什么核心属性?

    核心线程数、最大线程数、线程空闲时间、阻塞队列、拒绝策略等

    5、四种JDK提供的线程池是什么样的?

    第一种:单个线程的线程池(newSingleThreadExecutor),单例模式,线程出现异常了重新创建线程

    第二种:固定线程数的线程池(newFixedThreadPool)

    第三种:可定时的线程池(newScheduledThreadPool),可以执行延迟任务,可以执行有返回值的任务

    6、JDK提供的这几个线程池有什么问题?

    固定线程数的线程池和单个线程的线程池会可能会堆积大量任务,造成OOM

    缓存的线程池和可定时的线程池可能的创建大量线程,造成OOM

    第四种:缓存的线程池(newCachedThreadPool)

    7、线程池大小如何设计?

    这个要看任务类型是CPU密集型的还是IO密集型的了,如果是CPU密集型就设置为CPU核数+1个线程,如果

    是IO密集型,因为IO操作不占用CPU,所以可以设置为两倍的CPU+1个线程数量。

    8、线程池底层是怎么实现的知道么?用的什么数据结构?

    9、如果让你写一个线程池,你会怎么写,有哪些模块?

    10、你线程池里怎么新建线程

    11、如何手写一个简单的线程池?

    (首先创建一个线程池类,里面有一些核心属性,比如核心线程数

    最大线程数、任务阻塞队列、线程集合、线程空闲时间)

    第一步:预热,通过构造函数创建核心线程并启动,还有就是创建阻塞队列 LinkedBolockingQueue

    第二步:添加任务,创建一个execute方法,使用offer把任务添加到阻塞队列中

    第三部:消耗任务,创建一个工作线程内部类,通过while循环不断调用poll方法

    去获取任务,调用runnable的run方法执行任务。

    12、为什么要使用线程池?

    如果频繁的创建和销毁线程,对CPU消耗是很大的,需要考虑使用线程复用

    13、使用线程池有什么优点?

    复用线程,减少创建和销毁线程的CPU消耗还有就是统一管理线程

    展开全文
  • 前言和设计模式一样,打算花三个月的时间,结合《Java并发编程实战》一书,来总结下并发方面的知识。第一章从线程池的原理开始总结,...另外,每有一个任务分配给线程池,我们就从线程池中分配一个线程处理它。但...

    前言

    和设计模式一样,打算花三个月的时间,结合《Java并发编程实战》一书,来总结下并发方面的知识。第一章从线程池的原理开始总结,希望自己能坚持下来,加油!

    1. 如何实现一个线程池?

    线程池的概念这里不多说,在讲它的原理前,我们先自己想一下,如果我来写,那如何实现一个线程池?

    1.1 线程池的重要变量

    首先要定义一个存放所有线程的集合;

    另外,每有一个任务分配给线程池,我们就从线程池中分配一个线程处理它。但当线程池中的线程都在运行状态,没有空闲线程时,我们还需要一个队列来存储提交给线程池的任务。

    /**存放线程的集合*/

    private ArrayList threads;

    /**任务队列*/

    private ArrayBlockingQueue taskQueue;

    初始化一个线程池时,要指定这个线程池的大小;另外,我们还需要一个变量来保存已经运行的线程数目。

    /**线程池初始限定大小*/

    private int threadNum;

    /**已经工作的线程数目*/

    private int workThreadNum;

    1.2 线程池的核心方法

    线程池执行任务

    接下来就是线程池的核心方法,每当向线程池提交一个任务时。如果 已经运行的线程

    public void execute(Runnable runnable) {

    try {

    mainLock.lock();

    //线程池未满,每加入一个任务则开启一个线程

    if(workThreadNum < threadNum) {

    MyThead myThead = new MyThead(runnable);

    myThead.start();

    threads.add(myThead);

    workThreadNum++;

    }

    //线程池已满,放入任务队列,等待有空闲线程时执行

    else {

    //队列已满,无法添加时,拒绝任务

    if(!taskQueue.offer(runnable)) {

    rejectTask();

    }

    }

    } finally {

    mainLock.unlock();

    }

    }

    到这里,一个线程池已经实现的差不多了,我们还有最后一个难点要解决:从任务队列中取出任务,分配给线程池中“空闲”的线程完成。

    分配任务给线程的第一种思路

    很容易想到一种解决思路:额外开启一个线程,时刻监控线程池的线程空余情况,一旦有线程空余,则马上从任务队列取出任务,交付给空余线程完成。

    这种思路理解起来很容易,但仔细思考,实现起来很麻烦(1. 如何检测到线程池中的空闲线程 2. 如何将任务交付给一个.start()运行状态中的空闲线程)。而且使线程池的架构变的更复杂和不优雅。

    分配任务给线程的第二种思路

    现在我们来讲第二种解决思路:线程池中的所有线程一直都是运行状态的,线程的空闲只是代表此刻它没有在执行任务而已;我们可以让运行中的线程,一旦没有执行任务时,就自己从队列中取任务来执行。

    为了达到这种效果,我们要重写run方法,所以要写一个自定义Thread类,然后让线程池都放这个自定义线程类

    class MyThead extends Thread{

    private Runnable task;

    public MyThead(Runnable runnable) {

    this.task = runnable;

    }

    @Override

    public void run() {

    //该线程一直启动着,不断从任务队列取出任务执行

    while (true) {

    //如果初始化任务不为空,则执行初始化任务

    if(task != null) {

    task.run();

    task = null;

    }

    //否则去任务队列取任务并执行

    else {

    Runnable queueTask = taskQueue.poll();

    if(queueTask != null)

    queueTask.run();

    }

    }

    }

    }

    1.3 最后生成的简单线程池

    /**

    * 自定义简单线程池

    */

    public class MyThreadPool{

    /**存放线程的集合*/

    private ArrayList threads;

    /**任务队列*/

    private ArrayBlockingQueue taskQueue;

    /**线程池初始限定大小*/

    private int threadNum;

    /**已经工作的线程数目*/

    private int workThreadNum;

    private final ReentrantLock mainLock = new ReentrantLock();

    public MyThreadPool(int initPoolNum) {

    threadNum = initPoolNum;

    threads = new ArrayList<>(initPoolNum);

    //任务队列初始化为线程池线程数的四倍

    taskQueue = new ArrayBlockingQueue<>(initPoolNum*4);

    threadNum = initPoolNum;

    workThreadNum = 0;

    }

    public void execute(Runnable runnable) {

    try {

    mainLock.lock();

    //线程池未满,每加入一个任务则开启一个线程

    if(workThreadNum < threadNum) {

    MyThead myThead = new MyThead(runnable);

    myThead.start();

    threads.add(myThead);

    workThreadNum++;

    }

    //线程池已满,放入任务队列,等待有空闲线程时执行

    else {

    //队列已满,无法添加时,拒绝任务

    if(!taskQueue.offer(runnable)) {

    rejectTask();

    }

    }

    } finally {

    mainLock.unlock();

    }

    }

    private void rejectTask() {

    System.out.println("任务队列已满,无法继续添加,请扩大您的初始化线程池!");

    }

    public static void main(String[] args) {

    MyThreadPool myThreadPool = new MyThreadPool(5);

    Runnable task = new Runnable() {

    @Override

    public void run() {

    System.out.println(Thread.currentThread().getName()+"执行中");

    }

    };

    for (int i = 0; i < 20; i++) {

    myThreadPool.execute(task);

    }

    }

    class MyThead extends Thread{

    private Runnable task;

    public MyThead(Runnable runnable) {

    this.task = runnable;

    }

    @Override

    public void run() {

    //该线程一直启动着,不断从任务队列取出任务执行

    while (true) {

    //如果初始化任务不为空,则执行初始化任务

    if(task != null) {

    task.run();

    task = null;

    }

    //否则去任务队列取任务并执行

    else {

    Runnable queueTask = taskQueue.poll();

    if(queueTask != null)

    queueTask.run();

    }

    }

    }

    }

    }

    现在我们来总结一下,这个自定义线程池的整个工作过程:

    初始化线程池,指定线程池的大小。

    向线程池中放入任务执行。

    如果线程池中创建的线程数目未到指定大小,则创建我们自定义的线程类放入线程池集合,并执行任务。执行完了后该线程会一直监听队列

    如果线程池中创建的线程数目已满,则将任务放入缓冲任务队列

    线程池中所有创建的线程,都会一直从缓存任务队列中取任务,取到任务马上执行

    2. Java中的线程池实现

    自己实现一个线程池后,接下来我们来结合源码,看下Java中线程池是怎么实现的。

    2.1 线程池框架

    9b7dfb407f72

    Executor:

    最顶部接口,提供了execute()方法将任务提交和任务执行分离。当你把一个Runnable任务提交给Executor后,如何执行任务要看它的实现类。

    ExecutorService:

    继承Executor,增加了对线程池中任务生命周期的管理,可强制取消正在执行的任务,拒绝再接受任务。提供了submit()方法来扩展Executor.execute(),使任务执行有返回值。

    AbstractExecutorService:

    ExecutorService接口的默认实现,线程池的大部分功能已在这个类中被编写。

    ThreadPoolExecutor:

    线程池最核心的一个类,继承了AbstractExecutorService,完整的实现了一个线程池。

    ScheduledExecutorService:

    继承ExecutorService接口,在其基础上增加了定时执行任务的功能

    ScheduledThreadPoolExecutor:

    拥有定时调度任务功能的线程池,实现了ScheduledExecutorService接口,以及继承了ThreadPoolExecutor类

    2.2 ThreadPoolExecutor源码分析

    通过类图框架,我们已经知道,Java线程池最关键的一个类就是ThreadPoolExecutor。

    那这个类是如何实现线程池的呢?其实大致原理跟第一节我们手动写的简单线程池差不多,下面我们结合源码来完整的走一遍。

    构造方法

    首先看一下它的构造方法:

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue) {

    //构造方法

    ……

    }

    corePoolSize:即线程池大小,线程池中默认线程数目。

    maximumPoolSize:线程池中最大线程数目。注意,当线程池运行的任务过多时,允许线程池额外创建线程。此额外线程在任务变少后会被线程池关闭。

    keepAliveTime:线程空闲时间超过该值时,会被关闭。默认只有当线程数目>corePoolSize时才会生效。也可配置线程数目

    unit:keepAliveTime的时间单位

    workQueue:任务缓存队列的实现

    重要变量

    线程池的重要变量主要两个:runState(线程池运行状态:运行还是关闭)和workerCount(线程池中的线程数目)

    在JDK1.8中,这两个变量被放入了一个线程安全的int类型变量中

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    这里可以好好讲一下,貌似网上查到的线程池源码资料也没有好好分析过为什么要把两个变量合并成一个变量。

    首先,我们来看看它是如何将两个变量存在一起的

    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static int runStateOf(int c) { return c & ~CAPACITY; }

    private static int workerCountOf(int c) { return c & CAPACITY; }

    这段代码声明很神奇,我还是第一次知道int类型的变量还可以进行构造器式的声明。ctlOf就是合并变量ctl的值。它通过“或运算”,将runState和workerCount合并成了一个数字。

    为什么“或运算“可以将两个数字合并成一个呢?实际上,ctlOf是一个32位的数字,前三位存的是runState, 后27位存的是workerCount。类似1110000……(27个0)与00000000……001进行或运算,两个值就存在了同一个int变量中。取出来也只要通过位运算获取前3位和后27位数字即可(也就是runStateOf(int c)和workerCountOf(int c))。源码里面也有部分提示:

    // runState is stored in the high-order bits(runState存在高位中)

    private static final int RUNNING = -1 << COUNT_BITS;

    好,了解了两个变量合并成一个的实现,我们再来想想,为什么要这么麻烦的进行变量合并?我的思考答案是:为了不加锁实现线程安全。

    我们可以声明两个线程安全的int类型来分别存储runState和workerCount。对它们单独加减时确实是线程安全的,但如果一段代码有两个变量的共同参与呢?举个简单例子:

    //当运行状态是运行中时,增加线程数

    if(runState == RUNNING) {

    workCount.incrementAndGet();

    }

    这段代码显然是线程不安全的,假设线程A判断成功后进入方法块,挂起;这时线程B修改了runState的状态为SHUTDOWN。当线程A重新恢复执行时,虽然runState不是运行状态,但仍会对workCount进行增加操作。

    通过将两个变量合并成了一个线程安全变量,即完美的解决了这个问题,再有两个变量一起的代码时,也不需要加锁就实现了线程安全

    核心方法

    最后看一下重头戏,ThreadPoolExecutor的核心方法execute()。这个方法其实就是实现了Executor接口中的execute方法,执行你丢入线程池的任务。

    public void execute(Runnable command) {

    if (command == null)

    throw new NullPointerException();

    /*

    * Proceed in 3 steps:

    *

    * 1. If fewer than corePoolSize threads are running, try to

    * start a new thread with the given command as its first

    * task. The call to addWorker atomically checks runState and

    * workerCount, and so prevents false alarms that would add

    * threads when it shouldn't, by returning false.

    *

    * 2. If a task can be successfully queued, then we still need

    * to double-check whether we should have added a thread

    * (because existing ones died since last checking) or that

    * the pool shut down since entry into this method. So we

    * recheck state and if necessary roll back the enqueuing if

    * stopped, or start a new thread if there are none.

    *

    * 3. If we cannot queue task, then we try to add a new

    * thread. If it fails, we know we are shut down or saturated

    * and so reject the task.

    */

    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) {

    if (addWorker(command, true))

    return;

    c = ctl.get();

    }

    if (isRunning(c) && workQueue.offer(command)) {

    int recheck = ctl.get();

    if (! isRunning(recheck) && remove(command))

    reject(command);

    else if (workerCountOf(recheck) == 0)

    addWorker(null, false);

    }

    else if (!addWorker(command, false))

    reject(command);

    }

    其实源码注释已经写的很清楚,当你将一个任务Runnable提交给线程池执行时,主要分三步:

    如果线程池中的线程数(workCount)< 线程池大小(corePoolSize),则创建一个线程,执行这个任务,并把这个线程放入线程池。添加任务时会对线程池状态进行检查,以防止线程池状态为关闭时还添加线程。

    如果线程池中的线程数(workCount)>= 线程池大小(corePoolSize),或者上一步添加任务最后失败,将任务放入缓存队列中。当任务成功加入缓存队列,仍需要对线程池状态进行二次检查,防止线程池状态改为关闭或线程池中已经没有可以运行的线程。

    如果上一步将任务放入缓存队列失败,试着去增加一个新的线程来执行它(超过线程池大小的额外线程)。如果添加新线程失败(可能是线程数已到达maximumPoolSize),则抛出异常拒绝执行该任务。

    接下来,我们跟着这三个步骤来看看源码。

    当线程池中的线程数(workCount)< 线程池大小(corePoolSize),会执行addWorker()方法来增加一个线程,执行任务并放入线程池。

    private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

    for (;;) {

    int c = ctl.get();

    int rs = runStateOf(c);

    // Check if queue empty only if necessary.

    if (rs >= SHUTDOWN &&

    ! (rs == SHUTDOWN &&

    firstTask == null &&

    ! workQueue.isEmpty()))

    return false;

    for (;;) {

    int wc = workerCountOf(c);

    if (wc >= CAPACITY ||

    wc >= (core ? corePoolSize : maximumPoolSize))

    return false;

    if (compareAndIncrementWorkerCount(c))

    break retry;

    c = ctl.get(); // Re-read ctl

    if (runStateOf(c) != rs)

    continue retry;

    // else CAS failed due to workerCount change; retry inner loop

    }

    }

    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;

    try {

    w = new Worker(firstTask);

    final Thread t = w.thread;

    if (t != null) {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

    // Recheck while holding lock.

    // Back out on ThreadFactory failure or if

    // shut down before lock acquired.

    int rs = runStateOf(ctl.get());

    if (rs < SHUTDOWN ||

    (rs == SHUTDOWN && firstTask == null)) {

    if (t.isAlive()) // precheck that t is startable

    throw new IllegalThreadStateException();

    workers.add(w);

    int s = workers.size();

    if (s > largestPoolSize)

    largestPoolSize = s;

    workerAdded = true;

    }

    } finally {

    mainLock.unlock();

    }

    if (workerAdded) {

    t.start();

    workerStarted = true;

    }

    }

    } finally {

    if (! workerStarted)

    addWorkerFailed(w);

    }

    return workerStarted;

    }

    这个方法分为了两个部分。

    第一部分,也就是for循环里面的代码,主要是检查线程池运行状态(关闭或其他状态则添加失败),以及线程数目是否超过了指定值(超过最大线程数则添加失败)。

    为了保持线程安全,在检查的最后都会重新获取一遍runState和workCount,与for循环最开始取到的两个值进行对比,如果不一样,则证明别的线程修改了它们,重新循环进行校验。

    第二部分,for循环校验完毕后,证明此时是可以添加线程的,然后我们可以发现代码中创建了一个新对象:w = new Worker(firstTask)。这个Worker是什么呢?

    我们看下Workder的代码会发现,它其实就是前面我们写的自定义线程池里面的自定义线程类。

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

    //构造方法

    Worker(Runnable firstTask) {

    setState(-1); // inhibit interrupts until runWorker

    this.firstTask = firstTask;

    this.thread = getThreadFactory().newThread(this);

    }

    ……

    }

    在Worker的构造方法中,它会保存传进来的任务firstTask,并且创建一个包裹自身的线程thread,然后暴露给addWorker()方法。

    w = new Worker(firstTask);

    final Thread t = w.thread;

    如上面这段addWorker()中的代码,其实等同于:

    w = new Worker(firstTask);

    final Thread t = new Thread(w);

    addWorker()中获取到线程t后,会将t放入线程池,以及启动它:

    //添加进线程池

    workers.add(w);

    ……

    //添加成功后,启动线程

    if (workerAdded) {

    t.start();

    workerStarted = true;

    }

    启动线程t其实最后执行的是Worker中的run()方法,而run()方法又执行的runWorker(w)方法:

    public void run() {

    runWorker(this);

    }

    我们观察runWorker(w)方法可以发现,和第一节提到的自定义线程一样,实际这个方法就是执行提交给它的第一个任务后,继续持续的去缓存队列中取任务执行。

    final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask = null;

    w.unlock(); // allow interrupts

    boolean completedAbruptly = true;

    try {

    while (task != null || (task = getTask()) != null) {

    w.lock();

    // If pool is stopping, ensure thread is interrupted;

    // if not, ensure thread is not interrupted. This

    // requires a recheck in second case to deal with

    // shutdownNow race while clearing interrupt

    if ((runStateAtLeast(ctl.get(), STOP) ||

    (Thread.interrupted() &&

    runStateAtLeast(ctl.get(), STOP))) &&

    !wt.isInterrupted())

    wt.interrupt();

    try {

    beforeExecute(wt, task);

    Throwable thrown = null;

    try {

    task.run();

    } catch (RuntimeException x) {

    thrown = x; throw x;

    } catch (Error x) {

    thrown = x; throw x;

    } catch (Throwable x) {

    thrown = x; throw new Error(x);

    } finally {

    afterExecute(task, thrown);

    }

    } finally {

    task = null;

    w.completedTasks++;

    w.unlock();

    }

    }

    completedAbruptly = false;

    } finally {

    processWorkerExit(w, completedAbruptly);

    }

    }

    参照第一节,相信你很容易理解runWorker()方法以及Worker类的本质。

    到这里,线程池源码的分析也就差不多了,有了上面的基础,execute()的剩下两步也很简单,这里不多讲了。

    3. 线程池的使用

    通过第二节,我们已经知道,通过ThreadPoolExecutor类,可以构造出各种需求的线程池。

    在实际应用中,我们不采取每次自己根据需求分析new一个ThreadPoolExecutor的做法,有一个类已经帮我们做好了一切:Executors

    想要创建什么样性质的线程池,直接调用Executors中的静态方法就行了

    Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE

    Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池

    Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池

    这里对Executors不多讲解了,想对Executors有进一步了解可看:

    Java并发编程(19):并发新特性—Executor框架与线程池(含代码)

    本文部分内容参考以下文章:

    深入理解Java之线程池

    展开全文
  • 线程池核心设计与实现、线程池使用了什么设计模式、要你设计的话,如何实现一个线程池
  • 前言和设计模式一样,打算花三个月的时间,结合《Java并发编程实战》一书,来总结下并发方面的知识。第一章从线程池的原理开始总结,...另外,每有一个任务分配给线程池,我们就从线程池中分配一个线程处理它。但...
  • 一个设计方案一定是从最小的架子慢慢搭建起来的,那么想一下,最基础的线程池应该是什么样子的? 【1】需要有线程源。比较线程池说白了就是一个存放线程的容器,自身不生产线程。(当然了,如果暂时不接入线程工厂的...
  • 问题(1)自己动手写一个线程池需要考虑哪些因素?(2)自己动手写的线程池如何测试?简介线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文彤哥将手把手带你写一个可用的线程池。属性分析...
  • 一、使用线程池在流量突发期间能够平滑地服务降级很多场景下应用程序必须能够处理一系列传入请求,简单的处理方式是通过一个线程顺序的处理这些请求,如下图:单线程策略的优势和劣势都非常明显:优势:...
  • 我个人觉得如果要设计一个线程池的话得考虑池内工作线程的管理、任务编排执行、线程池超负荷处理方案、监控等方面。 要将初始化线程数、核心线程数、最大线程池都暴露出来可配置,包括超过核心线程数的线程空闲消亡...
  • 线程池就是这样一个概念,当需要创建线程的时候直接从线程池中获取,当关闭线程的时候直接归还线程给线程池。ThreadPoolExecutor就是JDK提供的这样一个类。它继承AbstructExecutorService类,...
  • 目前业界线程池设计,普遍采用的都是生产者 - 消费者模式。 import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent....
  • 前言我相信大家都看过很多的关于线程池的文章...本文大重点是源码解析,不过线程池设计思想以及作者实现过程中的一些巧妙用法是我想传达给读者的。本文还是会行行关键代码进行分析,目的是为了让那些自己看源码...
  • 数据结构设计任务设计[cpp] view plaincopytypedef struct tp_work_desc_s TpWorkDesc;typedef void (*process_job)(TpWorkDesc*job);struct tp_work_desc_s {void *ret; //call in, that is argumentsvoid *arg; //...
  • 线程池设计

    2021-08-19 14:43:50
    IO模式: 阻塞 非阻塞忙轮询 响应式-- 多路IO转接 多路IO转接: select、poll、epoll //错了?...在一个应用程序中,会多次使用线程,因此免不了就需要多次线程的创建及销毁。 但是多次线程创建和销毁会
  • 点击上方“中间件兴趣圈”,选择“设为星标”做积极的人,越努力越幸运!见字如面,我是威哥,一个从普通二本院校毕业,从未曾接触分布式、微服务、高并发到通过技术分享实现职场蜕变,成长为Rocke...
  • public static void main(String[] args) { //定义两个集合,一个是存放客户端请求的,利用Vector, //一个是存储线程的,就是线程池中的线程数目 //Vector是线程安全的,它实现了Collection和List //Vector 类可以...
  • 手撸一个线程池

    2021-03-05 16:45:10
    常规开头 多线程编程是在开发过程中非常基础且非常重要的一个环节...这次准备手撸一个简单版的线程池加强一下对执行流程的理解。 简单的过程 废话不多说,直接上代码 /** * @Author: ZRH * @Date: 2021/3/5 15:13
  • 线程是Java的一大特性,它可以是给定的指令序列、给定的方法中定义的变量或者一些共享数据(类一级的...一个线程不能访问另外一个线程的堆栈变量,而且这个线程必须处于如下状态之一:1.排队状态(Ready),在用户创建...
  • 手写一个 JAVA 线程池

    2021-02-26 18:39:31
    常见的比如常量池、连接池、线程池,今天我们手撸一个线程池。抛开语言特性,线程池无非是维护一堆线程阻塞等待任务的到来,并由主线程对任务线程的数量进行动态控制的组件。做到线程资源的复用及统一管理,同时避免...
  • 也就是该线程池有两问题,其是当线程池中线程数量未达到设计上限的时候,每次新增任务都会创建新的线程池,不会利用旧的,即使旧的线程已经空闲。其二是线程池中线程数量只能增加不能减少,即使线程已经空闲也...
  • 线程池代码importjava.util.List;importjava.util.Vector;publicclassThreadPool{privatestaticThreadPoolinstance_=null;//定义优先级别常数,空闲的线程按照优先级不同分别存放在三vector中...
  • 你好呀,我是乔哥。前几天,有个朋友在微信上找我。他问:乔哥,在吗?我说:发生肾么事了?他啪的一下就提了一个问题啊,很快。我大意了,随意瞅了一眼,这题不是很简单吗?结果没想到里面还隐藏着一篇...
  • Java线程池 - ()自定义线程池 什么是线程池线程池就是种多线程处理形式,处理过程中可以将任务添加到队列中,然后在创建线程后自动启动这些任务 为什么要使用线程池? 可以根据系统的需求和硬件环境灵活的...
  • 线程是一个重资源,JVM中的线程与操作系统的线程是一对一的关系,所以在JVM 中每创建一个线程就需要调用操作系统提供的 API 创建线程,赋予资源,并且销毁线程同样也需要系统调用。 而系统调用就意味着上下文切换等...
  • 写在前面的话写下这篇文章只为了回顾之前在实际工作中犯的一个极其二逼的错误,用我的经历来提示后来者,诸位程序大神,大牛,小牛们看到此文笑笑即可,轻拍轻拍。。。1 背景有这么一个需求,我们的系统(后面简称:A...
  • c++实现线程池

    2021-05-22 16:02:28
    线程池一个典型的生产者-消费者模型,用户将要执行的任务添加到任务队列,并通知线程池线程池则唤醒其中的一个线程来处理任务。一个简单的c++线程池实现如下所示: #include <iostream> #include <...
  • 本文大重点是源码解析,同时会有少量篇幅介绍线程池设计思想以及作者 Doug Lea 实现过程中的一些巧妙用法。本文还是会行行关键代码进行分析,目的是为了让那些自己看源码不是很理解的同学可以得到参考。线程池是...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 150,431
精华内容 60,172
关键字:

如何设计一个线程池