精华内容
下载资源
问答
  • 线程池实现原理
    万次阅读 多人点赞
    2018-03-16 21:57:32

    原理概述

    在这里插入图片描述

    其实java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取出来继续执行。

    线程池的几个主要参数的作用

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    
    1. corePoolSize: 规定线程池有几个线程(worker)在运行。
    2. maximumPoolSize: 当workQueue满了,不能添加任务的时候,这个参数才会生效。规定线程池最多只能有多少个线程(worker)在执行。
    3. keepAliveTime: 超出corePoolSize大小的那些线程的生存时间,这些线程如果长时间没有执行任务并且超过了keepAliveTime设定的时间,就会消亡。
    4. unit: 生存时间对于的单位
    5. workQueue: 存放任务的队列
    6. threadFactory: 创建线程的工厂
    7. handler: 当workQueue已经满了,并且线程池线程数已经达到maximumPoolSize,将执行拒绝策略。

    任务提交后的流程分析

    用户通过submit提交一个任务。线程池会执行如下流程:

    1. 判断当前运行的worker数量是否超过corePoolSize,如果不超过corePoolSize。就创建一个worker直接执行该任务。—— 线程池最开始是没有worker在运行的
    2. 如果正在运行的worker数量超过或者等于corePoolSize,那么就将该任务加入到workQueue队列中去。
    3. 如果workQueue队列满了,也就是offer方法返回false的话,就检查当前运行的worker数量是否小于maximumPoolSize,如果小于就创建一个worker直接执行该任务。
    4. 如果当前运行的worker数量是否大于等于maximumPoolSize,那么就执行RejectedExecutionHandler来拒绝这个任务的提交。

    源码解析

    我们先来看一下ThreadPoolExecutor中的几个关键属性。

    //这个属性是用来存放 当前运行的worker数量以及线程池状态的
    //int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //存放任务的阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    //worker的集合,用set来存放
    private final HashSet<Worker> workers = new HashSet<Worker>();
    //历史达到的worker数最大值
    private int largestPoolSize;
    //当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
    private volatile RejectedExecutionHandler handler;
    //超出coreSize的worker的生存时间
    private volatile long keepAliveTime;
    //常驻worker的数量
    private volatile int corePoolSize;
    //最大worker的数量,一般当workQueue满了才会用到这个参数
    private volatile int maximumPoolSize;
    

    1. 提交任务相关源码

    下面是execute方法的源码

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            //workerCountOf(c)会获取当前正在运行的worker数量
            if (workerCountOf(c) < corePoolSize) {
                //如果workerCount小于corePoolSize,就创建一个worker然后直接执行该任务
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //isRunning(c)是判断线程池是否在运行中,如果线程池被关闭了就不会再接受任务
            //后面将任务加入到队列中
            if (isRunning(c) && workQueue.offer(command)) {
                //如果添加到队列成功了,会再检查一次线程池的状态
                int recheck = ctl.get();
                //如果线程池关闭了,就将刚才添加的任务从队列中移除
                if (! isRunning(recheck) && remove(command))
                    //执行拒绝策略
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //如果加入队列失败,就尝试直接创建worker来执行任务
            else if (!addWorker(command, false))
                //如果创建worker失败,就执行拒绝策略
                reject(command);
    }
    

    添加worker的方法addWorker源码

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            //使用自旋+cas失败重试来保证线程竞争问题
            for (;;) {
                //先获取线程池的状态
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 如果线程池是关闭的,或者workQueue队列非空,就直接返回false,不做任何处理
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    //根据入参core 来判断可以创建的worker数量是否达到上限,如果达到上限了就拒绝创建worker
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //没有的话就尝试修改ctl添加workerCount的值。这里用了cas操作,如果失败了下一个循环会继续重试,直到设置成功
                    if (compareAndIncrementWorkerCount(c))
                        //如果设置成功了就跳出外层的那个for循环
                        break retry;
                    //重读一次ctl,判断如果线程池的状态改变了,会再重新循环一次
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                final ReentrantLock mainLock = this.mainLock;
                //创建一个worker,将提交上来的任务直接交给worker
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    //加锁,防止竞争
                    mainLock.lock();
                    try {
                        int c = ctl.get();
                        int rs = runStateOf(c);
                        //还是判断线程池的状态
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            //如果worker的线程已经启动了,会抛出异常
                            if (t.isAlive()) 
                                  throw new IllegalThreadStateException();
                            //添加新建的worker到线程池中
                            workers.add(w);
                            int s = workers.size();
                            //更新历史worker数量的最大值
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            //设置新增标志位
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    //如果worker是新增的,就启动该线程
                    if (workerAdded) {
                        t.start();
                         //成功启动了线程,设置对应的标志位
                        workerStarted = true;
                    }
                }
            } finally {
                //如果启动失败了,会触发执行相应的方法
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
    }
    

    2. Worker的结构

    Worker是ThreadPoolExecutor内部定义的一个内部类。我们先看一下Worker的继承关系

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable
    

    它实现了Runnable接口,所以可以拿来当线程用。同时它还继承了AbstractQueuedSynchronizer同步器类,主要用来实现一个不可重入的锁。

    一些属性还有构造方法:

    //运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker
    final Thread thread;
    //当一个worker刚创建的时候,就先尝试执行这个任务
    Runnable firstTask;
    //记录完成任务的数量
    volatile long completedTasks;
    Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                //创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行worker的run方法
                this.thread = getThreadFactory().newThread(this);
    }
    

    worker的run方法

    public void run() {
                //这里调用了ThreadPoolExecutor的runWorker方法
                runWorker(this);
    }
    

    ThreadPoolExecutor的runWorker方法

    final void runWorker(Worker w) {
            //获取当前线程
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            //执行unlock方法,允许其他线程来中断自己
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                //如果前面的firstTask有值,就直接执行这个任务
                //如果没有具体的任务,就执行getTask()方法从队列中获取任务
                //这里会不断执行循环体,除非线程中断或者getTask()返回null才会跳出这个循环
                while (task != null || (task = getTask()) != null) {
                    //执行任务前先锁住,这里主要的作用就是给shutdown方法判断worker是否在执行中的
                    //shutdown方法里面会尝试给这个线程加锁,如果这个线程在执行,就不会中断它
                    w.lock();
                   //判断线程池状态,如果线程池被强制关闭了,就马上退出
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        //执行任务前调用。预留的方法,可扩展
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //真正的执行任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                           //执行任务后调用。预留的方法,可扩展
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        //记录完成的任务数量
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
    }
    

    下面来看一下getTask()方法,这里面涉及到keepAliveTime的使用,从这个方法我们可以看出先吃池是怎么让超过corePoolSize的那部分worker销毁的。

    private Runnable getTask() {
            boolean timedOut = false; 
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 如果线程池已经关闭了,就直接返回null,
                //如果这里返回null,调用的那个worker就会跳出while循环,然后执行完销毁线程
                //SHUTDOWN状态表示执行了shutdown()方法
                //STOP表示执行了shutdownNow()方法
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
                //获取当前正在运行中的worker数量
                int wc = workerCountOf(c);
    
                // 如果设置了核心worker也会超时或者当前正在运行的worker数量超过了corePoolSize,就要根据时间判断是否要销毁线程了
                //其实就是从队列获取任务的时候要不要设置超时间时间,如果超过这个时间队列还没有任务进来,就会返回null
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                
                //如果上一次循环从队列获取到的未null,这时候timedOut就会为true了
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    //通过cas来设置WorkerCount,如果多个线程竞争,只有一个可以设置成功
                    //最后如果没设置成功,就进入下一次循环,说不定下一次worker的数量就没有超过corePoolSize了,也就不用销毁worker了
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    //如果要设置超时时间,就设置一下咯
                    //过了这个keepAliveTime时间还没有任务进队列就会返回null,那worker就会销毁
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    //如果r为null,就设置timedOut为true
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
    }
    

    3. 添加Callable任务的实现源码

    public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
    }
    

    要添加一个有返回值的任务的实现也很简单。其实就是对任务做了一层封装,将其封装成Future,然后提交给线程池执行,最后返回这个future。
    这里的 newTaskFor(task) 方法会将其封装成一个FutureTask类。
    外部的线程拿到这个future,执行get()方法的时候,如果任务本身没有执行完,执行线程就会被阻塞,直到任务执行完。
    下面是FutureTask的get方法

    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            //判断状态,如果任务还没执行完,就进入休眠,等待唤醒
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            //返回值
            return report(s);
    }
    
    

    FutureTask中通过一个state状态来判断任务是否完成。当run方法执行完后,会将state状态置为完成,同时唤醒所有正在等待的线程。我们可以看一下FutureTask的run方法

    public void run() {
            //判断线程的状态
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        //执行call方法
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        //这个方法里面会设置返回内容,并且唤醒所以等待中的线程
                        set(result);
                }
            } finally {
                runner = null;
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
    }
    

    4. shutdown和shutdownNow方法的实现

    shutdown方法会将线程池的状态设置为SHUTDOWN,线程池进入这个状态后,就拒绝再接受任务,然后会将剩余的任务全部执行完

    public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //检查是否可以关闭线程
                checkShutdownAccess();
                //设置线程池状态
                advanceRunState(SHUTDOWN);
                //尝试中断worker
                interruptIdleWorkers();
                 //预留方法,留给子类实现
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
    }
    
    private void interruptIdleWorkers() {
            interruptIdleWorkers(false);
    }
    
    private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //遍历所有的worker
                for (Worker w : workers) {
                    Thread t = w.thread;
                    //先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它
                    //注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能
                    //它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
    }
    

    shutdownNow做的比较绝,它先将线程池状态设置为STOP,然后拒绝所有提交的任务。最后中断左右正在运行中的worker,然后清空任务队列。

    public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                //检测权限
                advanceRunState(STOP);
                //中断所有的worker
                interruptWorkers();
                //清空任务队列
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
    }
    
    private void interruptWorkers() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //遍历所有worker,然后调用中断方法
                for (Worker w : workers)
                    w.interruptIfStarted();
            } finally {
                mainLock.unlock();
            }
    }
    

    总结

    java线程池的实现原理还是挺简单的。但是有一些细节还是需要去看源码才能得出答案。本文也没办法把所有的源码都讲解一遍,只列了比较重要的一些源码。有兴趣的同学可以自己打开源码好好看一下,肯定会对实现原理了解的更加深刻。

    最后,如果本文有哪里说的不对或者遗漏的地方,也烦请指出,感激不尽。

    更多相关内容
  • 线程池实现原理

    2018-11-16 14:20:42
    瞎写的线程池原理,没啥用,只是留着自己以后想看看,在技术博客中都有.不要下载,乱七八糟的,怕你们吐槽.
  • Java线程池实现原理

    2022-03-17 13:36:52
    参考:Java线程池实现原理及其在美团业务中的实践 - 美团技术团队 (meituan.com) 一、线程池是什么 线程池是一种对线程进行池化管理的思想和工具,广泛应用于多线程服务器中 线程的创建和销毁都会带来很多额外开销...

    参考:Java线程池实现原理及其在美团业务中的实践 - 美团技术团队 (meituan.com)

    一、线程池是什么

    线程池是一种对线程进行池化管理的思想和工具,广泛应用于多线程服务器中

    线程的创建和销毁都会带来很多额外开销降低了服务器性能,线程池可以维护多个线程来执行任务,并且可以通过任务队列来缓存任务提高了并发能力。

    线程池优点

    • 降低资源消耗:池化技术可以减少线程的创建和销毁带来的性能损耗
    • 提高响应速度:任务到达后可以无需等待线程创建直接执行
    • 提高线程的可管理性:使用线程池可以对线程、任务进行监控和管理
    • 提高更多更强大的功能:线程池具备可扩展性

    二、线程池核心设计和实现

    2.1 总体设计

    Java中的线程池核心实现类是ThreadPollExecutor,下图是ThreadPollExecutor的UML类图

    图1 ThreadPoolExecutor UML类图

    顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注线程的创建、销毁、调度,用户只需要提供实现了Runnable接口的任务对象,由Executor框架完成线程的创建、任务的执行等。

    ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。

    AbstractExecutorService则是上层的抽象类,将执行任务的流程串联起来,保证下层的实现只需要关注单个的方法实现。

    ThreadPollExecutor底层实现类

    线程池的运行状态

    线程池的运行状态由五种,分别为:RUNNING(运行)、SHUTDOWN(关闭,不接受新任务但会执行任务队列中的任务)、STOP(不接收新任务也不执行队列中的任务)、TIDYING(所有任务终止,有效线程数为0)、TERMINATED(终止状态)

    2.2 执行机制

    2.2.1 任务调度

    图4 任务调度流程

    这里涉及到几个线程池的重要参数:核心线程数最大线程数保持时间任务队列拒绝策略

    • 核心线程数:核心线程数是指线程池长期保留的线程数量

    • 最大线程数:线程池最大可以创建的线程个数,最大线程数大于等于核心线程数,线程池会在任务执行结束后销毁掉超出核心线程个数的线程。这部分线程的创建是作为任务队列无法容纳任务时的临时缓冲,换句话说这部分线程只会在核心线程都在执行任务并且任务队列无法容纳多余任务时才会创建出来。

    • 保留时间:就是指超出核心线程数的线程执行完任务后可以存活的时间

    • 任务队列:线程池的核心就是将线程和任务分离,通过线程不断从任务队列中获取任务来实现。任务队列就是用来存放任务的队列。任务队列也叫阻塞队列,因为当线程在获取任务时如果没有任务该线程会被阻塞直到获取到任务,当向队列中提交任务时如果队列已满提交任务的线程也会被阻塞直到可以提交。

      常用的阻塞队列有如下几种

      • ArrayBlockingQueue:数组实现的有界队列,FIFO的原则进行排序,支持公平锁和非公平锁
      • LinkedBlockingQueue:链表实现的有界队列,FIFO的原则进行排序,此队列的默认最大长度为Integer.MAX_VALUE使用时注意
      • PriorityBlockingQueue:支持任务优先级进行排列
      • DelayQueue:延迟无界队列,任务只能在指定时间后才能被线程获取
      • SynchronousQueue:不存储元素的无界队列,每个任务都必须有线程执行,否者不能继续添加元素。
    • 拒绝策略:任务长时间无法被执行时将会执行拒绝策略

      默认的拒绝策略有如下几种

      • ThreadPoolExcutor.AbortPolicy:丢弃任务并抛出异常,也是默认的拒绝策略
      • ThreadPoolExcutor.DiscardPolicy:丢弃任务单不抛出异常,因为不抛出异常谨慎使用该策略
      • ThreadPoolExcutor.DiscardOldestPolicy:丢弃任务队列最前面的任务,然后重新提交被拒绝的策略
      • ThreadPoolExcutor.CallerRunsPolicy:让调用线程来执行该任务

    2.2.2 Worker线程回收

    线程池中线程的销毁依赖JVM的自动回收,线程池会维护线程的引用,当线程需要被销毁时,线程池只需要断开其引用即可。

    三、线程池实践

    3.1 业务场景

    线程池主要使用在需要快速响应用户请求或者快速处理批量任务等业务场景

    3.2 问题

    1. 工作线程设置过少会导致部分任务得不到处理
    2. 任务队列过长可能会导致任务堆积

    3.3 解决思路

    1. 使用协程框架(Java中协程生态不是很好)
    2. 参数公式化(没有一种计算公司能适配各种业务场景)
    3. 参数动态化,将线程池的各种参数进行统一配置和统一管理监控(依赖于线程池框架中提供的set方法)
    展开全文
  • 这篇文章主要介绍了Django异步任务线程池实现原理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 当数据库数据量很大时(百万级),许多批量数据修改请求的...
  • C++线程池实现原理

    2021-01-20 03:37:47
     所以写这篇文章主要是想以非常简单的方式讲讲实现原理, 希望初学者看完之后不是觉得「不明觉厉」,而是觉得「原来如此」。  面朝代码  首先先来一段超级简单(注释丰富)的代码展示多线程编程的经典写法。 ...
  • 线程池实现原理剖析

    2021-01-22 17:21:19
    线程池,ThreadPoolExecutor 的实现原理有一定理解后,我们可以把它用的更好,对它的参数有更加深刻的理解,甚至我们可以扩展,监控自己的线程池。 ThreadPoolExecutor实现原理 本文代码基于JDK1.8 线程池相关的类...

    概要

    线程池,大家都很熟悉了,我们在平时应用中也用的很多。对线程池,ThreadPoolExecutor 的实现原理有一定理解后,我们可以把它用的更好,对它的参数有更加深刻的理解,甚至我们可以扩展,监控自己的线程池。

    ThreadPoolExecutor实现原理

    本文代码基于JDK1.8

    线程池相关的类的关系

    我们先看看主要的类ThreadPoolExecutor的继承关系

    图片

    平时可能还有用到的 Executors 类,这是一个工具类,提供 newFixedThreadPool newCachedThreadPool newScheduledThreadPool 等静态方法方便我们创建线程池,最终还是调用 ThreadPoolExecutor 来创建的,一般规范不建议直接使用 Executors 来创建线程池。

    线程池创建的主流程

    线程池的状态

    先看看线程池的状态有哪些,对它有初步的理解

    图片

    线程池的状态和运行的线程数只用了一个 int,其中高3位表示状态,低29位表示线程数。状态表示的意思和状态之间的转换:RUNNING - 可以接受新的任务,及执行队列中的任务

    SHUTDOWN - 不接受新的任务,但会继续执行队列中的任务

    STOP - 不接受新的任务,既不会执行队列中的任务,还会中断执行中的任务

    TIDYING - 全部任务已经终止,且线程数为0

    TERMINATED - 线程池完全终止

    RUNNING -> SHUTDOWN - 执行了 shutdown()

    (RUNNING or SHUTDOWN) -> STOP - 执行了shutdownNow()

    SHUTDOWN -> TIDYING - 队列和线程为空

    STOP -> TIDYING - 线程为空

    TIDYING -> TERMINATED - terminated() 这个勾子方法执行完毕

    线程池的创建

    ThreadPoolExecutor 的构造函数

    图片

    线程池的创建只是初始化了一些参数,但理解好这些参数对我们使用线程池很有帮助。
    corePoolSize - 核心线程数,除非 allowCoreThreadTimeOut 为 true(默认false),否则即使没有任务,也会维持这么多线程。

    maximumPoolSize - 最大线程数,corePoolSize 满了的话,会把任务放到队列,如果队列满了的话(假设队列有界),就会继续创建线程直到 maximumPoolSize,如果超过 maximumPoolSize 则会执行 reject 策略。

    workQueue - 用来存放任务的队列,是一个 BlockingQueue,常用的有 LinkedBlockingQueue,ArrayBlockingQueue,SynchronousQueue。

    keepAliveTime - 空闲线程的存活时间,如果当前线程数 > corePoolSize,且某个线程空闲了这么多时间(没有获取到任务并运行),那么这个线程会被 remove 掉。

    unit - keepAliveTime 的单位,内部会统一转换成 nanos

    threadFactory - 用来创建线程的 ThreadFactory,主要用来给线程命名(方便查看日志),设置 daemon,优先级和 group 等,Executors 有 DefaultThreadFactory 这个默认实现。

    handler - reject 具体执行策略,reject 的条件上面已经说了,一般内置有以下几个,也可以自己实现

    CallerRunsPolicy - 不使用线程池的线程执行该任务,直接用当前执行任务(例如 main 线程)

    AbortPolicy - 直接抛异常

    DiscardPolicy - 无视不做处理,相当于抛弃掉

    DiscardOldestPolicy - 将队列头的任务取出来抛弃掉,然后运行当前任务

    线程池的执行

    一般我们使用 ExecutorService 的 submit 方法来使用线程池执行一个任务,这个方法调用到 AbstractExecutorService

    图片

    这里我们看到所有 task 无论是 Callable 还是 Runnable 的都会包装成一个 RunnableFuture 也就是 FutureTask,返回给我们。

    execute方法

    重点看 execute 方法,调用了 ThreadPoolExecutor 的 execute

    图片

    我们分三种情形来看,每个是一个 if 条件:第一,当 workCount < corePoolSize 时,直接创建 worker 线程;

    第二,如果上面创建失败(可能是线程池正在处于关闭状态,可能是 workCount > corePoolSize 了 - 并发场景),那么这时把任务放入 workQueue 队列;下面的判断是用来防止,线程池不在运行了,就把任务删掉;如果没有线程了就加一个;

    第三,来到这步说明上面放队列不成功(可能是队列是有界的,满了),那么就继续创建线程来满足,如果这都创建失败(可能是 > maximumPoolSize)就 reject 了;

    addWorker方法

    继续看看重点的 addWorker 方法,addWorker 分开两部分来看。

    图片

    这一步是判断是否可以增加 worker 的重点:

    第一,首先开始的判断有点奇怪,我也不是很明白,先认为它是如果状态是 SHUTDOWN 则不允许创建线程;

    第二,下面有个 core 参数,true 使用 corePoolSize,false 使用 maximumPoolSize,我们上面说的 execute 方法第一次就是传 true 的,第二次就传 false。所以这里就是对 size 做判断, 如果 >= size 则返回 false,否则 cas 一下,成功了就 break 执行下面的代码;

    第三,如果 cas 失败,说明有其他并发竞争,则 cintinue 循环上面的步骤判断。

    图片

    来到这一步说明可以创建 worker 了,这里用了一个全局 lock,来控制创建线程和关闭线程的不能同时做。可以看到步骤就是 new 一个 worker,add 到 workers 里,workers 是一个 HashSet。

    图片

    largestPoolSize 来用记录最大的线程数。如果 workerStarted == false(线程池在关闭或创建 worker 时异常),则会 addWorkerFailed 方法。

    图片

    主要就是 remove 掉 worker,扣减计数,这里还会调用 tryTerminate 。这个方法会在很多地方用到,它的作用就是防止线程池在终止状态这种情形,去终止线程。

    图片

    Worker是什么

    我们刚刚一直说 worker,那到底 Worker 究竟是什么?我们现在来看看

    图片

    我们可以看到 Worker 是一个 AQS 和 Runnable。为什么是一个 AQS ?我们可以结合注释和代码可以得到,worker 在跑任务的时候会 lock 住,在被中断时会 tryLock,利用上锁这一点来区分这个 worker 是否空闲。Worker 中重写 AQS 的方法。(感概:AQS 真是个简单易用,用于并发控制的好工具!)

    图片

    为什么是一个 Runnable ?我们看看 Worker 的构造函数,在创建 Thread 时把自己 this 传到 thread 的参数,说明 worker 的 thread 跑的是自己,这时我们就知道 worker 的入口了。

    Worker 的 run 方法

    图片

    runWorker方法

    重点的 runWorker 方法

    图片

    task 可能是传进来的 firstTask 或者 getTask() 获取到的,getTask 也是重点方法,等下讲到;运行 task 时会上锁,锁的作用我刚刚已经说了;如果线程池状态是 STOP 则中断线程;这里放了两个勾子 beforeExecute 和 afterExecute 方法来提供给子类做点事情,一般用于监控或统计线程池的执行情况;执行任务就直接 task.run() 了,还记得我说过这个 task 是一个 FutureTask,如果run 的时候抛出异常,FutureTask 会 catch 掉不会再 throw(如果大家对 FutureTask 不熟悉就先这样理解),所以这里不会进入 catch,也就是不会 throw x 了。如果 task 不像 FutureTask 一样自己处理掉异常,那就会 throw x 了,那么 worker 的线程就会跳出 while 循环,完成使命,结束自己;获取不到 task (task 为null)或者循环过程中异常,最后都会执行 processWorkerExit。

    图片

    processWorkerExit 的作用主要就是 remove 掉 worker,那么扣减 workCount 呢?好像没有看到。这里用了 completedAbruptly 这一变量来控制是否在 processWorkerExit 扣减 workCount,因为有可能是在 getTask 已经扣减了,那么在 processWorkerExit 就不用重复扣减。我们结合 runWorker 来看看,分两种情况:

    1、如果 firstTask 为 null,那么会走到 getTask 方法,如果 getTask 返回 null,那么说明已经是扣减了,这时退出循环,completedAbruptly = false,不用重复扣减。

    2、如果 firstTask 不为 null (1)执行 firstTask 正常结束,然后循环,走到 getTask,如果返回 task 为 null,那么 completedAbruptly = false,不用重复扣减。

    (2)执行 firstTask 执行异常,这时 completedAbruptly = true,需要扣减 这里我们又看到 tryTerminate 了;下面的判断主要是尝试去增加一个 worker,因为你 remove 掉了一个,如果条件允许,那就加回一个呗。

    getTask方法

    看看重点的 getTask 方法

    图片

    在 getTask 时如果发现时线程池在关闭状态,那么就需要停止获取任务了;

    如果 wc > maximumPoolSize,超过了最大 size 了,就去 cas 扣减 workCount 一下,成功就返回 null;

    如果 wc > corePoolSize(小于 maximumPoolSize),且 timedOut 的话,说明这个 worker 也有点“多余”,也去扣减 workCount。注意这里对 timedOut 的判断,通过 queue 的定时 poll 方法,时间是线程池的构造参数 keepAliveTime,如果过了这么久都还没获取 task,说明 queue 是空的,有空闲的线程,那就把这个 worker remove 掉吧;

    如果 wc < corePoolSize 的话,那就调用 queue 的 take 方法一直阻塞获取 task;

    还有 allowCoreThreadTimeOut 参数,它的意思是忽略对 corePoolSize 的判断。

    关闭线程池

    上面已经把线程的创建和执行基本说得7788了,我们看看关闭线程池是如何做的,其实在上面的很多方法中,都看到很多对如 SHUTDOWN 的判断了。主要有 shutdown 和 shutdownNow 这两个方法。

    图片

    图片

    这两个方法很相似,从名字来看 shutdown 显得更加的柔性,实际也是。shutdown – 不接受新的 task,在运行和已经在队列的 task 可以继续运行;把状态改为 SHUTDOWN;中断空闲 worker,这个在上面已经提到过了,用锁对是否空闲做判断。

    interruptIdleWorkers 打断空闲的线程

    图片

    这里还有个 onShutdown 勾子方法。

    shutdownNow – 不接受新的 task,中断已经在运行的线程,清空队列;把状态改为 STOP;强制中断所有在运行 worker 的线程;drainQueue,相当于把队列的 task 丢弃掉;

    总结

    线程池ThreadPoolExecutor实现的原理,就是用 Worker 线程不停得取出队列中的任务来执行,根据参数对任务队列和 Workers 做限制,回收,调整。

    参考资料

    http://www.jianshu.com/p/87bff5cc8d8c https://javadoop.com/post/java-thread-pool

    展开全文
  • C++ 线程池实现原理1

    2022-08-04 14:34:55
    背景多线程编程是C++开发者的一个基本功, 但是很多开发者都是直接使用公司给包装好的线程池库, 没有去了解具体实现,有些实现也都因为高度优化而写得讳莫如深,让初
  • 该文档为c++开发的线程池,可以作为参考
  • 随着计算机行业的飞速发展,摩尔...提示:本文开篇简述线程池概念和用途,接着结合线程池的源码,帮助读者领略线程池的设计思路,最后回归实践,通过案例讲述使用线程池遇到的问题,并给出了一种动态化线程池解决方案。

    强烈安利csdn markdown文档的模板,简直让我的笔记排版耐看许多呀~

    前言

    随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流。使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器。J.U.C提供的线程池ThreadPoolExecutor类,帮助开发人员管理线程并方便地执行并行任务。


    提示:本文开篇简述线程池概念和用途,接着结合线程池的源码,帮助读者领略线程池的设计思路,最后回归实践,通过案例讲述使用线程池遇到的问题,并给出了一种动态化线程池解决方案。

    一、写在前面

    1.1 线程池是什么?

    线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
    本文描述线程池是JDK中提供的ThreadPoolExecutor类。

    当然,使用线程池可以带来一系列好处:

    • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
    • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
    • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
    • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

    1.2 线程池解决的问题是什么?

    线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。

    1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
    2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
    3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。

    为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

    二、线程池核心设计与实现

    2.1 总体设计

    Java中的线程池核心实现类是ThreadPoolExecutor,本章基于JDK 1.8的源码来分析Java线程池的核心设计与实现。
    ThreadPoolExecutor UML类图
    ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。
    AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
    ThreadPoolExecutor运行流程
    线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

    2.2 生命周期管理

    线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起。
    代码如下(示例):

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    

    ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。
    ThreadPoolExecutor的运行状态有5种,分别为:
    在这里插入图片描述
    其生命周期转换如下入所示:
    在这里插入图片描述

    2.3 任务执行机制

    2.3.1 任务调度

    所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。

    1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
    2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
    3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
    4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
    5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

    2.3.2 任务缓冲

    任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
    阻塞队列
    使用不同的队列可以实现不一样的任务存取策略
    阻塞队列的成员

    2.3.3 任务申请

    线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现
    获取任务流程图

    2.3.4 任务拒绝

    任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
    拒绝策略是一个接口

    public interface RejectedExecutionHandler {
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    

    用户可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略
    在这里插入图片描述

    2.4 Worker线程管理

    2.4.1 Worker线程

    线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
        final Thread thread;//Worker持有的线程
        Runnable firstTask;//初始化的任务,可以为null
    }
    

    Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建
    在这里插入图片描述
    线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。
    Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

    1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
    2. 如果正在执行任务,则不应该中断线程。
    3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
    4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

    2.4.2 Worker线程增加

    增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
    在这里插入图片描述

    2.4.3 Worker线程回收

    线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

    2.4.4 Worker线程执行任务

    在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

    1. while循环不断地通过getTask()方法获取任务。
    2. getTask()方法从阻塞队列中取任务。
    3. 如果线程池正在停止,那么要保证当前线程是中断状态。
    4. 执行任务。
    5. 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
      在这里插入图片描述

    三、线程池在业务中的实践

    3.1 业务背景

    场景:快速处理批量任务

    描述:离线的大量计算任务,需要快速执行。比如说,统计某个报表,需要计算出全国各个门店中有哪些商品有某种属性,用于后续营销策略的分析,那么我们需要查询全国所有门店中的所有商品,并且记录具有某属性的商品,然后快速生成报表。
    分析:这种场景需要执行大量的任务,我们也会希望任务执行的越快越好。这种情况下,也应该使用多线程策略,并行计算。但与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。

    线程池使用面临的核心的问题在于:线程池的参数并不好配置。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO密集型和CPU密集型的任务运行起来的情况差异非常大


    3.2 动态化线程池

    动态化线程池的核心设计包括以下三个方面:

    1. 简化线程池配置:线程池构造参数有8个,但是最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:(1)并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。(2)并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求,Less is More。
    2. 参数可动态修改:为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。
    3. 增加线程池监控:对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发了解线程池状态。

    四、线程池的相关问题

    Executor vs ExecutorService vs Executors

    • Executors 工具类的不同方法按照我们的需求创建了不同的线程池,来满足业务的需求。
    • Executor 接口对象能执行我们的线程任务。
    • ExecutorService 接口继承了 Executor 接口并进行了扩展,提供了更多的方法我们能获得任务执行的状态并且可以获取任务的返回值。

    使用 ThreadPoolExecutor 可以创建自定义线程池。 Future表示异步计算的结果,他提供了检查计算是否完成的方法,以等待计算的完成,并可以使用 get()方法获取计算的结果。

    线程的未捕获异常处理 (利用UncaughtExceptionHandler)

    实现UncaughtExceptionHandler接口,自定义一个全局处理器。不处理不行,不处理的话程序会抛出异常子线程中断,主线程不影响,会继续运行。

    public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        private String name;
        public MyUncaughtExceptionHandler(String name) {
            this.name = name;
        }
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            Logger logger = Logger.getAnonymousLogger();
            logger.log(Level.WARNING, "线程异常,终止啦" + t.getName());
            System.out.println(name + "捕获了异常" + t.getName() + "异常");
        }
    }
    
    Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler("捕获器1"));
    new Thread(new UseOwnUncaughtExceptionHandler(), "MyThread-1").start();
    
    

    死锁与活锁的区别,死锁与饥饿的区别

    • 死锁:资源独占 + 占有且等待 + 不可抢占 + 循环等待
    • 活锁:任务或者执行者没有被阻塞,由于某些条件没有满足,导致一直重复尝试,失败,尝试,失败。

    活锁和死锁的区别在于,处于活锁的实体是在不断的改变状态,所谓的“活”, 而处于死锁的实体表现为等待;活锁有可能自行解开,死锁则不能。

    • 饥饿:指一个可运行的进程尽管能继续执行,但被调度器无限期地忽视,而不能被调度执行的情况。饥饿是由资源分配策略决定的, 饥饿可以通过先来先服务等资源分配策略来避免。
    展开全文
  • 详细论述了线程池原理,以及实现过程的分析
  • Java线程池实现原理及其在美团业务中的实践 随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流。使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器。J.U.C提供的线程池:ThreadPoolExecutor...
  • java线程池实现原理

    2018-04-18 14:55:09
    出处:https://blog.csdn.net/hzw19920329/article/details/52372348问题一:线程池存在哪些状态,这些状态之间是如何进行切换的呢?查看ThreadPoolExecutor源码便知晓:[java] view plain copy// runState is ...
  • 本文设计:线程池是什么;线程池解决的是什么问题;线程池总体设计;线程池如何维护自身状态;线程池如何管理任务(任务调度、任务缓冲、任务申请、任务拒绝);...实现动态化线程池(整体设计、功能架构)等内容。
  • 线程池实现原理吐血总结

    千次阅读 多人点赞 2021-02-03 17:02:36
    要理解实现原理,必须把线程池的几个参数彻底搞懂,不要死记硬背 线程池参数 1、corePoolSize(必填):核心线程数。 2、maximumPoolSize(必填):最大线程数。 3、keepAliveTime(必填):线程空闲时长。如果超过...
  • 可以看到,Executor是顶层的一个接口,线程池实现类是ScheduledThreadPoolExecutor和ThreadPoolExecutor,而Executors是一个工具类,用于创建实际的线程池,接下来看看源码。 点进源码查看(是 JDK1.8 版本加入...
  • Java线程池实现原理及其在美团业务中的实践https://mp.weixin.qq.com/s/tIWAocevZThfbrfWoJGa9w 思考总结 关键提升在降低了线程池参数修改的成本 风险:线程池大小一般不能随意调整,不合理的调整可能带来系统间...
  • jdk在java5版本中增加了内置线程池实现ThreadPoolExecutor,本文通过ThreadPoolExecutor的源码分析jdk中线程池的实现原理。 线程池由两个核心数据结构组成: 1)线程集合(workers):存放执行任务的线程,是一个...
  • 【并发基础】线程池实现原理分析什么是线程池使用线程池的好处ThreadPoolExecutorExecutors核心参数解释工作流程自定义线程线程池注意事项使用ThreadPoolExecutor创建线程池例子如何合理配置线程池CPU密集型IO密集...
  • 线程池 队列
  • C++ 线程池实现原理

    千次阅读 2019-01-24 19:46:44
    所以写这篇文章主要是想以非常简单的方式讲讲实现原理, 希望初学者看完之后不是觉得「不明觉厉」,而是觉得「原来如此」。 面朝代码 首先先来一段超级简单(注释丰富)的代码展示多线程编程的经典写法。 注: ...
  • 线程池实现原理

    千次阅读 2020-12-06 23:33:26
    线程池实现原理 线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数超过了最大数量超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务...
  • 线程池是一种多线程处理形式,将任务添加到队列,创建线程后自动启动这些任务 降低资源消耗。线程池可以避免频繁的创建线程和销毁线程,线程池中线程可以重复使用 提高响应速度。线程池省去线程创建的这段时间 ...
  • 为什么要使用线程池 平时讨论多线程处理,大佬们必定会说使用线程池,那为什么要使用线程池?其实,这个问题可以反过来思考一下,不使用线程池会怎么样?当需要多线程并发执行任务时,只能不断的通过new Thread创建...
  • ThreadPoolExecutor线程池原理 + 源码,了解一下。
  • 深度解析Java 线程池实现原理

    千次阅读 2021-05-26 10:26:17
    本文要说的线程池就是一种对 CPU 利用的优化手段。 线程池,百度百科是这么解释的: 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 125,714
精华内容 50,285
关键字:

线程池实现原理