-
2021-12-16 16:25:27
下面三种线程池底层都是调用的ThreadpoolExecutor这个方法,ThreadpoolExecutor()构造方法包含七大参数,想要了解线程池底层原理,必须掌握这七大参数的含义
- Executors.newFixedThreadPool(int)
- Executors.newSingleThreadPool()
- Executors.newCachedThreadPool()
ThreadpoolExecutor(int corePoolSize, int maxmumPoolSize, long keepAliveTime, TimeUnit unit, BlockQueue<Runable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){}
7大参数介绍:
- corePoolSize
- 当创建线程池之后,当有任务来之后,就会安排池中的线程去执行请求任务,可以理解成今日当值窗口
- 当线程池中的线程到达corePoolSize之后,就会把到达的任务放到缓存队列中
- maxmumPoolSize
- 线程池能够容纳同时执行的最大线程数
- keepAliveTime
- 线程空闲时间
- 当线程空闲时间达到keepAliveTIme时,线程会退出,直到线程数量=corepoolSize
- unit
- keepAliveTime的时间单位
- workQueue
- 任务队列,被提交但是未被执行的队列
- threadFactory
- 表示生成线程池中的工作线程的线程工厂,用于创建线程,一般用默认的即可
- handler
- 拒绝策略
线程池原理:
- 在创建了线程池之后,等待提交过来的任务请求
- 当调用executor()方法添加一个请求任务时,线程池会做如下判断:
- 如果正在的运行的线程数小于corePoolSize,那么马上创建运行这个任务
- 如果正在运行的线程数大于或等于corePoolSize,那么将这个任务放入队列
- 如果这时候队列满了且正在运行的线程数量小于maxmumPoolSize,那么还是要创建非核心线程立刻运行这个任务
- 如果队列满了且正在运行的线程数量大于或者等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
- 当一个线程完成任务时,它会从队列取下一个任务来执行
- 当一个线程无事可做超过一定时间(keepAliveTime)时,线程池会判断:
- 如果当前运行线程数大于corePoolSIze,那么这个线程就被停掉
- 所以线程池的所有任务完成后它最终会收缩到corePoolSize大小
决绝策略:
AbortPolicy:直接抛出RejectExecutionException阻止系统正常运行
CallerRunsPolicy:“调用者运行”一种调节机制,该策略不会抛出异常也不会抛去任务,而是将某些任务回退到调用者,
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交任务。
DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的一种方案
如何合理配置线程池:
- CPU密集型:
- CPU密集型意思是该任务需要大量的计算,而没有阻塞,CPU一直全速运行
-
一般公式:CPU核数+一个线程的线程池
-
- CPU密集型意思是该任务需要大量的计算,而没有阻塞,CPU一直全速运行
- IO密集型
- 参考公式:CPU核数
更多相关内容 -
线程池底层原理
2021-06-03 09:49:41当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取...一、概述
java线程池大体由一个线程集合workerSet和一个阻塞队列workQueue组成。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取出来继续执行
public ThreadPoolExecutor(int corePoolSize,//核心线程数 int maximumPoolSize,//最大线程数 long keepAliveTime,//空闲线程存活时间 TimeUnit unit,//时间单位 BlockingQueue<Runnable> workQueue,//阻塞队列 ThreadFactory threadFactory,//线程工厂 RejectedExecutionHandler handler)//拒绝策略
1.corePoolSize:核心线程数,已创建的线程<corePoolSize,则新建一个线程执行,>corePoolSize,则放入阻塞队列workQueue中
2.maximumPoolSize:最大线程数,当阻塞队列满了,则会创建新线程,直达maximumPoolSize的值
3.keepAliveTime:当阻塞队列的任务被执行完了,且有空闲线程,使线程个数<=corePoolSize,的时间值
4.unit:keepAliveTime 的单位
5.workQueue:阻塞队列,(1)ArrayBlockingQueue基于数组的有界队列。
(2)LinkedBlockingQueue基于链表的有界队列,但是界限为int的最大值,会一直存放任务,maximumPoolSize无效 。
(3)SynchronousQueue 不缓存任务,放一个执行一个
(4)PriorityBlokingQueue 具有优先级的队列通过comparater实现
6.threadFactory:线程工厂 用老创建线程,指定名字,查日志方便。
7.handler:拒绝策略,(1)Abortpolicy:直接拒绝,抛异常(2)DiscardPolicy:忽略任务,不报错(3)DiscardOlddestPolicy:从队列移除最老的任务,放入新任务 (4)CallerRunsPolicy:如果提交任务失败,会由提交任务的这个线程自己来调用execute执行任务二、为何生产中一般不用jdk自带的线程池
1.Executors.newFixedThreadPool(int size) 创建固定线程池,用的LinkedBlockingQueue,无限接受任务,导致OOM
2.Executors.newSingleThreadExecutor() 创建一个线程的线程池,用的LinkedBlockingQueue,无限接受任务,导致OOM
3.Executors.newCachedThreadPool() 创建一个带缓存的池,最大无限大,核心线程为0,最大无限大,用的SynchronousQueue 来一个执行一个,任务越多,线程越多
4.Executors.newScheduledThreadPool(int size) 传入的参数为核心线程,最大为无限大,用的DelayedWordQueue 延迟队列三、源码解析
1.重要属性
/** * 这个ctl就是用来保存 线程池的状态(runState) 和 线程数(workerCount) 的 * 这里使用AtomicInteger 来保证原子操作 * 这里的ctl的初始值其实就是-1左移29位,即3个1和29个0, * 111 00000000000000000000000000000 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //存放任务的阻塞队列 private final BlockingQueue<Runnable> workQueue; //worker的集合,用set来存放 private final HashSet<Worker> workers = new HashSet<Worker>(); //历史达到的worker数最大值 private int largestPoolSize; //当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略 private volatile RejectedExecutionHandler handler; //超出coreSize的worker的生存时间 private volatile long keepAliveTime; //常驻worker的数量 private volatile int corePoolSize; //最大worker的数量,一般当workQueue满了才会用到这个参数 private volatile int maximumPoolSize; // COUNT_BITS值为29,代表着低29位用于存储线程数,高3位用于存储线程池的状态 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程池最大的容量,值为3个 0和29个1。也就是536870911 // 000 11111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 下面这5个值代表线程池的状态,存储在高3位中 // 3个1,29个0 111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; // 全是0 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; // 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; // 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS;
2.ThreadPoolExecutor.execute()方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get();// if (workerCountOf(c) < corePoolSize) {//当前正在运行的worker数量<corePoolSize if (addWorker(command, true))//创建一个worker,直接执行任务 return; c = ctl.get(); } // isRunning(c)判断线程池是否在运行中,如果线程池被关闭了就不会再接受任务 // workQueue.offer(command) 将任务加入到队列中 if (isRunning(c) && workQueue.offer(command)) { //如果添加到队列成功了,会再检查一次线程池的状态 int recheck = ctl.get(); //如果线程池关闭了,就将刚才添加的任务从队列中移除 if (! isRunning(recheck) && remove(command)) //执行拒绝策略 reject(command); // 如果线程是处于RUNNING状态,并且当前线程池中的线程数为0,开启一个新的线程 // 因为有可能任务添加到队列中了,但是却没有线程可执行 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //如果失败,说明当前线程数已达到maximumPoolSize,需要执行拒绝策略 else if (!addWorker(command, false))//addWorker(command, false)是新开线程执行超过核心线程的任务 reject(command); }
3.BlockingQueue中的offer(E e)方法
BlockingQueue接口提供了3个添加元素方法:
- add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常;
- offer:添加元素到队列里,添加成功返回true,添加失败返回false;
- put:添加元素到队列里,如果容量满了会阻塞直到容量不满。
已ArrayBlockingQueue为例
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e);//入队操作 return true; } } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
4.addWorker()方法
// 这个方法会创建线程并且执行任务 // 以下几种情况这个方法会返回false: // 1.传入的core这个参数为true,代表线程数的上限为corePoolSize, // 如果当前线程数已达到corePoolSize,返回false // 2.传入的core这个参数为false,代表线程数的上限为maximumPoolSize, // 如果当前线程数已达到maximumPoolSize,返回false // 3.线程池stopped或者shutdown // 4.使用ThreadFactory创建线程失败,或者ThreadFactory返回的线程为null // 5.或者线程启动出现异常 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 这个rs就是线程池的状态 int rs = runStateOf(c); // 这里的if说的是以下3种情况,直接返回false,不会创建新的线程: // 1.rs大于SHUTDOWN,说明线程状态是STOP,TIDYING, 或者TERMINATED, // 这几种状态下,不接受新的任务,并且会中断正在执行的任务。所以直接返回false // 2.线程池状态处于SHUTDOWN,并且firstTask!=null。 // 因为SHUTDOWN状态下,是不接收新的任务的。所以返回false。 // 3.线程池处于SHUTDOWN并且firstTask为null,但是workQueue是空的。 // 因为SHUTDOWN虽然不接收新的任务,但是已经进入workQueue的任务还是要执行的, // 恰巧workQueue中没有任务。所以也是返回false,不需要创建线程 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 注意:这里是个for循环 // 获取线程池中线程的数量 int wc = workerCountOf(c); // 这里传入的core为true代表线程数上限为corePoolSize, // false代表线程数上限为maximumPoolSize,如果线程数超出上限,直接返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 使用CAS对线程计数+1,如果成功,说明已经满足创建线程的条件了 if (compareAndIncrementWorkerCount(c)) break retry; // 如果上面的CAS失败,说明有并发,再次获取ctl的值 c = ctl.get(); // Re-read ctl // 如果线程池的状态发生了变化,例如线程池已经关闭了, // 导致的CAS失败,那么回到外层的for循环(retry) // 否则,说明是正常的CAS失败,这个时候进入里面的循环 if (runStateOf(c) != rs) continue retry; } } // 已经做好创建线程的准备了 // worker是否已经启动的标志位 boolean workerStarted = false; // 我们前面说了workers这个HashSet用于存储线程池中的所有线程, // 所以这个变量是代表当前worker是否已经存放到workers这个HashSet中 boolean workerAdded = false; Worker w = null; try { // 传入firstTask这个任务构造一个Worker w = new Worker(firstTask); // Worker的构造方法中会使用ThreadFactory创建新的线程, // 所以这里可以直接获取到对应的线程 final Thread t = w.thread; // 如果创建线程成功 if (t != null) { final ReentrantLock mainLock = this.mainLock; // 获取线程池的全局锁,下面涉及线程池的操作都需要在持有全局锁的前提下进行 mainLock.lock(); try { // 获取线程池的状态 int rs = runStateOf(ctl.get()); // 如果rs<SHUTDOWN,说明线程池处于RUNNING状态 // 或者 线程池处于SHUTDOWN状态并且没有新的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果线程已经启动,抛出异常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将包装线程的worker加入到workers这个HashSet中 workers.add(w); int s = workers.size(); // 我们前面说了,largestPoolSize记录的是线程池中线程数曾经到达的最大值 // 线程池中worker的数量是会变化的,所以记录下worker数的最大值 if (s > largestPoolSize) largestPoolSize = s; // 修改标志,代表当前worker已经加入到workers这个HashSet中 workerAdded = true; } } finally { // 释放全局锁 mainLock.unlock(); } // 如果worker添加成功,启动线程执行任务 if (workerAdded) { // 启动线程 t.start(); // 代表worker已经启动 workerStarted = true; } } } finally { // 如果线程没有启动,这里还需要进行一些清理工作 if (! workerStarted) addWorkerFailed(w); } // 返回线程是否成功启动 return workerStarted; } // 这个方法做下面几件事: // 1.将worker从workers中移除 // 2.worker的数量-1 // 3.检查termination private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; // 要操作workers这个HashSet,先获取java线程池全局锁 mainLock.lock(); try { if (w != null) // 从worker中移除 workers.remove(w); // WorkerCount -1 decrementWorkerCount(); // 处理TERMINATED状态 tryTerminate(); } finally { mainLock.unlock(); } }
5.启动线程执行任务的操作就是在addWorker中,t.start,调用Worker.run()
public void run() { // 这里调用的runWorker方法 runWorker(this); } // 这里就是执行任务的代码了,有一个while循环不断从队列中取出任务并执行, // 退出循环的条件是获取不到要执行的任务 final void runWorker(Worker w) { // 当前线程 Thread wt = Thread.currentThread(); // 前面说了new Worker的时候可以指定firstTask,代表Worker的第一个任务 Runnable task = w.firstTask; // 这一步就已经将firstTask置为null了 w.firstTask = null; // 释放Worker的独占锁,这里它释放锁的操作一定会成功,也就是将AQS中state设置为0 w.unlock(); // allow interrupts // completedAbruptly这个标志位代表当前Worker是否因为执行任务出现异常而停止的 boolean completedAbruptly = true; try { // while循环;如果firstTask不为null那就直接执行firstTask, // 否则就要调用getTask()从队列中获取队列。 // 也就是说Worker的第一个任务是不需要从队列中获取的 while (task != null || (task = getTask()) != null) { // 给这个worker上独占锁 // Worker加锁的意义在于,在线程池的其他方法中可能会中断Worker, // 为了保证Worker安全的完成任务,必须要在获取到锁的情况下才能中断Worker, // 如tryTerminate(),shutdown()等都会关闭worker。 w.lock(); // 如果ctl的值大于等于STOP,说明线程池的状态是STOP,TIDYING或TERMINATED。 // 这个时候需要确保该线程已中断,否则就应该确保线程没有中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 其实这里只是设置线程的中断状态 wt.interrupt(); try { // 这个方法留给子类去实现的 beforeExecute(wt, task); Throwable thrown = null; try { // 真正执行任务的地方 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 后置处理,留给子类去实现的 afterExecute(task, thrown); } } finally { // 最后将task置为null,准备接受下一个任务 task = null; // 这个worker已完成任务数+1 w.completedTasks++; // 释放独占锁 w.unlock(); } } // 到这一步说明没抛出异常 completedAbruptly = false; } finally { // 执行到这里说明:要么队列中已经没有任务了,要么执行任务出现了异常。 // 这个时候需要调用processWorkerExit关闭线程 processWorkerExit(w, completedAbruptly); } }
6.从队列中获取任务getTask()
// 这个方法就是从队列中获取任务,返回null代表线程需要被关闭。一共有以下三种可能: // 1.阻塞获取任务直到获取成功 // 2.获取任务超时了,也就说线程空闲了keepAliveTime这么久了,还是没有获取到任务, // 这个时候线程需要被关闭(这里有个前提就是线程数要大于corePoolSize) // 3.如果出现下面几种情况返回null,返回null说明线程需要被关闭 // 池中worker的数量大于maximumPoolSize(由于调用setMaximumPoolSize进行了设置) // 线程池处于STOP状态,这个时候不能执行任务队列中的任务 // 线程池处于SHUTDOWN状态,但是任务队列是空的 private Runnable getTask() { boolean timedOut = false; // 最后一次的poll操作是否超时 for (;;) {// for循环 int c = ctl.get(); // 获取线程池的状态 int rs = runStateOf(c); // 如果线程池的状态大于等于STOP,或者线程池状态等于SHUTDOWN并且任务队列为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 使用CAS对workerCount-1 decrementWorkerCount(); // 返回null return null; } // 获取线程池中的线程数 int wc = workerCountOf(c); // 如果allowCoreThreadTimeOut设置为true, // 或者线程池中线程数>corePoolSize,说明有可能发生超时 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果当前线程数大于maximumPoolSize,或者超时 // 注意:如果开发者调用了setMaximumPoolSize() 将maximumPoolSize变小了, // 就有可能出现当前线程数大于maximumPoolSize。 // 这个时候多余的线程肯定是获取不到任务的,需要被关闭 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // workerCount-1 if (compareAndDecrementWorkerCount(c)) return null; continue; } // 到这里,说明线程数小于maximumPoolSize等于且没有超时 try { // 从任务队列中取出任务 // 如果timed为true,调用带超时的poll方法,否则执行take方法阻塞获取任务。 // timed这个变量的值取决于allowCoreThreadTimeOut || wc > corePoolSize // 其实这里说的是,如果线程池中线程数量在corePoolSize以内, // 且不支持回收核心线程数内的线程,这个时候线程池中的线程是不会被回收的。 // 所以调用take方法阻塞获取任务,直到获取成功。 // 否则的话,线程隔了keepAliveTime这么久还是获取不到任务,是需要被回收的 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 如果成功获取到任务,返回这个runnable任务, // 否则就是超时了,再进入下一轮循环的时候返回null if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
7.阻塞队列中的workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 或者 workQueue.take();方法,移除队列头部元素,通过ReentrantLock 加锁await(),signal()来阻塞唤醒线程
BlockingQueue接口提供了3个删除方法:
- poll:删除队列头部元素,如果队列为空,返回null。否则返回元素;
- remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false;
- take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除。
以ArrayBlockingQueue为例
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0)//队列中没有元素时调用condition.await阻塞 notEmpty.await(); return dequeue();//出队操作 } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos);//队列中没有元素时,或者超时,调用condition.await阻塞 } return dequeue();//出队操作 } finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
四、总结
线程池内部工作原理:
- 首先,调用execute()执行方法时,先判断线程池中工作线程的数量小于corePoolSize,则调用addWorker()方法,在addWorker()方法里创建新线程(判断当前线程池的状态,shundown,stop等不执行,返回false)
- 创建线程成功,ReentrantLock加锁,放入HashSet 中 解锁
t.start()执行线程。如果大于corePoolSize,则调用workQueue.offer(command)放入阻塞队列中,阻塞队列采用ReentrantLock加锁保证线程安全 - 如果阻塞队列满了,调用addWorker(command, false)方法,新建线程加入HashSet
中,如果>maximumPoolSize !addWorker(command, false)
true调用reject(command) 执行拒绝策略方法。 - addWorker()中线程创建好了,t.start()执行任务 就是调用runWorker(Worker w)方法,Runnable
task = w.firstTask;第一个任务直接执行,后续的调用getTask()从队列中for(;;)调用
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)或者
workQueue.take()从阻塞队列头部获取任务,方法内部ReentrantLock加锁保证线程安全
五、问题
-
线程执行任务过程中出现异常是怎么处理的?
如果一个线程执行任务出现异常,那么执行任务的线程会被关闭,而不会继续执行其他任务。最后会启动一个新的线程来取代它
-
线程池是怎么实现线程复用的?
runWorker()方法中,一个线程执行完一个任务后会不断从任务队列中取出任务来执行,如果队列中已经没有任务了,allowCoreThreadTimeOut设置为false并且线程数<=corePoolSize,调用BlokingQueue.take()方法阻塞,直到获取到任务
如果队列中没有任务了,allowCoreThreadTimeOut设置为true或者线程数>corePoolSize,调用BlockingQueue带超时的poll方法尝试获取任务,获取不到的话,这个线程就会被回收掉 -
shutdown() 和 shutdownNow()有什么区别?
线程在拿到任务的时候开始执行的时候,是会获取Worker的独占锁的。shutdown()方法中断worker会先调用Worker.tryLock()获取独占锁,如果线程正在执行任务,那就获取不到独占锁,也就无法中断线程。而shutdownNow()方法是直接尝试中断所有线程,它们底层都是调用Thread.interrupt()方法给线程设置interrupt标记,所以只有响应中断的任务在interrupt()以后才会终止
1.当创建线程池后,初始时,线程池处于RUNNING状态,此时线程池中的任务为0;
2.如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
3.如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
4.当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。
5.线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED,线程池被设置为TERMINATED状态。
-
java线程池底层原理(ThreadPoolExecutor)
2021-07-05 18:47:44通过分析ThreadPoolExecutor了解更多线程池底层实现原理。线程池的存在就是为了合理的利用线程,减少不必要的线程频繁创建销毁,提升程序性能。
在最初接触线程池的时候,我们都知道Executors工具类,可以提供创建不同类型的线程池,包括提供单个线程的线程池、固定数量线程的线程池以及缓存线程池等;
public static ExecutorService newFixedThreadPool(int nThreads){} public static ExecutorService newSingleThreadExecutor(){} public static ExecutorService newCachedThreadPool(){}
而这些工具创建线程池,最终都是创建了一个ThreadPoolExecutor对象,该对象提供了多个重载的构造器,通过搭配不同参数实现上述的线程池需求。
ThreadPoolExecutor对象虽然提供了多个重载的构造器,但最终都是在调用同一个构造器:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
其中各个参数的意义:
corePoolSize:核心线程数,当线程池待执行任务队列没满时,线程池最多提供的线程数;
maximumPoolSize:最大线程数,当线程池待执行任务队列已满时,会尝试创建更多线程来处理任务,线程总数上限就是maximumPoolSize;
keepAliveTime:如果线程池中的线程数超过了核心线程数,则如果有空闲时间超过keepAliveTime的线程,就会被回收;
unit:keepAliveTime的时间单位;
workQueue:待执行的任务队列,当线程池繁忙且无法创建更多核心线程来处理任务时,任务会被添加到待执行队列,排队等候被执行;
threadFactory:线程工厂,默认是Executors工具类中的静态内部类DefaultThreadFactory,它实现了ThreadFactory接口,该接口只定义了一个方法:
Thread newThread(Runnable r);
DefaultThreadFactory实现代码如下,总结起来就是统一创建同一优先级的非守护线程:
public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
handler:拒绝策略,当线程池没有额外的线程处理新的任务,且待执行队列也满了的时候,或者干脆线程池已经被关闭,则新的任务就只能被拒绝,RejectedExecutionHandler是一个接口,该接口定义了如下方法:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
ThreadPoolExecutor类中提供了几个默认的实现了RejectedExecutionHandler的静态内部类,每个拒绝策略都有自己的特点,具体包括如下几种拒绝策略:
AbortPolicy:简单粗暴型,直接叫你滚,抛出异常,这也是默认的拒绝策略:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
CallerRunsPolicy:兢兢业业型,你没空做,那我自己做:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
DiscardPolicy:高冷型,完全不搭理你:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
DiscardOldestPolicy:喜新厌旧型,丢弃掉待执行队列中最老的任务,添加新的任务:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
如果以上几种类型你都不喜欢,那么你可以自己实现RejectedExecutionHandler接口,口味自调。
清楚了以上的参数,那使用线程池就很简单了,一个很简单的例子如下:
public class ThreadPoolTest { private static ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) { pool.execute(()->{ System.out.println("task execute"); }); pool.shutdown(); } }
关于上面这个简单的案例,我们分三步看:
1、线程池的创建,这个就是利用ThreadPoolExecutor的构造器实例化对象,参数在上面都已经详细介绍了,此处没什么再可说的了;
2、execute提交任务,总体逻辑如下图所示:
3、shutdown,关闭线程池,说到这里,要说下线程池的几个状态了
RUNNING:线程池创建后即为该状态,可以执行队列中的任务也可以接收并处理新的任务;
SHUTDOWN:当调用shutdown()方法后,线程池进入该状态,此状态下线程池不再接收新的任务,如果继续提交任务,则执行拒绝策略,但是会继续执行队列中的任务;
STOP:当调用shutdownNow()方法后,线程池进入该状态,此状态下线程池不再接收新的任务,也不再处理队列中的任务,而且会尝试中断执行中的任务;
TIDYING:当线程池中所有任务都结束,且线程也全部被回收,则进入TIDYING状态;
TERMINATED:terminated()方法执行后,线程池彻底终结;
线程池中的线程是如何启动并回收的呢?
线程池中的每个线程是对应一个Worker实例,在创建完后,会循环处理提交的任务,在上文中的图中有所体现:
Worker的run()方法调用了ThreadPoolExecutor的runWoker方法:
public void run() { runWorker(this); }
runWorker源码如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
runWorker中用了个while循环处理任务,只要while循环不退出,则当前Worker线程就一直运行,从源码中可以看出,退出条件是task为null,也就是getTask()这个方法是控制Worker生命周期的关键方法,其源码如下:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
1、正常情况下,会采用阻塞的方式(即take()方法)从待执行队列获取任务,即便被interrupt,也会再次循环阻塞获取,这样必然能保证返回可执行的任务,则worker线程不会被回收;
2、线程池如果进入SHUTDOWN状态,且待执行队列中的数据也被消费完了,则返回null,这样就保证了所有的worker线程会结束运行,从而被回收;
3、线程池如果进入了STOP状态,则不管待执行队列有没有任务,都会立即返回null,则worker线程结束;
4、如果当前worker线程总数超过了指定的核心线程数,则从队列获取数据时,使用超时阻塞的方式(即poll(long timeout, TimeUnit unit)方法)获取,超时时间即keepAliveTime,超时后如果没获取到任务,则线程空闲时间达到阈值,返回null,结束worker线程;
5、如果配置了了核心线程数也需要超时回收,则获取任务时同样采取的是超时阻塞方式,同第4点;
-
并发编程五:java并发线程池底层原理详解和源码分析
2022-02-15 17:55:09java并发线程池底层原理详解和源码分析 上篇分析了java线程。现在来分析java线程池。在分析线程池之前,先来思考下我们的线程是创建的越多越好吗,显然不是,我们为什么要使用线程池,用下面的例子来看下 /*** * ...文章目录
java并发线程池底层原理详解和源码分析
线程和线程池性能对比
上篇分析了java线程。现在来分析java线程池。在分析线程池之前,先来思考下我们的线程是创建的越多越好吗,显然不是,我们为什么要使用线程池,用下面的例子来看下
/*** * 使用线程的方式去执行程序 */ public class ThreadTest { public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); final Random random = new Random(); final List<Integer> list = new ArrayList<Integer>(); for (int i = 0; i < 100000; i++) { Thread thread = new Thread() { @Override public void run() { list.add(random.nextInt()); } }; thread.start(); thread.join(); } System.out.println("时间:" + (System.currentTimeMillis() - start)); System.out.println("大小:" + list.size()); } }
/*** * 线程池执行 */ public class ThreadPoolTest { public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); final Random random = new Random(); final List<Integer> list = new ArrayList<Integer>(); ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 100000; i++) { executorService.execute(new Runnable() { @Override public void run() { list.add(random.nextInt()); } }); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); System.out.println("时间:"+(System.currentTimeMillis() - start)); System.out.println("大小:"+list.size()); } }
上面两份代码都是执行十万次把随机数添加到集合,不同的是第一份代码每次循环都要创建线程执行任务,第二份代码通过线程池的方式执行任务。那个会比较快。答案是线程池的要快很多。
为什么,在上篇有说到java创建线程是重量级的,涉及到从用户态到内核态,同时过多的线程导致cpu不断的上下文切换。
在第一份代码中创建了10万个对象,创建了10万零一个线程。第二代码中同样创建了十万个对象,但只创建了两个线程。为什么说只创建了两个线程呢,后面会分析这个newSingleThreadExecutor。那么线程池性能就一定好吗?看看下面的例子
public class ThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService1 = Executors.newCachedThreadPool();//快 ExecutorService executorService2 = Executors.newFixedThreadPool(10);//慢 ExecutorService executorService3 = Executors.newSingleThreadExecutor();//最慢 for (int i = 1; i <= 100; i++) { executorService1.execute(new MyTask(i)); } } } /*** * 项目 */ class MyTask implements Runnable { int i = 0; public MyTask(int i) { this.i = i; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "程序员做第" + i + "个项目"); try { Thread.sleep(3000L);//业务逻辑 } catch (Exception e) { e.printStackTrace(); } } }
上述代码很简单,就是不同的线程池执行MyTask 的run方法,这个run方法就输出一句话,然后阻塞三秒。那个线程池会快呢,答案已经在注释上面了。
在不同的场景使用不同的线程得到的性能是不一样的。Executors创建的三种线程池分析
平时有关注阿里巴巴开发手册的话应该知道,不推荐使用Executors创建的三种线程池。那么为什么不推荐,下面来一个个分析。
先来看下线程池几个参数的含义
corePoolsize
:核心线程数量,创建线程池时就含有的线程数量
maximumPoolSize
最大的线程数量,当核心线程数量不够时,最多可以创建多少个线程(包含核心线程)
keepAliveTime和 TimeUnit
非核心线程的存活时间长度和时间单位
BlockingQueue<Runnable>
队列(队列会在后续博客中单独分析)
ThreadFactory
线程工厂,在上篇有说过
RejectedExecutionHandler
拒绝策略newCachedThreadPool分析
ExecutorService executorService1 = Executors.newCachedThreadPool()
newCachedThreadPool
的核心线程数为0,最大线程数为最大值,线程池里面的线程存活时间是60秒。队列采用的是SynchronousQueue
同步队列。
根据上面的案例来分析下,
首先newCachedThreadPool没有核心线程,所以当接收到任务时会放入到同步队列中(同步队列是典型的生产和消费模式,当同步队列中有任务,必须要先消费这个任务才能接收其他任务
),此时会创建线程1去执行任务1。那么同步队列的任务被消费了,就能接收第二个任务,同样会创建线程2去执行任务2。那么又会接收任务3,此时如果线程1执行完了任务1的话,并且空闲时间在60秒内(存活时间是60秒),那么任务3会分配给线程1,此时线程1执行了任务1和任务3。这是线程复用。如果接任务3的时候没有空闲线程,那么就会创建线程3来执行,这就是newCachedThreadPool线程池的流程。newFixedThreadPool分析
ExecutorService executorService2 = Executors.newFixedThreadPool(10)
newFixedThreadPool
的核心线程数是10,最大线程数是10,非核心线程存活时间为0,最大线程数和核心线程数相等,也没办法创建其他非核心线程了。队列是LinkedBlockingQueue
无界队列,先简单说是可以无限存储数据。队列是一个数据结构,有着FIFO,就是先进先出。根据上面案例分析:
首先newFixedThreadPool有10个核心线程,那么一开始就可以接收10个任务,这10个任务不需要放入队列。从第11个任务开始就会放入到队列中,每个核心线程任务执行完后,会从队列中获取任务。
这也就是上面案例使用newFixedThreadPool执行的时候为什么是10个10个来打印语句的原因。
如果说ExecutorService executorService2 = Executors.newFixedThreadPool(100)
的话,那么上面案例执行效率会和newCachedThreadPool
是一样的。newSingleThreadExecutor分析
ExecutorService executorService3 = Executors.newSingleThreadExecutor()
newSingleThreadExecutor
和newFixedThreadPool
差不多,只不过核心线程和最大线程都是1,当接收到任务是核心线程会执行任务1,那么任务2开始到任务100都会放入到队列中,等待核心线程执行。因此这个在上述案例中时执行效率最慢的。根据上面的分析,不同的任务不同场景就要使用不同的线程池参数。那为什么阿里巴巴的开发手册不推荐使用这三种线程池。
首先最根本的原因是开发者不一定知道线程池参数的含义,或者说开发者根本不知道newFixedThreadPool、newFixedThreadPool、newCachedThreadPool底层这些参数,只是单纯的想创建一个线程池,那么这种时候就会出现问题。刚才也分析,在不同场景不同的业务使用到的线程池参数是不同,使用不当就会造成性能下降。其次呢,这三个线程池有着不同的问题。例如
newSingleThreadExecutor
和newFixedThreadPool
底层使用的队列是LinkedBlockingQueue
无界队列。
可以看到LinkedBlockingQueue
容量的最大值是Integer的最大值,就是说当任务过多的时候有可能导致OOM
。不过LinkedBlockingQueue
是可以指定大小的,但是newSingleThreadExecutor
和newFixedThreadPool
的底层都没有指定大小。因此这两个线程池有可能倒是OOM,就算没有导致OOM,容量过大也会导致频繁GC。
对于newCachedThreadPool
使用的是SynchronousQueue
,这是同步队列,不会导致OOM,但是它的最大线程数是多少,是Integer.MAX_VALUE
。如果无限的创建线程会导致什么问题,CPU100%。
所以三种线程池都各自有各自的问题。不过对于中小项目来说,由于量不够,不会导致这些问题。使用也是没有关系的。具体情况具体分析。
那么不推荐使用上面的三种线程池,那用什么线程池呢?推荐使用自定义线程池自定义线程池分析
在上述案例中,上面三种线程好像都不是很满意,要么创建的线程数太多了,要么队列空间太大了,要么线程数太少了。这时候使用自定义线程池。
public class ThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService1 = Executors.newCachedThreadPool();//快 ExecutorService executorService2 = Executors.newFixedThreadPool(10);//慢 ExecutorService executorService3 = Executors.newSingleThreadExecutor();//最慢 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10)); for (int i = 1; i <= 100; i++) { poolExecutor.execute(new MyTask(i)); } } }
看看执行结果
这里抛出的异常是因为队列的长度不够,而且是在第31个任务的时候抛出的异常,为什么会在第31个任务抛出异常?还有从结果上来看还有一个问题就是顺序的问题。从上图来看执行任务是从1-10,然后20-30,在到11-20。我们假想的执行顺序是1-10,10-20,20-30。为什么会出现这种情况呢?
这里涉及到两个知识点:提交优先级、执行优先级。这些在源码当中都有体现。
现在开始分析源码来解答上述出现的问题。线程池源码分析
继承关系
先来看下类继承关系图:
Executor只有一个execute的空方法
ExecutorService继承了Executor 同时提供了submit接口。ExecutorService是一个接口所以下面这些方法都是空方法。
经常会问submit方法和execute方法有什么不同,后面分析的时候也会说到。
AbstractExecutorService是一个抽象类,实现了ExecutorService方法并且实现了submit方法
线程池ThreadPoolExecutor继承了AbstractExecutorService。源码分析的核心。重写了execute方法。ThreadPoolExecutor源码分析
AbstractExecutorService实现了ExecutorService方法并且实现了submit方法
submit底层调用了execute方法。所以execute和submit有什么区别。submit底层调用execute,第一个区别就是execute是没有返回值的,submit是有返回值的。当我们使用线程池执行任务时候
poolExecutor.execute(new MyTask(i));
调用了execute方法来执行任务。之前说过有提交优先级和执行优先级。虽然execute翻译过来是执行的意思,但是execute的源码是提交优先级
,在执行前得先提交任务。
现在来看看execute方法public void execute(Runnable command) { //判断Runnable 是否为空,为空抛空指针异常 if (command == null) throw new NullPointerException(); //ctl是一个CAS操作AtomicInteger类型 int c = ctl.get(); //判断当前的线程数是否小于corePoolSize 如果是通过addWord方法创建一个新的线程 //如果能完成新线程创建exexute方法结束,成功提交任务; if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //isRunning(c) 判断线程的状态, //workQueue.offer 类似队列的一个add方法 把任务添加到队列中 //这个if判断状态如果是运行状态,并且能够把任务添加到队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //这里做了双重检测 //如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了) if (! isRunning(recheck) && remove(command)) //调用拒绝策略 reject(command); //如果双重检测成功了 并且线程池中的数量==0 else if (workerCountOf(recheck) == 0) //这里为什么是null 在addWorker源码的时候在分析 // 先理解为创建了一个线程给了一个空任务 addWorker(null, false); }else if (!addWorker(command, false)) //这里的addWorker(command, false) 意思是创建非核心线程 如果创建失败了返回false //当addWorker返回false时 调用拒绝策略 reject(command); }
源码中的ctl是ThreadPoolExecutor定义的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
workQueue(队列)的offer方法和add方法什么不同?workQueue的offer和add方法其实调用的是AbstractQueue的offer和add
add方法底层也是调用了offer方法,如果队列满了的话add会抛出IllegalStateException异常,而offer只会返回false。并且offer会抛出三种异常,add会抛出四种异常,多出来的就是IllegalStateException异常。上述的execute的源码流程在注释中写有,现在通过上面的案例,再梳理一遍流程。在自定义线程池分析的案例中,通过线程池循环100次执行任务。整个过程是这样的:
- 当第一个任务进来就是第一次执行execute的时候,判断任务是否为空,不为空,判断当前线程数是否小于核心线程数,案例中定义的核心线程数是10。当前没有线程那么肯定小于10,那么通过addwork方法创建了一个线程,此时线程是核心线程,并且创建成功后return 返回。这个线程认为是从0开始的计数的,小于10也是10个核心线程。所以1-10的任务就交给了10个核心线程。
- 然后第11个任务进来了。此时核心线程不小于10了。判断线程状态是否是运行状态(默认就是运行状态),然后通过offer方法添加到队列中,成功以后此时第11个任务就放到了队列中,然后进行二次判断,再次判断线程运行状态,如果不是运行状态并且能够把该任务从队列中移除掉则调用拒绝策略。如果二次判断成功并且线程池的数量等于0那么调用addwork方法传入空参数。由于案例中队列的容量是10所以从11-20的任务都能添加到队列中,并且由于创建了核心线程所以线程池的数量此时不等于0,也就不会执行addwork(null,false);方法
- 然后第21个任务进来。此时队列已经满了offer方法返回false,因此执行了else if的方法通过addwork创建非核心线程,如果创建成功那么第21个任务就交给了非核心线程;如果创建失败了,调用拒绝策略。由于案例中最大线程数是20,除去核心线程10个还能创建10个非核心线程数。所以21到30的任务就交给了非核心线程。
- 当第31个任务进来,核心线程满了,队列也满了offer放回false 执行else if中的addwork方法,由于最大线程数是20,此时已经有20个了没法在创建线层。则调用了拒绝策略抛出了异常。
这就是自定义线程池案例中为什么是第31个任务抛出的异常。同时核心线程执行1-10的任务,非核心线程执行21-30的任务。只有当执行完任务的线程才能够从队列中获取任务执行,队列中的任务是11-20,这既是为什么先执行1-10的任务,在执行21-30的任务,最后才执行11-20的任务的原因。
下面就来分析下addWorker的源码
private boolean addWorker(Runnable firstTask, boolean core) { retry: //使用了java不推荐的goto语句 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. //判断如果线程非运行,并且非SHUTDOWN状态下任务为空,队列非空就不能再增加线程 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //获取当前线程数 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //如果当线程数大于最大线程数 不增加线程 return false; //compareAndIncrementWorkerCount(c) ctl+1 工作线程数+1 如果成功 //这里只是线程数+1,并没有真正创建新线程,创建工作在后面 if (compareAndIncrementWorkerCount(c)) //跳出循环 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //创建一个worker对象 可以暂时理解为这一个线程 w = new Worker(firstTask); //从worker对象中获取线程 final Thread t = w.thread; if (t != null) { //加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //获取线程状态 int rs = runStateOf(ctl.get()); //小于shutdown就是running状态 //或者SHUTDOWN 和firstTask 为空是从队列中处理任务 那就可以放到集合中 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //判断线程还没start 就alive就直接异常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //一切正常的话就把worker加到workers中 它是HashSet集合 workers.add(w); //获取当前worker的数量 int s = workers.size(); if (s > largestPoolSize) //记录worker的数量 相当于记录线程的数量 largestPoolSize = s; //标志线程添加成功 workerAdded = true; } } finally { mainLock.unlock(); } //如果线程添加成功 if (workerAdded) { //执行线程start方法 t.start(); //标志线程开始执行 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorker大致流程上面有注释,在进一步分析之前,先来解决一些问题。首先这个Worker是什么。看下worker的源码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { //线程 final Thread thread; // 第一个任务 Runnable firstTask; //执行了多少个任务 volatile long completedTasks; //有参构造 这里创建了线程 并设置任务 Worker(Runnable firstTask) { setState(-1); // 初始化的过程中不允许中断 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } //实现 Runnable 的run 方法 public void run() { runWorker(this); } }
以上就是Worker 比较重要的源码展示。
再来解决一个问题。在addWorker和execute方法中都有int c = ctl.get();这个c有时候既表示了线程状态,又表示线程数。这是怎么做到。这里做一个简单的讲解。看下线程池ThreadPoolExecutor的定义private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 高三位表示线程的状态 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 通过ctl的高三位获取到线程状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //通过ctl的其他29位 获取到线程的个数 private static int workerCountOf(int c) { return c & CAPACITY; } //计算ctl的个数 private static int ctlOf(int rs, int wc) { return rs | wc; }
其中COUNT_BITS是 int 位数 private static final int COUNT_BITS = Integer.SIZE - 3; //Integer.SIZE=32 所以实际 COUNT_BITS = 29, 用上面的5个常量表示线程池的状态,实际上是使用32位中的高3位表示
解决这两个问题后,现在来通过上面的案例来案分析addwork方法。
- 当第任务进来执行了execute方法,继而调用了addWorker(command, true)方法。进行第一个for循环的判断,判断线程状态,队列非空等等;通过以后进行第二个for循环的判断,判断当前线程数有没有大于最大线程数,并且再次判断线程状态。都通过以后ctl+1,然后退出循环。通过创建worker对象创建了线程并且赋予了任务(具体看上面的worker源码)。然后上锁,再次判断线程状态,如果是运行状态或者shutdown状态并且任务为空,那么就添加到workers这个HashSet集合。记录此时的线程数。并且调用start方法。
- 那么当第1个任务进来执行了addWorker(command, true);通过第一个for循环的判断,此时没有线程所以也能通过第二个for循环的判断。开始创建worker对象,通过worker的构造方法把第一个任务提交给worker同时创建了线程。然后又经过判断最后添加到workers。前十个任务就对应是个核心线程
- 第11个任务进来在execute方法中放入队列不会执行addWorker,因此不会创建线程。所以11-20的任务就会在队列中
- 21个任务进来同理执行了addWorker(command, false);方法同理创建了线程
在addwork中最后执行了start();方法,由于Worker实现了Runnable 接口,其实start();方法就是调用了Worker的run方法。这个run方法有调用了runWorker(this); 所以还要看runWorker的源码
final void runWorker(Worker w) { Thread wt = Thread.currentThread();//获取当前线程 Runnable task = w.firstTask;//获取worker中的任务 w.firstTask = null;//将worker的任务设为null w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //如果任务为不是null 执行这个任务,如果任务是null从队列中获取任务 while (task != null || (task = getTask()) != null) { w.lock(); //判断线程状态如果是stop 就立即中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);//任务执行前置方法 这是空方法 Throwable thrown = null; try { task.run();//开始执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//任务执行后置方法 这是空方法 } } finally { task = null; //将任务设为null 下次就从队列中获取 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //这里是线程复用 processWorkerExit(w, completedAbruptly); } }
runWorker的源码意思很简单,获取当前线程和任务,然后执行,不论如何都会将任务设为null,为了下次从队列中获取任务。重点是
(task != null || (task = getTask()) != null)
之前说过,提交优先级和执行优先级。execute代码阐述了提交优先级,那么这一行代码就是执行优先级,先判断worker对象有没有任务,有就执行;没有通过getTask();从队列中获取对象。
还有一个知识点就是beforeExecute和afterExecute这两个是空方法,如果有必要可以自己实现。
getTask();方法的源码就不分析了,就是从队列中获取任务。然后来分析下processWorkerExit的源码private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; //重点删除掉worker workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } //重点 重新创建了worker对象而且任务为null addWorker(null, false); } }
processWorkerExit 的代码是用来实现线程复用,在源码中显示remove执行完成的worker,然后addWorker(null, false);那么创建出来的worker 就会从队列中获取到任务。
到这里线程池的源码就分析结束。拒绝策略
ThreadPoolExecutor内部有实现4个拒绝策略:
- CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务;
- AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务;
- DiscardPolicy,直接抛弃任务,不做任何处理;
- DiscardOldestPolicy,去除任务队列中的第一个任务(最旧的),重新提交;
一般来说通常不会使用到内部提供的拒绝策略,而是自己实现拒绝策略,然后做业务处理,比如说记录数据库,记录日志等等。怎么使用呢?ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 5, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("业务处理,记录日志"); } });
线程池流程图
-
线程池底层原理你真的懂吗?
2021-03-30 17:18:37与线程池相关的接口实现类关系 Exectutor是接口 Executors 是辅助工具类 ...线程池的底层原理及七大参数的意义 七大参数 public ThreadPoolExecutor( int corePoolSize,//核心线程数 int maximumPoolSize,//最大线程 -
java 线程池底层原理
2017-09-22 17:18:46newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 源码如下:public static ExecutorService newCachedThreadPool() { return new ... -
(四)深入理解线程池底层原理
2018-08-10 16:37:11如何理解线程池的工作机制和原理? (1)线程池是用来干嘛的,用它有什么好处,怎么能更好的去用线程池? 线程池是用来干嘛的? 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样... -
深入解析线程池底层原理
2021-09-09 21:47:47本期带来线程池的第二期内容,如果对线程池的基本概念还不是很清楚,可以先看我上一篇文章。 面试官:谈谈你对线程池的理解 本期内容会从以下几个方面解析线程池的具体实现: 线程池状态 线程池初始化 如何执行... -
线程池底层原理《二》
2021-03-06 18:07:49看一遍就能快速理解的线程池底层原理 -
想深入了解线程池底层原理?那就动手写一个吧!
2019-11-17 13:16:22扫描下方海报试读本文来源:crossoverJie前言原以为线程池还挺简单(平时常用,也分析过原理),这次是想自己动手写一个线程池来更加深入的了解它;但在动手写的过程... -
线程池的底层原理
2021-12-27 17:55:57使用线程池进行线程复用 线程池 提前创建一系列的线程,保存在这个线程池中。(核心线程) 有任务要执行的时候,从线程池中取出线程来执行。 没有任务的时候,线程池放回去。 Java中提供的线程池 Executors 1,... -
深入理解线程池底层原理
2019-12-12 14:16:34如何理解线程池的工作机制和原理?1、线程池是用来干嘛的,用它有什么好处,怎么能更好的去用线程池?线程池是用来干嘛的?如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束... -
线程池底层实现原理相关面试题-(面试必问)
2022-03-02 21:23:19线程池底层实现原理相关面试题-(面试必问) -
线程池底层原理介绍
2021-02-21 23:15:21线程池底层原理介绍 池化思想 优点 提高线程利用率 提高响应速度 可以控制最大并发数 便于统一管理 参数 参数 解释 corePoolSize 核心线程数量,线程池维护线程的最少数量 maximumPoolSize 线程池维护...