-
2019-12-31 10:16:38
概述:
在编程规范中,不建议使用Executors去创建线程池,而是推荐使用ThreadPoolExecutor。
ThreadPoolExecutor会更明确运行规则,避免资源耗尽的风险。
因为Executors返回线程池有弊端:
1)FixedThreadPool和SingleThreadPool,允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPool和ScheduledThreadPool,允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
ThreadPoolExecutor:
public ThreadPoolExecutor ( int corePoolsize, int maximumPoolSize, 1ong keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 函数的参数含义如下:. corePoolSize:指定了线程池中的线程数量。 maximumPoolSize:指定了线程池的最大线程数量。 keepAliveTime当线程池线程数量超过corePoolSize时 ,多余的空闲线程的存活时间。即,超过corePoolSize的空闲线程,在多长时间内会被销毁。 unit: keepAliveTime的单位。 workQueue:任务队列,被提交但尚未被执行的任务 threadFactory:线程工厂,用于创建线程,一般用默认的即可 handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。以上参数中,大部分都很简单 ,只有workQueue和handler需要进行详细说明
任务队列:
参数workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,:仅用于在放Runnable对象。根据队列功能分类,在Thread
PoolExecutor的构造函数中可使,用以下几种BlockingQueue:"
直接提交的对列:该功能由SynchronousQueue对象提供。SynchronousQueue是个特殊的BlockingQueue.,SynchronousQueue没有容
量,每一个插入操作都要等待.一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。
SynchronousQueue不保存任务,它总是将任务提交给线程执行,如果没有空闲的,进程,则尝试创建新的进程,如果进程数量已经达到最天值,则执行拒绝策略。
因此,使用SynchronousQueue队列,通常要设置很大的-maximumPoolSize值,否则很容易执行异常策略
有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现:
ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量:public ArrayBlockingQueue (int capacity)
,当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于.corePoolSize,则会优先创建新的线程,若大于corePoolSize,则会将新任务加入等待对列。若等待队
列已满,无法加入,则在总线程数不大于maximumPoolSize的前
提下,创建新的进程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见,有界队列仅当任务队列装满时,才可能将
线程数提升到corePoolSize以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize.
无界的任务队列:
无界任务队列可以通过LinkedBlockingQueue类实现。与有界队列相比,除非系统资源耗尽,否则无界的任务队列
不存在任务入队失败的情况。当有新的住务到来,系统的线程数
小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续仍有:的
任务加入,而又没有空闲的线程资源,则任务直接进入队列等待。若任务创建…和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。
优先任务队列:
优先任务队列是带有执行优先级的队列,它通过PriorityBlockingQueue实现。可以控制任务的执行先后顺序,是一个特殊的无
界队列。无论是有界队列ArrayBlockingQueue,还是未指定大小的无界队列LinkedBlockingQueue都是按照先进先出算法处理任
的。而PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证
(总是确保高优先级的任务先执行)。拒绝策略:
JDK内置的拒维策略如下:
AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。
DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的幽一个任务,并尝试再次提交当前任务。
DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。
以上内置的策略均实现了RejectedExecutionHandler接口,若以 策略仍无法满足实际,应用需要,完全可以自己扩展RejectedExecutionHandler RejectedExecutionHandler的定义如下构造方法:
示例:
自定义ThreadFactory:
public class Task implements Runnable{ private int i; public Task(int i){this.i = i;} @Override public void run() { try { Thread.sleep(10000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("ThreadName:"+Thread.currentThread().getName()+"线程执行:"+i); } }
简单使用:
private static void simple(){ ThreadFactory namedThreadFactory = new MyThreadFactory(); int queueCapacity = 3, corePoolSize=2, maximumPoolSize=3; ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(queueCapacity); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,10, TimeUnit.SECONDS,arrayBlockingQueue,namedThreadFactory); for(int i=1;i<5;i++){ Thread thread = namedThreadFactory.newThread(new Task(i)); threadPoolExecutor.execute(thread); System.out.println("i:"+i+", queueSize:"+arrayBlockingQueue.size() +", poolSize:"+threadPoolExecutor.getPoolSize() +", coreSize:"+threadPoolExecutor.getCorePoolSize() +", maxSize:"+threadPoolExecutor.getMaximumPoolSize()); } threadPoolExecutor.shutdown(); while (true){ if(threadPoolExecutor.isTerminated()){ System.out.println("over"); break; } } System.out.println( ); }
使用LinkedBlockingQueue作为示例队列:
private static void LinkedBlockingQueue(){ ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); int queueCapacity = 3, corePoolSize=2, maximumPoolSize=3; LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(queueCapacity); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,10, TimeUnit.SECONDS,linkedBlockingQueue,namedThreadFactory); for(int i=1;i<21;i++){ Thread thread = namedThreadFactory.newThread(new Task(i)); System.out.println("即将添加数据:"+i); threadPoolExecutor.execute(thread); System.out.println("i:"+i+", queueSize:"+linkedBlockingQueue.size() +", poolSize:"+threadPoolExecutor.getPoolSize() +", coreSize:"+threadPoolExecutor.getCorePoolSize() +", maxSize:"+threadPoolExecutor.getMaximumPoolSize()); }
查看打印的日志:
即将添加数据:1 i:1, queueSize:0, poolSize:1, coreSize:2, maxSize:3 即将添加数据:2 i:2, queueSize:0, poolSize:2, coreSize:2, maxSize:3 即将添加数据:3 i:3, queueSize:1, poolSize:2, coreSize:2, maxSize:3 即将添加数据:4 i:4, queueSize:2, poolSize:2, coreSize:2, maxSize:3 即将添加数据:5 i:5, queueSize:3, poolSize:2, coreSize:2, maxSize:3 即将添加数据:6 i:6, queueSize:3, poolSize:3, coreSize:2, maxSize:3 即将添加数据:7 Exception in thread "main" java.util.concurrent.RejectedExecutionException
当添加第一个任务的时候,由于线程池空着,直接创建核心线程来处理请求;
当已经添加完两个请求,添加第三个请求的时候,核心线程数已满,则往队列里面添加,此时queueSize=1;
添加3、4、5任务,都被添加到了队列里面,此时queueSize=3;
添加6任务的时候,核心线程已满,队列已满,运行的线程数小于maximumPoolSize,那么线程池再处理一个任务,此时poolSize=3,线程池的任务满了;
添加7任务的时候,由于线程池里面的任务还没有执行完,而队列也是满的,线程池处理不了这么多任务了,抛出异常java.util.concurrent.RejectedExecutionException。
综上:先创建核心线程,够数后往队列里面塞,塞满继续创建执行线程,再满后抛出拒绝执行的异常。我们在执行的时候希望往池子里面一直扔,盛不下了也别抛异常,怎么办?
可以通过判断池子的大小,如果已经满了,则阻塞添加:
private static void LinkedBlockingQueue(){ ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); int queueCapacity = 3, corePoolSize=2, maximumPoolSize=3; LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(queueCapacity); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,10, TimeUnit.SECONDS,linkedBlockingQueue,namedThreadFactory); for(int i=1;i<21;i++){ Thread thread = namedThreadFactory.newThread(new Task(i)); System.out.println("即将添加数据:"+i); threadPoolExecutor.execute(thread); System.out.println("i:"+i+", queueSize:"+linkedBlockingQueue.size() +", poolSize:"+threadPoolExecutor.getPoolSize() +", coreSize:"+threadPoolExecutor.getCorePoolSize() +", maxSize:"+threadPoolExecutor.getMaximumPoolSize()); while((threadPoolExecutor.getPoolSize()+linkedBlockingQueue.size())==(queueCapacity + maximumPoolSize)){ System.out.println("线程池已满,休眠等待 i:"+i+", queueSize:"+linkedBlockingQueue.size() +", poolSize:"+threadPoolExecutor.getPoolSize() +", coreSize:"+threadPoolExecutor.getCorePoolSize() +", maxSize:"+threadPoolExecutor.getMaximumPoolSize()); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } } threadPoolExecutor.shutdown(); while (true){ if(threadPoolExecutor.isTerminated()){ System.out.println("run over"); break; } } }
在for循环中进行了判断:
(threadPoolExecutor.getPoolSize()+linkedBlockingQueue.size())==(queueCapacity + maximumPoolSize),表示当前队列里面的线程数加上线程池里面当前线程数等于当前线程池可处理的最大线程的时候,进行Thread.sleep(1000L)等待,知道线程池有空闲资源的时候继续执行添加操作。
避免异常,可以使用阻塞队列ArrayBlockingQueue:
private static void ArrayBlockingQueue(){ ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); int queueCapacity = 3, corePoolSize=2, maximumPoolSize=3; ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(queueCapacity); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,10, TimeUnit.SECONDS,arrayBlockingQueue,namedThreadFactory); for(int i=1;i<21;i++){ int finalI = i; if((threadPoolExecutor.getPoolSize()+arrayBlockingQueue.size())>=(queueCapacity+maximumPoolSize)){ try { Thread thread = namedThreadFactory.newThread(new Task(finalI)); arrayBlockingQueue.put(thread); System.out.println("队列中添加线程 i:"+i+", queueSize:"+arrayBlockingQueue.size() +", poolSize:"+threadPoolExecutor.getPoolSize() +", coreSize:"+threadPoolExecutor.getCorePoolSize() +", maxSize:"+threadPoolExecutor.getMaximumPoolSize()); } catch (InterruptedException e) { e.printStackTrace(); } }else { Thread thread = namedThreadFactory.newThread(new Task(finalI)); threadPoolExecutor.execute(thread); System.out.println("i:"+i+", queueSize:"+arrayBlockingQueue.size() +", poolSize:"+threadPoolExecutor.getPoolSize() +", coreSize:"+threadPoolExecutor.getCorePoolSize() +", maxSize:"+threadPoolExecutor.getMaximumPoolSize()); } } threadPoolExecutor.shutdown(); while (true){ if(threadPoolExecutor.isTerminated()){ System.out.println("over"); break; } } System.out.println( ); }
使用ArrayBlockingQueue 阻塞机制,来实现同Thread.sleep()同样的效果。
更多相关内容 -
Python线程池模块ThreadPoolExecutor用法分析
2020-12-24 13:48:54本文实例讲述了Python线程池模块ThreadPoolExecutor用法。分享给大家供大家参考,具体如下: python3内置的有Threadingpool和ThreadPoolExecutor模块,两个都可以做线程池,当然ThreadPoolExecutor会更好用一些,... -
线程池ThreadPoolExecutor
2020-12-21 22:29:48引子 线程的创建和销毁比较消耗资源,所以有一种更加高效快捷的方式管理线程—-线程池。 先来看一下线程池的java模型 Executor:线程池顶级接口,只有一个方法 ExecutorService:真正的线程池接口 ... -
Java ThreadPoolExecutor 线程池的使用介绍
2020-08-26 04:01:26提供工厂方法来创建不同类型的线程池,这篇文章主要介绍了Java ThreadPoolExecutor 线程池的使用介绍,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来... -
java ThreadPoolExecutor使用方法简单介绍
2020-08-31 09:33:55主要介绍了java ThreadPoolExecutor使用方法简单介绍的相关资料,需要的朋友可以参考下 -
Java线程池ThreadPoolExecutor原理及使用实例
2020-08-19 06:01:14主要介绍了Java线程池ThreadPoolExecutor原理及使用实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 -
java ThreadPoolExecutor 并发调用实例详解
2020-08-30 09:56:20主要介绍了java ThreadPoolExecutor 并发调用实例详解的相关资料,需要的朋友可以参考下 -
解决python ThreadPoolExecutor 线程池中的异常捕获问题
2020-12-20 13:57:23这里主要想介绍 python concurrent.futuresthread.ThreadPoolExecutor 线程池中的 worker 引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用concurrent.futures.Future.exception(timeout=None) 方法... -
Spring线程池ThreadPoolExecutor配置并且得到任务执行的结果
2020-08-26 06:00:41今天小编就为大家分享一篇关于Spring线程池ThreadPoolExecutor配置并且得到任务执行的结果,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧 -
java中Executor,ExecutorService,ThreadPoolExecutor详解
2020-08-31 07:37:18主要介绍了java中Executor,ExecutorService,ThreadPoolExecutor详解的相关资料,需要的朋友可以参考下 -
简单谈谈ThreadPoolExecutor线程池之submit方法
2020-08-30 05:27:08下面小编就为大家带来一篇简单谈谈ThreadPoolExecutor线程池之submit方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧 -
Android之线程池ThreadPoolExecutor的简介
2021-01-04 07:54:28Android中的线程池ThreadPoolExecutor解决了单线程下载数据的效率慢和线程阻塞的的问题,它的应用也是优化实现的方式。所以它的重要性不言而喻,但是它的复杂性也大,理解上可能会有问题,不过作为安卓工程师,了解... -
ThreadPoolExecutor线程池原理及其execute方法(详解)
2020-08-30 05:25:11下面小编就为大家带来一篇ThreadPoolExecutor线程池原理及其execute方法(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧 -
线程池实例:使用Executors和ThreadPoolExecutor
2019-08-12 01:53:14NULL 博文链接:https://bijian1013.iteye.com/blog/2284676 -
python线程池 ThreadPoolExecutor 的用法示例
2020-12-16 20:41:32从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类。 相比 threading 等模块,该模块通过 submit 返回的是一个 future ... -
Java进阶之ThreadPoolExecutor
2020-12-22 17:36:07· 使用自定义ThreadPoolExecutor · 使用Executors.newCachedThreadPool() · 使用Executors.newFixedThreadPool(int) · 使用Executors.newSingleThreadExecutor() 其中使用2,3,4来创建线程池时... -
ThreadPoolExecutor源码解析.pdf
2021-11-26 23:36:05ThreadPoolExecutor源码解析.pdf -
ThreadPoolExecutor线程池的使用方法
2020-08-25 14:21:55主要为大家详细介绍了ThreadPoolExecutor线程池的使用方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下 -
java中ThreadPoolExecutor常识汇总
2020-08-25 21:50:44主要介绍了java中ThreadPoolExecutor常识汇总,线程池技术在并发时经常会使用到,java中的线程池的使用是通过调用ThreadPoolExecutor来实现的,需要的朋友可以参考下 -
线程池ThreadPoolExecutor使用简介与方法实例
2020-08-26 06:22:47今天小编就为大家分享一篇关于线程池ThreadPoolExecutor使用简介与方法实例,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧 -
Java ThreadPoolExecutor的参数深入理解
2020-08-31 01:14:13主要介绍了Java ThreadPoolExecutor的参数深入理解的相关资料,需要的朋友可以参考下 -
死磕ThreadPoolExecutor线程池.pdf
2020-07-27 17:54:27死磕ThreadPoolExecutor线程池.pdf!!死磕ThreadPoolExecutor线程池.pdf死磕ThreadPoolExecutor线程池.pdf死磕ThreadPoolExecutor线程池.pdf -
ThreadPoolExecutor源码解析.md
2021-02-21 21:17:00ThreadPoolExecutor源码解析.md -
高并发之——通过源码深度解析ThreadPoolExecutor类是如何保证线程池正确运行的
2021-01-20 03:33:05对于线程池的核心类ThreadPoolExecutor来说,有哪些重要的属性和内部类为线程池的正确运行提供重要的保障呢? ThreadPoolExecutor类中的重要属性 在ThreadPoolExecutor类中,存在几个非常重要的属性和方法,接下来,... -
framework-analysis:对使用...Spring、MyBatis、AQS、ThreadPoolExecutor、CountDownLatch核心源码分析
2021-05-14 04:45:26于是乎到现在的Hibernate、MyBatis、Spring、Spring MVC、AQS、ThreadPoolExecutor、CountDownLatch使用场景和核心源码分析。 感觉自己还是真的菜鸡,有太多框架的底层实现都不怎么了解。 当山头被一座一座攻克时,... -
线程池原理-ThreadPoolExecutor源码解析
2020-11-17 10:33:16线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor