线程池 订阅
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。 展开全文
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
信息
适用范围
互联网
类    别
多线程处理形式
中文名
线程池
外文名
thread pool
线程池简介
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。任务调度以执行线程的常见方法是使用同步队列,称作任务队列。池中的线程等待队列中的任务,并把执行完的任务放入完成队列中。线程池模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。线程池的伸缩性对性能有较大的影响。
收起全文
精华内容
下载资源
问答
  • 线程池

    2021-04-02 09:56:55
    线程池
  • 并发编程经历:线程池的使用

    万次阅读 多人点赞 2019-07-31 18:38:24
    线程池的使用 使用线程池管理线程可以最大程度的利用线程,节省资源消耗,它通过利用已有的线程多次循环执行多个任务从而提高系统的处理能力。 我们可以通过java.util.concurrent.ThreadPoolExecutor类来创建线程池...

    线程池的使用

    使用线程池管理线程可以最大程度的利用线程,节省资源消耗,它通过利用已有的线程多次循环执行多个任务从而提高系统的处理能力。

    我们可以通过java.util.concurrent.ThreadPoolExecutor类来创建线程池,一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。

    下面介绍一下里面的一些参数。

    1、创建一个线程池需要输入几个参数:

    ·        corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

    ·        runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。

    o   ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

    o   LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

    o   SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

    o   PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

    ·        maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

    ·        ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

    ·        RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。

    o   AbortPolicy:直接抛出异常。

    o   CallerRunsPolicy:只用调用者所在线程来运行任务。

    o   DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

    o   DiscardPolicy:不处理,丢弃掉。

    o   当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。

    ·        keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

    ·        TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),秒(SECONDS),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

    2、当一个任务通过execute(Runnable)方法欲添加到线程池时,会有如下几种情况:

    ·         如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

    ·        如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

    ·        如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

    ·        如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

    ·        当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

    3、举一个简单的例子:

    创建 TestThreadPool 类:

    1.  importjava.util.concurrent.ArrayBlockingQueue; 

    2.  importjava.util.concurrent.ThreadPoolExecutor; 

    3.  import java.util.concurrent.TimeUnit; 

    4.   

    5.  public class TestThreadPool { 

    6.    

    7.      private static int produceTaskSleepTime =2; 

    8.        

    9.      private static int produceTaskMaxNumber = 2; 

    10.   

    11.     public static void main(String[] args){ 

    12.   

    13.         // 构造一个线程池 

    14.         ThreadPoolExecutor threadPool = newThreadPoolExecutor(2, 4, 3, 

    15.                 TimeUnit.SECONDS, newArrayBlockingQueue<Runnable>(3), 

    16.                 newThreadPoolExecutor.DiscardOldestPolicy()); 

    17.   

    18.         for (int i = 1; i <=produceTaskMaxNumber; i++) { 

    19.             try { 

    20.                 String task = "task@" + i; 

    21.                 System.out.println("创建任务并提交到线程池中:" +task); 

    22.                 threadPool.execute(newThreadPoolTask(task)); 

    23.   

    24.                 Thread.sleep(produceTaskSleepTime); 

    25.             } catch (Exception e) { 

    26.                 e.printStackTrace(); 

    27.             } 

    28.         } 

    29.     } 

    30.  }  

    创建 ThreadPoolTask类:

    31. public class ThreadPoolTask implementsRunnable, Serializable { 

    32.   

    33.     private Object attachData; 

    34.   

    35.     ThreadPoolTask(Object tasks) { 

    36.         this.attachData = tasks; 

    37.     } 

    38.   

    39.     public void run() { 

    40.           

    41.         System.out.println("开始执行任务:" +attachData); 

    42.           

    43.         attachData = null; 

    44.     } 

    45.   

    46.     public Object getTask() { 

    47.         return this.attachData; 

    48.     } 

    49.  }  

    执行结果:

    创建任务并提交到线程池中:task@ 1

    开始执行任务:task@ 1

    创建任务并提交到线程池中:task@ 2

    开始执行任务:task@ 2

     4、再讲一个实际的例子

    这里用的是spring的线程池:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor。

    直接上代码:

    50.     //是否中断导入数据程序

    51.     publicstatic AtomicBoolean  terminateImport = newAtomicBoolean(false);

    52.     //成功条数

    53.     public static AtomicLong success_line = newAtomicLong();

    54.     //失败条数

    55.     publicstatic AtomicLong error_line = new AtomicLong();

    56.     //是否发现有线程rejected,如果有,说明线程队列满了,外部执行代码暂时中断队列填充,等待5s后,重新填充.

    57.     publicstatic AtomicBoolean found_rejected = new AtomicBoolean(false);

    58.     //开始执行

    59.     publicvoid startSwitch(int itemsPerPage, String updateMode,String applySeqno, Date startDate,Date endDate) {

    60.         int nowPage = 1;

    61.         int exec_sum = 0 ;

    62.         boolean hasNext = true ;

    63.         //重置静态值

    64.         success_line = new AtomicLong(0L);

    65.         error_line = new AtomicLong(0L);

    66.         terminateImport = new AtomicBoolean(false);

    67.  

    68.         ThreadPoolTaskExecutor threadPoolTaskExecutor= new ThreadPoolTaskExecutor();

    69.         threadPoolTaskExecutor.setCorePoolSize(100);

    70.         threadPoolTaskExecutor.setMaxPoolSize(200);

    71.         threadPoolTaskExecutor.setQueueCapacity(100);

    72.         threadPoolTaskExecutor.setRejectedExecutionHandler(newRejectedExecutionHandler() {

    73.             @Override

    74.             public voidrejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

    75.                 logger.warn("findRejectedExecutionException error !!,will set found_rejected boolean is false");

    76.                 found_rejected.set(true);

    77.                 threadPoolExecutor.execute(runnable);

    78.             }

    79.         });

    80.         //设成true后当调用shutdown()关闭线程池时会去检查任务是否都执行完毕

    81.         threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);

    82.         threadPoolTaskExecutor.initialize();

    83.  

    84.         while (hasNext) {

    85.             …

    86.             // applyDTOs是调接口进行分页查询得到的List集合,就是上面”…”省略的部分

    87.             //如果当次取出的总数比分页数还要小,那么下次进来直接中断掉这个循环

    88.             if(itemsPerPage>applyDTOs.size()){

    89.                 hasNext = false ;

    90.             }

    91.             if(ListUtil.isNotBlank(applyDTOs)){

    92.                 if(found_rejected.get()==false){

    93.                     if(ApplyInfoSync.terminateImport.get()==false){

    94.                         ApplySyncDataThread applySyncDataThread= new ApplySyncDataThread(

    95.                                 convert2LoanApplyInfoDOList(applyDTOs),updateMode, loanApplyComponent);

    96.                         threadPoolTaskExecutor.execute(applySyncDataThread);

    97.                     }else{

    98.                         hasNext= false;

    99.                         LogUtil.warn(logger, "主线程发现中断,开始尝试关闭线程池新进入数据");

    100.                      }

    101.                  }else{

    102.                      LogUtil.warn(logger,"执行被拒绝,队列已满了,需要等待5秒");

    103.                      try {

    104.                          TimeUnit.MILLISECONDS.sleep(5000L);

    105.                          found_rejected.set(false);

    106.                      } catch(InterruptedException e) {

    107.                          LogUtil.warn(e,logger,"foundrejected thread,Thread Sleep is error !!");

    108.                      }

    109.                  }

    110.              }

    111.              exec_sum= exec_sum+applyDTOs.size();

    112.              if(hasNext)

    113.                  nowPage++;

    114.          }

    115.   

    116.          while (threadPoolTaskExecutor.getActiveCount()>0){

    117.              try {

    118.                  logger.warn("threadPool ishave active count ,now sleep...");

    119.                  TimeUnit.MILLISECONDS.sleep(60000L);

    120.              } catch (InterruptedException e) {

    121.                  logger.warn("Thread Sleepis  error !!", e);

    122.              }

    123.          }

    124.          threadPoolTaskExecutor.shutdown();

    125.   

    126.          //检查线程池是否完全关闭,否则先等一等再走到日志打印阶段

    127.          while (threadPoolTaskExecutor.getThreadPoolExecutor().isTerminated()== false) {

    128.              try {

    129.                  TimeUnit.MILLISECONDS.sleep(6000L);

    130.              } catch (InterruptedException e) {

    131.                  LogUtil.error(e,logger,"ThreadSleep is  error !!");

    132.              }

    133.          }

    134.        …//一些打印日志的操作

    135.      }

    这里其他的不多说了,就说几点使容易看得懂。我这里有两个属性:terminateImport和found_rejected,他们都是AtomicBoolean类型(传入的boolean变量会被转换为volatile的int值,true为1,false为0)的都被static修饰,所以都只有一份且是同步安全的。terminateImport在一开始调用startSwitch时被重置为false,表明没有被中断,当在执行的过程中,有其他的入口调用terminateImport.set(true)后,程序再执行到if(ApplyInfoSync.terminateImport.get()==false)就会是false的,从而达到中断的作用;found_rejected用来标记任务是否被拒绝,经过之前的对线程池的介绍,应该知道了当达到最大线程数,任务队列也满了的时候,再execute的任务会被拒绝,所以我在threadPoolTaskExecutor.setRejectedExecutionHandler(…)设置饱和策略时自己定义实现了一个RejectedExecutionHandler接口来自定义策略,在里面把found_rejected标记改为了true,这样当程序再次执行到if(found_rejected.get()==false)就会是false的,从而可以监视线程和队列是否已经满了,当满了的时候,我就等待5秒,然后把found_rejected标记改为了false就可以继续填充队列了,至于里面为什么还要执行一次threadPoolExecutor.execute(runnable);是因为如果不执行被拒绝的任务,那任务就被抛弃了,当然你也可以做其他操作,例如持久化任务(就是存入数据库)等。

    展开全文
  • 给女朋友讲 : Java线程池的内部原理

    万次阅读 多人点赞 2019-11-03 15:09:57
    餐盘在灯光的照耀下格外晶莹洁白,女朋友拿起红酒杯轻轻地抿了一小口,对我说:“经常听你说线程池,到底线程池到底是个什么原理?”

    文章持续更新,微信搜索「 万猫学社 」第一时间阅读。
    关注后回复「 电子书 」,免费获取12本Java必读技术书籍。

    餐厅的约会

    餐盘在灯光的照耀下格外晶莹洁白,女朋友拿起红酒杯轻轻地抿了一小口,对我说:“经常听你说线程池,到底线程池到底是个什么原理?”我楞了一下,心里想女朋友今天是怎么了,怎么突然问出这么专业的问题,但做为一个专业人士在女朋友面前也不能露怯啊,想了一下便说:“我先给你讲讲我前同事老王的故事吧!”

    大龄程序员老王

    老王是一个已经北漂十多年的程序员,岁数大了,加班加不过年轻人,升迁也无望,于是拿着手里的一些积蓄,回老家转行创业。他选择了洗浴行业,开一家洗浴中心,没错,一家正规的洗浴中心。之前在北京的时候,喜欢去的澡堂叫“清华池”,他想了想,就给自己的洗浴中心取名为“线程池”。

    线程池洗浴中心

    线程池开业以后,老王发现有顾客想做足疗,于是就招聘了1个足疗技师,多增加了一项业务增加了收入。随着做足疗的顾客增多,为了赚更多钱又招聘了4个足疗技师。
    过了一段时间,洗浴中心的生意越来越好,做足疗的顾客也越来越多。但是,老王发现自己店里的足疗技师已经有5个足疗技师,再招聘就太多了,支付不起再多工资了。足疗技师忙不过来怎么办?老王是个聪明人,马上想到办法:让顾客排队,有哪个足疗技师做完了,空闲出来了,就在队伍里再叫一个顾客继续做。

    忙碌的周末

    一到周末,来洗浴中心的顾客比平时多了几倍,想足疗的顾客排队时间太长,顾客们已经不耐烦了。老王马上做出反应,又紧急从其他洗浴中心招聘了5个足疗技师,为队伍里顾客做足疗,大大减少排队的顾客。
    不过,有时生意太火爆了,紧急招聘的技师也用上了,顾客排队时间也是很长,再来新的顾客,老王只能满脸赔笑地和顾客说:“您下次再来吧,下次给您找个好技师。”,把顾客拒之门外。
    过了周末以后,店里不能养闲人啊,老王就把紧急招聘的技师都辞退了。

    老王的经营之道

    老王的生意越做越红火,很快就要开分店、融资上市、走上人生巅峰。既然这么成功,就让我们来复盘一下他的经营之道吧:

    如果你了解了老王的经营之道,线程池就不难理解了,把顾客替换成任务,把足疗技师替换成线程线程池洗浴中心就是线程池了,线程池的内部原理就是这样的:

    梦醒

    铃铃铃,闹铃把我吵醒,原来是一场梦啊,我哪有什么女朋友?今天上午有一个面试,赶紧起床洗漱完毕,就出发了。在路上回想那个奇怪的梦,要不再复习一下线程池的内部原理吧!
    先看一下ThreadPoolExecutor类的execute方法:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取clt,clt记录着线程池状态和运行线程数。
        int c = ctl.get();
        //运行线程数小于核心线程数时,创建线程放入线程池中,并且运行当前任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            //创建线程失败,重新获取clt。
            c = ctl.get();
        }
        //线程池是运行状态并且运行线程大于核心线程数时,把任务放入队列中。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //重新检查线程池不是运行状态时,
            //把任务移除队列,并通过拒绝策略对该任务进行处理。
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //当前运行线程数为0时,创建线程加入线程池中。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //运行线程大于核心线程数时并且队列已满时,
        //创建线程放入线程池中,并且运行当前任务。
        else if (!addWorker(command, false))
            //运行线程大于最大线程数时,失败则拒绝该任务
            reject(command);
    }
    

    在execute方法中,多次调用的addWorker方法,再看一下这个方法:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //获取clt,clt记录着线程池状态和运行线程数。
            int c = ctl.get();
            //获取线程池的运行状态。
            int rs = runStateOf(c);
    
            //线程池处于关闭状态,或者当前任务为null
            //或者队列不为空,则直接返回失败。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                //获取线程池中的线程数
                int wc = workerCountOf(c);
                //线程数超过CAPACITY,则返回false;
                //这里的core是addWorker方法的第二个参数,
                //如果为true则根据核心线程数进行比较,
                //如果为false则根据最大线程数进行比较。
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //尝试增加线程数,如果成功,则跳出第一个for循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //如果增加线程数失败,则重新获取ctl
                c = ctl.get();
                //如果当前的运行状态不等于rs,说明状态已被改变,
                //返回第一个for循环继续执行
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //根据当前任务来创建Worker对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //获得锁以后,重新检查线程池状态
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        //把刚刚创建的线程加入到线程池中
                        workers.add(w);
                        int s = workers.size();
                        //记录线程池中出现过的最大线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //启动线程,开始运行任务
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    面试

    一个穿着格子衬衫的中年男子坐在我面前,对我说:“您好,我是今天的面试官。”我微笑地回应:“您好。”面试官面无表情地问我:“线程池一定用过吧,能说说线程池的内部原理嘛?”我差点笑出声来,自信满满地说……

    文章持续更新,微信搜索「 万猫学社 」第一时间阅读。
    关注后回复「 电子书 」,免费获取12本Java必读技术书籍。

    展开全文
  • 线程池

    2012-03-01 23:53:16
    VC实现线程池
  • Java 多线程:彻底搞懂线程池

    万次阅读 多人点赞 2019-07-09 19:27:00
    熟悉Java多线程编程的同学都知道,当我们线程创建过多时,容易引发内存溢出,因此我们就有必要使用线程池的技术了。 目录 1 线程池的优势 2 线程池的使用 3 线程池的工作原理 4 线程池的参数 4.1 任务队列...

    熟悉 Java 多线程编程的同学都知道,当我们线程创建过多时,容易引发内存溢出,因此我们就有必要使用线程池的技术了。

    目录

    1 线程池的优势

    2 线程池的使用

    3 线程池的工作原理

    4 线程池的参数

    4.1 任务队列(workQueue)

    4.2 线程工厂(threadFactory)

    4.3 拒绝策略(handler)

    5 功能线程池

    5.1 定长线程池(FixedThreadPool)

    5.2 定时线程池(ScheduledThreadPool )

    5.3 可缓存线程池(CachedThreadPool)

    5.4 单线程化线程池(SingleThreadExecutor)

    5.5 对比

    6 总结

    参考


    1 线程池的优势

    总体来说,线程池有如下的优势:

    (1)降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

    (2)提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

    (3)提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    2 线程池的使用

    线程池的真正实现类是 ThreadPoolExecutor,其构造方法有如下4种:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    可以看到,其需要如下几个参数:

    • corePoolSize(必需):核心线程数。默认情况下,核心线程会一直存活,但是当将 allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
    • maximumPoolSize(必需):线程池所能容纳的最大线程数。当活跃线程数达到该数值后,后续的新任务将会阻塞。
    • keepAliveTime(必需):线程闲置超时时长。如果超过该时长,非核心线程就会被回收。如果将 allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
    • unit(必需):指定 keepAliveTime 参数的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
    • workQueue(必需):任务队列。通过线程池的 execute() 方法提交的 Runnable 对象将存储在该参数中。其采用阻塞队列实现。
    • threadFactory(可选):线程工厂。用于指定为线程池创建新线程的方式。
    • handler(可选):拒绝策略。当达到最大线程数时需要执行的饱和策略。

    线程池的使用流程如下:

    // 创建线程池
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
                                                 MAXIMUM_POOL_SIZE,
                                                 KEEP_ALIVE,
                                                 TimeUnit.SECONDS,
                                                 sPoolWorkQueue,
                                                 sThreadFactory);
    // 向线程池提交任务
    threadPool.execute(new Runnable() {
        @Override
        public void run() {
            ... // 线程执行的任务
        }
    });
    // 关闭线程池
    threadPool.shutdown(); // 设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
    threadPool.shutdownNow(); // 设置线程池的状态为 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表

    3 线程池的工作原理

    下面来描述一下线程池工作的原理,同时对上面的参数有一个更深的了解。其工作原理流程图如下:

    通过上图,相信大家已经对所有参数有个了解了。下面再对任务队列、线程工厂和拒绝策略做更多的说明。

    4 线程池的参数

    4.1 任务队列(workQueue)

    任务队列是基于阻塞队列实现的,即采用生产者消费者模式,在 Java 中需要实现 BlockingQueue 接口。但 Java 已经为我们提供了 7 种阻塞队列的实现:

    1. ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列)。
    2. LinkedBlockingQueue: 一个由链表结构组成的有界阻塞队列,在未指明容量时,容量默认为 Integer.MAX_VALUE
    3. PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列,对元素没有要求,可以实现 Comparable 接口也可以提供 Comparator 来对队列中的元素进行比较。跟时间没有任何关系,仅仅是按照优先级取任务。
    4. DelayQueue:类似于PriorityBlockingQueue,是二叉堆实现的无界优先级阻塞队列。要求元素都实现 Delayed 接口,通过执行时延从队列中提取任务,时间没到任务取不出来。
    5. SynchronousQueue: 一个不存储元素的阻塞队列,消费者线程调用 take() 方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用 put() 方法的时候也会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。
    6. LinkedBlockingDeque: 使用双向队列实现的有界双端阻塞队列。双端意味着可以像普通队列一样 FIFO(先进先出),也可以像栈一样 FILO(先进后出)。
    7. LinkedTransferQueue: 它是ConcurrentLinkedQueueLinkedBlockingQueue 和 SynchronousQueue 的结合体,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行为一致,但是是无界的阻塞队列。

    注意有界队列和无界队列的区别:如果使用有界队列,当队列饱和时并超过最大线程数时就会执行拒绝策略;而如果使用无界队列,因为任务队列永远都可以添加任务,所以设置 maximumPoolSize 没有任何意义。

    4.2 线程工厂(threadFactory)

    线程工厂指定创建线程的方式,需要实现 ThreadFactory 接口,并实现 newThread(Runnable r) 方法。该参数可以不用指定,Executors 框架已经为我们实现了一个默认的线程工厂:

    /**
     * The default thread factory.
     */
    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

    4.3 拒绝策略(handler)

    当线程池的线程数达到最大线程数时,需要执行拒绝策略。拒绝策略需要实现 RejectedExecutionHandler 接口,并实现 rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法。不过 Executors 框架已经为我们实现了 4 种拒绝策略:

    1. AbortPolicy(默认):丢弃任务并抛出 RejectedExecutionException 异常。
    2. CallerRunsPolicy:由调用线程处理该任务。
    3. DiscardPolicy:丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。
    4. DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。

    5 功能线程池

    嫌上面使用线程池的方法太麻烦?其实Executors已经为我们封装好了 4 种常见的功能线程池,如下:

    • 定长线程池(FixedThreadPool)
    • 定时线程池(ScheduledThreadPool )
    • 可缓存线程池(CachedThreadPool)
    • 单线程化线程池(SingleThreadExecutor)

    5.1 定长线程池(FixedThreadPool)

    创建方法的源码:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
    • 特点:只有核心线程,线程数量固定,执行完立即回收,任务队列为链表结构的有界队列。
    • 应用场景:控制线程最大并发数。

    使用示例:

    // 1. 创建定长线程池对象 & 设置线程池线程数量固定为3
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
    // 2. 创建好Runnable类线程对象 & 需执行的任务
    Runnable task =new Runnable(){
      public void run() {
         System.out.println("执行任务啦");
      }
    };
    // 3. 向线程池提交任务
    fixedThreadPool.execute(task);

    5.2 定时线程池(ScheduledThreadPool )

    创建方法的源码:

    private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
    
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }
    
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    • 特点:核心线程数量固定,非核心线程数量无限,执行完闲置 10ms 后回收,任务队列为延时阻塞队列。
    • 应用场景:执行定时或周期性的任务。

    使用示例:

    // 1. 创建 定时线程池对象 & 设置线程池线程数量固定为5
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    // 2. 创建好Runnable类线程对象 & 需执行的任务
    Runnable task =new Runnable(){
      public void run() {
         System.out.println("执行任务啦");
      }
    };
    // 3. 向线程池提交任务
    scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 延迟1s后执行任务
    scheduledThreadPool.scheduleAtFixedRate(task,10,1000,TimeUnit.MILLISECONDS);// 延迟10ms后、每隔1000ms执行任务

    5.3 可缓存线程池(CachedThreadPool)

    创建方法的源码:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
    • 特点:无核心线程,非核心线程数量无限,执行完闲置 60s 后回收,任务队列为不存储元素的阻塞队列。
    • 应用场景:执行大量、耗时少的任务。

    使用示例:

    // 1. 创建可缓存线程池对象
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    // 2. 创建好Runnable类线程对象 & 需执行的任务
    Runnable task =new Runnable(){
      public void run() {
         System.out.println("执行任务啦");
      }
    };
    // 3. 向线程池提交任务
    cachedThreadPool.execute(task);

    5.4 单线程化线程池(SingleThreadExecutor)

    创建方法的源码:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
    • 特点:只有 1 个核心线程,无非核心线程,执行完立即回收,任务队列为链表结构的有界队列。
    • 应用场景:不适合并发但可能引起 IO 阻塞性及影响 UI 线程响应的操作,如数据库操作、文件操作等。

    使用示例:

    // 1. 创建单线程化线程池
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    // 2. 创建好Runnable类线程对象 & 需执行的任务
    Runnable task =new Runnable(){
      public void run() {
         System.out.println("执行任务啦");
      }
    };
    // 3. 向线程池提交任务
    singleThreadExecutor.execute(task);

    5.5 对比

    6 总结

    Executors 的 4 个功能线程池虽然方便,但现在已经不建议使用了,而是建议直接通过使用 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

    其实 Executors 的 4 个功能线程有如下弊端:

    • FixedThreadPool 和 SingleThreadExecutor:主要问题是堆积的请求处理队列均采用 LinkedBlockingQueue,可能会耗费非常大的内存,甚至 OOM。
    • CachedThreadPool 和 ScheduledThreadPool:主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。

     

    参考

     

    展开全文
  • Java线程池七个参数详解

    万次阅读 多人点赞 2019-04-23 11:14:33
    java多线程开发时,常常用到线程池技术,这篇文章是对创建java线程池时的七个参数的详细解释。 从源码中可以看出,线程池的构造函数有7个参数,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、...

    java多线程开发时,常常用到线程池技术,这篇文章是对创建java线程池时的七个参数的详细解释。

    从源码中可以看出,线程池的构造函数有7个参数,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。下面会对这7个参数一一解释。

    一、corePoolSize 线程池核心线程大小

    线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。

    二、maximumPoolSize 线程池最大线程数量

    一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接将任务交给这个空闲线程来执行,如果没有则会缓存到工作队列(后面会介绍)中,如果工作队列满了,才会创建一个新线程,然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。

    三、keepAliveTime 空闲线程存活时间

    一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定

    四、unit 空闲线程存活时间单位

    keepAliveTime的计量单位

    五、workQueue 工作队列

    新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:

    ①ArrayBlockingQueue

    基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。

    ②LinkedBlockingQuene

    基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。

    ③SynchronousQuene

    一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。

    ④PriorityBlockingQueue

    具有优先级的无界阻塞队列,优先级通过参数Comparator实现。

    六、threadFactory 线程工厂

    创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等

    七、handler 拒绝策略

    当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:

    ①CallerRunsPolicy

    该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。

    ②AbortPolicy

    该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。

    ③DiscardPolicy

    该策略下,直接丢弃任务,什么都不做。

    ④DiscardOldestPolicy

    该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列

     

    到此,构造线程池时的七个参数,就全部介绍完毕了。

    个人独立博客:https://blog.it-follower.com/posts/1035400434.html

    展开全文
  • 线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池...
  • Java线程池详解

    万次阅读 多人点赞 2018-04-08 19:07:23
    为什么需要使用线程池线程池的创建及重要参数 springboot中使用线程池 1.为什么需要线程池? java中经常需要用到多线程来处理一些业务,我们非常不建议单纯使用继承Thread或者实现Runnable接口的方式来创建...
  • 线程池
  • 深入Java线程池:从设计思想到源码解读

    万次阅读 多人点赞 2021-02-14 11:26:10
    本文由浅入深,阐述下面4大内容,深入解读线程池 1、线程池的优势 2、线程池的原理 3、线程池的使用 4、线程池的源码解读

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 394,351
精华内容 157,740
关键字:

线程池