线程池 订阅
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。 展开全文
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
信息
适用范围
互联网
类    别
多线程处理形式
中文名
线程池
外文名
thread pool
线程池简介
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。任务调度以执行线程的常见方法是使用同步队列,称作任务队列。池中的线程等待队列中的任务,并把执行完的任务放入完成队列中。线程池模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。线程池的伸缩性对性能有较大的影响。
收起全文
精华内容
下载资源
问答
  • 线程池

    万次阅读 2019-06-23 21:13:32
    线程池 为什么要使用线程池? 线程是处理器调度的基本单位。我们会为每一个请求都独立创建一个线程,而操作系统创建线程、切换线程状态、结束线程都要使用CPU进行调度。使用线程池能够更好对线程进行管理、复用等。...

    线程池

    在这里插入图片描述

    为什么要使用线程池?
    线程是处理器调度的基本单位。我们会为每一个请求都独立创建一个线程,而操作系统创建线程、切换线程状态、结束线程都要使用CPU进行调度。使用线程池能够更好对线程进行管理、复用等。

    JDK为我们提供了那些支持?
    ScheduledThreadPoolExecutor
    ThreadPoolExecutor(最核心)
    ForkJoin Pool

    ThreadPoolExecutor的结构
    在这里插入图片描述
    核心属性:
    corePoolSize #核心线程数
    maxinmumPoolSize #线程总数 非核心数=总数-核心数
    keepAliveTime #当前线程数大于核心线程数时 非核心线程的等待被执行的等待时间
    TimeUnit unit #时间单位
    workQueue #保存未被执行的任务队列
    RejectedExecutionHandler #拒绝处理器

    poolExecutor 线程池对象
    submit(Callable x) execute(Runnable x) 将任务送入到任务队列
    Runnable接口没有返回值,Callable有返回值
    核心:
    当前运行线程数 小于corePoolSize 任务直接交给核心线程进行执行(通过线程调度执行任务)
    当前运行线程数 大于或等于 corePoolSize 任务且满足队列规则 任务将进入任务队列进行等待
    当前运行线程数 大于或等于 corePoolSize 任务且不满足队列规则 任务进入非核心线程(这类线程有存活时间,不一定会执行成功)

    任务队列(实现BlockingQueue接口)
    SyschronousQueue:每一次add()插入 必须要等待相对删除/读取操作
    ArrayBlockingQueue:数组的方式,大小创建后不能改变大小,具有阻塞特性。
    LinkedBlockingQueue:无限容量 基于链表的形式
    LinkedBlockingDeque :无限双端链表队列,可以从两头进行元素的读/取操作
    PriorityBlockingQueue:按照优先级进行内部元素排序的无限队列。
    LinkedTransferQueue:无限队列,先进先出,具有阻塞特性。
    #阻塞特性:当队列满了,便会阻塞等待,直到有元素出队,后续的元素才可以被加入队列。

    拒绝处理器
    适用:那些既不能进入核心线程、等待队列,也无法创建新的线程执行(即非核心线程),或者线程异常等。
    CallerRunsPolicy:直接运行该任务的run方法,但不是在线程池内部
    AbortPolicy:RejectedExecutionException异常抛出(默认)
    DiscardPolicy:不会做任何处理
    DiscardOldestPolicy:检查等待队列 强行取出队列头部任务 进行执行

    ForkJoin框架

    在这里插入图片描述
    ForkJoin采用“工作窃取模式”,当有新的任务它可以将其拆分成更小的任务去执行,之后将结果进行合并,得到最终的结果。这种思想,类似于map/reduce的分而治之的思想处理任务。

    ForJoin的简单实现

    ForkJoinPool forkJoinPool = new ForkJoinPool();//实现ForkJoin 就必须有ForkJoinPool的支持
    ForkJoinTask<Long> task = new ForkJoinWork(0L,10000000000L);//参数为起始值与结束值
    Long invoke = forkJoinPool.invoke(task);#执行ForkJoin任务中的compute()
    

    ForJoinTask必须继承RecursiveTask(有返回值)或者 RecursiveAction (没有返回值)

    public class ForkJoinWork extends RecursiveTask<Long> {
        private Long start;//起始值
        private Long end;//结束值
        public static final  Long critical = 100000L;//临界值
        public ForkJoinWork(Long start, Long end) {
            this.start = start;
            this.end = end;
        }
    
    @Override
    protected Long compute() {
        //判断是否是拆分完毕
        Long lenth = end - start;
        if(lenth<=critical){
            //如果拆分完毕就相加
            Long sum = 0L;
            for (Long i = start;i<=end;i++){
                sum += i;
            }
            return sum;
        }else {
            //没有拆分完毕就开始拆分
            Long middle = (end + start)/2;//计算的两个值的中间值
            ForkJoinWork right = new ForkJoinWork(start,middle);
            right.fork();//拆分,并压入线程队列
            ForkJoinWork left = new ForkJoinWork(middle+1,end);
            left.fork();//拆分,并压入线程队列
    
            //合并
            return right.join() + left.join();
        }
    }
    }
    

    在这里插入图片描述

    展开全文
  • 并发编程经历:线程池的使用

    万次阅读 多人点赞 2019-07-31 18:38:24
    线程池的使用 使用线程池管理线程可以最大程度的利用线程,节省资源消耗,它通过利用已有的线程多次循环执行多个任务从而提高系统的处理能力。 我们可以通过java.util.concurrent.ThreadPoolExecutor类来创建线程池...

    线程池的使用

    使用线程池管理线程可以最大程度的利用线程,节省资源消耗,它通过利用已有的线程多次循环执行多个任务从而提高系统的处理能力。

    我们可以通过java.util.concurrent.ThreadPoolExecutor类来创建线程池,一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。

    下面介绍一下里面的一些参数。

    1、创建一个线程池需要输入几个参数:

    ·        corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

    ·        runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。

    o   ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

    o   LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

    o   SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

    o   PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

    ·        maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

    ·        ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

    ·        RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。

    o   AbortPolicy:直接抛出异常。

    o   CallerRunsPolicy:只用调用者所在线程来运行任务。

    o   DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

    o   DiscardPolicy:不处理,丢弃掉。

    o   当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。

    ·        keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

    ·        TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),秒(SECONDS),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

    2、当一个任务通过execute(Runnable)方法欲添加到线程池时,会有如下几种情况:

    ·         如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

    ·        如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

    ·        如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

    ·        如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

    ·        当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

    3、举一个简单的例子:

    创建 TestThreadPool 类:

    1.  importjava.util.concurrent.ArrayBlockingQueue; 

    2.  importjava.util.concurrent.ThreadPoolExecutor; 

    3.  import java.util.concurrent.TimeUnit; 

    4.   

    5.  public class TestThreadPool { 

    6.    

    7.      private static int produceTaskSleepTime =2; 

    8.        

    9.      private static int produceTaskMaxNumber = 2; 

    10.   

    11.     public static void main(String[] args){ 

    12.   

    13.         // 构造一个线程池 

    14.         ThreadPoolExecutor threadPool = newThreadPoolExecutor(2, 4, 3, 

    15.                 TimeUnit.SECONDS, newArrayBlockingQueue<Runnable>(3), 

    16.                 newThreadPoolExecutor.DiscardOldestPolicy()); 

    17.   

    18.         for (int i = 1; i <=produceTaskMaxNumber; i++) { 

    19.             try { 

    20.                 String task = "task@" + i; 

    21.                 System.out.println("创建任务并提交到线程池中:" +task); 

    22.                 threadPool.execute(newThreadPoolTask(task)); 

    23.   

    24.                 Thread.sleep(produceTaskSleepTime); 

    25.             } catch (Exception e) { 

    26.                 e.printStackTrace(); 

    27.             } 

    28.         } 

    29.     } 

    30.  }  

    创建 ThreadPoolTask类:

    31. public class ThreadPoolTask implementsRunnable, Serializable { 

    32.   

    33.     private Object attachData; 

    34.   

    35.     ThreadPoolTask(Object tasks) { 

    36.         this.attachData = tasks; 

    37.     } 

    38.   

    39.     public void run() { 

    40.           

    41.         System.out.println("开始执行任务:" +attachData); 

    42.           

    43.         attachData = null; 

    44.     } 

    45.   

    46.     public Object getTask() { 

    47.         return this.attachData; 

    48.     } 

    49.  }  

    执行结果:

    创建任务并提交到线程池中:task@ 1

    开始执行任务:task@ 1

    创建任务并提交到线程池中:task@ 2

    开始执行任务:task@ 2

     4、再讲一个实际的例子

    这里用的是spring的线程池:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor。

    直接上代码:

    50.     //是否中断导入数据程序

    51.     publicstatic AtomicBoolean  terminateImport = newAtomicBoolean(false);

    52.     //成功条数

    53.     public static AtomicLong success_line = newAtomicLong();

    54.     //失败条数

    55.     publicstatic AtomicLong error_line = new AtomicLong();

    56.     //是否发现有线程rejected,如果有,说明线程队列满了,外部执行代码暂时中断队列填充,等待5s后,重新填充.

    57.     publicstatic AtomicBoolean found_rejected = new AtomicBoolean(false);

    58.     //开始执行

    59.     publicvoid startSwitch(int itemsPerPage, String updateMode,String applySeqno, Date startDate,Date endDate) {

    60.         int nowPage = 1;

    61.         int exec_sum = 0 ;

    62.         boolean hasNext = true ;

    63.         //重置静态值

    64.         success_line = new AtomicLong(0L);

    65.         error_line = new AtomicLong(0L);

    66.         terminateImport = new AtomicBoolean(false);

    67.  

    68.         ThreadPoolTaskExecutor threadPoolTaskExecutor= new ThreadPoolTaskExecutor();

    69.         threadPoolTaskExecutor.setCorePoolSize(100);

    70.         threadPoolTaskExecutor.setMaxPoolSize(200);

    71.         threadPoolTaskExecutor.setQueueCapacity(100);

    72.         threadPoolTaskExecutor.setRejectedExecutionHandler(newRejectedExecutionHandler() {

    73.             @Override

    74.             public voidrejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

    75.                 logger.warn("findRejectedExecutionException error !!,will set found_rejected boolean is false");

    76.                 found_rejected.set(true);

    77.                 threadPoolExecutor.execute(runnable);

    78.             }

    79.         });

    80.         //设成true后当调用shutdown()关闭线程池时会去检查任务是否都执行完毕

    81.         threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);

    82.         threadPoolTaskExecutor.initialize();

    83.  

    84.         while (hasNext) {

    85.             …

    86.             // applyDTOs是调接口进行分页查询得到的List集合,就是上面”…”省略的部分

    87.             //如果当次取出的总数比分页数还要小,那么下次进来直接中断掉这个循环

    88.             if(itemsPerPage>applyDTOs.size()){

    89.                 hasNext = false ;

    90.             }

    91.             if(ListUtil.isNotBlank(applyDTOs)){

    92.                 if(found_rejected.get()==false){

    93.                     if(ApplyInfoSync.terminateImport.get()==false){

    94.                         ApplySyncDataThread applySyncDataThread= new ApplySyncDataThread(

    95.                                 convert2LoanApplyInfoDOList(applyDTOs),updateMode, loanApplyComponent);

    96.                         threadPoolTaskExecutor.execute(applySyncDataThread);

    97.                     }else{

    98.                         hasNext= false;

    99.                         LogUtil.warn(logger, "主线程发现中断,开始尝试关闭线程池新进入数据");

    100.                      }

    101.                  }else{

    102.                      LogUtil.warn(logger,"执行被拒绝,队列已满了,需要等待5秒");

    103.                      try {

    104.                          TimeUnit.MILLISECONDS.sleep(5000L);

    105.                          found_rejected.set(false);

    106.                      } catch(InterruptedException e) {

    107.                          LogUtil.warn(e,logger,"foundrejected thread,Thread Sleep is error !!");

    108.                      }

    109.                  }

    110.              }

    111.              exec_sum= exec_sum+applyDTOs.size();

    112.              if(hasNext)

    113.                  nowPage++;

    114.          }

    115.   

    116.          while (threadPoolTaskExecutor.getActiveCount()>0){

    117.              try {

    118.                  logger.warn("threadPool ishave active count ,now sleep...");

    119.                  TimeUnit.MILLISECONDS.sleep(60000L);

    120.              } catch (InterruptedException e) {

    121.                  logger.warn("Thread Sleepis  error !!", e);

    122.              }

    123.          }

    124.          threadPoolTaskExecutor.shutdown();

    125.   

    126.          //检查线程池是否完全关闭,否则先等一等再走到日志打印阶段

    127.          while (threadPoolTaskExecutor.getThreadPoolExecutor().isTerminated()== false) {

    128.              try {

    129.                  TimeUnit.MILLISECONDS.sleep(6000L);

    130.              } catch (InterruptedException e) {

    131.                  LogUtil.error(e,logger,"ThreadSleep is  error !!");

    132.              }

    133.          }

    134.        …//一些打印日志的操作

    135.      }

    这里其他的不多说了,就说几点使容易看得懂。我这里有两个属性:terminateImport和found_rejected,他们都是AtomicBoolean类型(传入的boolean变量会被转换为volatile的int值,true为1,false为0)的都被static修饰,所以都只有一份且是同步安全的。terminateImport在一开始调用startSwitch时被重置为false,表明没有被中断,当在执行的过程中,有其他的入口调用terminateImport.set(true)后,程序再执行到if(ApplyInfoSync.terminateImport.get()==false)就会是false的,从而达到中断的作用;found_rejected用来标记任务是否被拒绝,经过之前的对线程池的介绍,应该知道了当达到最大线程数,任务队列也满了的时候,再execute的任务会被拒绝,所以我在threadPoolTaskExecutor.setRejectedExecutionHandler(…)设置饱和策略时自己定义实现了一个RejectedExecutionHandler接口来自定义策略,在里面把found_rejected标记改为了true,这样当程序再次执行到if(found_rejected.get()==false)就会是false的,从而可以监视线程和队列是否已经满了,当满了的时候,我就等待5秒,然后把found_rejected标记改为了false就可以继续填充队列了,至于里面为什么还要执行一次threadPoolExecutor.execute(runnable);是因为如果不执行被拒绝的任务,那任务就被抛弃了,当然你也可以做其他操作,例如持久化任务(就是存入数据库)等。

    展开全文
  • 给女朋友讲 : Java线程池的内部原理

    万次阅读 多人点赞 2019-11-03 15:09:57
    餐盘在灯光的照耀下格外晶莹洁白,女朋友拿起红酒杯轻轻地抿了一小口,对我说:“经常听你说线程池,到底线程池到底是个什么原理?”

    文章持续更新,微信搜索「 万猫学社 」第一时间阅读。
    关注后回复「 电子书 」,免费获取12本Java必读技术书籍。

    餐厅的约会

    餐盘在灯光的照耀下格外晶莹洁白,女朋友拿起红酒杯轻轻地抿了一小口,对我说:“经常听你说线程池,到底线程池到底是个什么原理?”我楞了一下,心里想女朋友今天是怎么了,怎么突然问出这么专业的问题,但做为一个专业人士在女朋友面前也不能露怯啊,想了一下便说:“我先给你讲讲我前同事老王的故事吧!”

    大龄程序员老王

    老王是一个已经北漂十多年的程序员,岁数大了,加班加不过年轻人,升迁也无望,于是拿着手里的一些积蓄,回老家转行创业。他选择了洗浴行业,开一家洗浴中心,没错,一家正规的洗浴中心。之前在北京的时候,喜欢去的澡堂叫“清华池”,他想了想,就给自己的洗浴中心取名为“线程池”。

    线程池洗浴中心

    线程池开业以后,老王发现有顾客想做足疗,于是就招聘了1个足疗技师,多增加了一项业务增加了收入。随着做足疗的顾客增多,为了赚更多钱又招聘了4个足疗技师。
    过了一段时间,洗浴中心的生意越来越好,做足疗的顾客也越来越多。但是,老王发现自己店里的足疗技师已经有5个足疗技师,再招聘就太多了,支付不起再多工资了。足疗技师忙不过来怎么办?老王是个聪明人,马上想到办法:让顾客排队,有哪个足疗技师做完了,空闲出来了,就在队伍里再叫一个顾客继续做。

    忙碌的周末

    一到周末,来洗浴中心的顾客比平时多了几倍,想足疗的顾客排队时间太长,顾客们已经不耐烦了。老王马上做出反应,又紧急从其他洗浴中心招聘了5个足疗技师,为队伍里顾客做足疗,大大减少排队的顾客。
    不过,有时生意太火爆了,紧急招聘的技师也用上了,顾客排队时间也是很长,再来新的顾客,老王只能满脸赔笑地和顾客说:“您下次再来吧,下次给您找个好技师。”,把顾客拒之门外。
    过了周末以后,店里不能养闲人啊,老王就把紧急招聘的技师都辞退了。

    老王的经营之道

    老王的生意越做越红火,很快就要开分店、融资上市、走上人生巅峰。既然这么成功,就让我们来复盘一下他的经营之道吧:

    如果你了解了老王的经营之道,线程池就不难理解了,把顾客替换成任务,把足疗技师替换成线程线程池洗浴中心就是线程池了,线程池的内部原理就是这样的:

    梦醒

    铃铃铃,闹铃把我吵醒,原来是一场梦啊,我哪有什么女朋友?今天上午有一个面试,赶紧起床洗漱完毕,就出发了。在路上回想那个奇怪的梦,要不再复习一下线程池的内部原理吧!
    先看一下ThreadPoolExecutor类的execute方法:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取clt,clt记录着线程池状态和运行线程数。
        int c = ctl.get();
        //运行线程数小于核心线程数时,创建线程放入线程池中,并且运行当前任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            //创建线程失败,重新获取clt。
            c = ctl.get();
        }
        //线程池是运行状态并且运行线程大于核心线程数时,把任务放入队列中。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //重新检查线程池不是运行状态时,
            //把任务移除队列,并通过拒绝策略对该任务进行处理。
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //当前运行线程数为0时,创建线程加入线程池中。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //运行线程大于核心线程数时并且队列已满时,
        //创建线程放入线程池中,并且运行当前任务。
        else if (!addWorker(command, false))
            //运行线程大于最大线程数时,失败则拒绝该任务
            reject(command);
    }
    

    在execute方法中,多次调用的addWorker方法,再看一下这个方法:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //获取clt,clt记录着线程池状态和运行线程数。
            int c = ctl.get();
            //获取线程池的运行状态。
            int rs = runStateOf(c);
    
            //线程池处于关闭状态,或者当前任务为null
            //或者队列不为空,则直接返回失败。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                //获取线程池中的线程数
                int wc = workerCountOf(c);
                //线程数超过CAPACITY,则返回false;
                //这里的core是addWorker方法的第二个参数,
                //如果为true则根据核心线程数进行比较,
                //如果为false则根据最大线程数进行比较。
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //尝试增加线程数,如果成功,则跳出第一个for循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //如果增加线程数失败,则重新获取ctl
                c = ctl.get();
                //如果当前的运行状态不等于rs,说明状态已被改变,
                //返回第一个for循环继续执行
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //根据当前任务来创建Worker对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //获得锁以后,重新检查线程池状态
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        //把刚刚创建的线程加入到线程池中
                        workers.add(w);
                        int s = workers.size();
                        //记录线程池中出现过的最大线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //启动线程,开始运行任务
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    面试

    一个穿着格子衬衫的中年男子坐在我面前,对我说:“您好,我是今天的面试官。”我微笑地回应:“您好。”面试官面无表情地问我:“线程池一定用过吧,能说说线程池的内部原理嘛?”我差点笑出声来,自信满满地说……

    文章持续更新,微信搜索「 万猫学社 」第一时间阅读。
    关注后回复「 电子书 」,免费获取12本Java必读技术书籍。

    展开全文
  • JAVA线程池总结及自定义线程池

    万次阅读 2020-06-30 16:17:51
    在前几年小编写过一篇关于线程池的总结:《线程总结》,现在回过头来看,总结的 还是比较详细的,不过当时并没有在项目中有过真实刺激的 体验,最近项目中偶然遇到了一次任务丢失的问题,我追踪了一下 代码, 发现...

    引言

    在前几年小编写过一篇关于线程池的总结:《线程总结》,现在回过头来看,总结的 还是比较详细的,不过当时并没有在项目中有过真实刺激的 体验,最近项目中偶然遇到了一次任务丢失的问题,我追踪了一下 代码, 发现由于不正当采用java内置线程池导致的, 应该是当时配置线程池的参数没有仔细计算导致的,关于这个问题我们后面博文在介绍,今天我们在看我java 内置 线程池代码以后,我们先动手自己写一个线程池来实现任务的提交和执行。这样我们可以更好的理解线程池的执行流程。如果读者对于java内置的线程的核心参数和执行流程不是很了解,可以点击上面链接,阅读博文。

    一、在编写代码之前,我们先介绍几个核心参数的配置依据。

    1.1、核心线程数量corePoolSize

    核心线程数的设计需要根据任务的处理时间和每秒产生的任务数量来确定,例如执行一个任务需要0.1秒,系统百分之八十的时间没秒都会产生100个任务,那么我们想要在1秒内处理完这100个任务,就需要10个线程,此时我们就可以设计核心线程数量为10,当时实际情况不可能这么平均,所以一般我们按照2080原则设计即可,即按照百分之80的情况设计核心线程数量,剩下的百分之20可以利用最大线程数量处理。

    1.2、任务队列长度(workQueue)

    任务队列长度一般设计为核心线程数/单个任务执行时间*2(任务最大等待时间/s)即可,例如上面场景中,核心线程数设计为10,单个任务执行时间为0.1,则队列长度可以设计为200

    1.3、最大线程数(maximumPoolSize)

    最大线程数的设计除了需要参照核心线程数的条件外,还需要参照系统每秒产生的最大任务数决定,例如,上述环境中,如果系统每秒最大产生的任务数量是1000个,那么最大线程数=(最大任务数-任务队列长度)* 单个任务执行时间;既最大线程数=(1000-200)* 0.1 =80;当然最大线程数和服务器的硬件配置也有很大关系

    上面的 参数配置公式,都是参考作用,在实际环境中,需要根据实际服务器的配置自行调整

    二、定义线程池

    ==========================开始coding====================================

    2.1、模拟任务类

    package com.threadpoll;
    
    /**
     * @author zhenghao
     * @description:
     * @date 2020/6/3010:07
     */
    public class MyTask implements Runnable {
        private  int id;
    
        public MyTask(int id) {
            this.id = id;
        }
    
        @Override
        public void run() {
    
            String name = Thread.currentThread().getName();
            System.out.println("线程:" + name + "即将执行任务:" + id);
    
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程:" + name + "完成了任务:" + id);
        }
    
        @Override
        public String toString() {
            return "MyTask{" +
                    "id=" + id +
                    '}';
        }
    }
    

    2.2、自定义线程类

    package com.threadpoll;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author zhenghao
     * @description:
     * @date 2020/6/3010:13
     */
    public class MyWorker extends Thread {
    
        private String name;
    
        private List<Runnable> tasks = new ArrayList<>();
    
        public MyWorker(String name, List<Runnable> tasks) {
            super(name);
            this.tasks = tasks;
        }
    
        @Override
        public void run() {
            while (tasks.size() > 0) {
                Runnable r = tasks.remove(0);
                r.run();
            }
        }
    }
    

    2.3、自定义线程池类

    package com.threadpoll;
    
    import java.util.Collections;
    import java.util.LinkedList;
    import java.util.List;
    
    /**
     * @author zhenghao
     * @description: 自定义线程池
     * @date 2020/6/3015:06
     */
    public class MyThreadPool {
    
    
        /**
         * 任务集合 多个线程同时remove 
         */
        private List<Runnable> tasks = Collections.synchronizedList(new LinkedList<>());
        /**
         * 当前先测试数量
         */
        private int num;
        /**
         * 核心线程数量
         */
        private int coreThreadSize;
        /**
         * 最大线程数量
         */
        private int maxThreadSize;
    
        /**
         * 队列长度
         */
        private int workSize;
    
        public MyThreadPool( int coreThreadSize, int maxThreadSize, int workSize) {
            this.coreThreadSize = coreThreadSize;
            this.maxThreadSize = maxThreadSize;
            this.workSize = workSize;
        }
    
        /**
         * @Description: 提交队列
         * @author: zhenghao
         * @date: 2020/6/30 15:11
         */
        public void submitTask(Runnable r) {
            if (tasks.size() >= workSize) {
                System.out.println("任务" + r + "丢掉了");
            } else {
                //加入队列
                tasks.add(r);
                //执行队列
                execTask(r);
            }
        }
    
        private void execTask(Runnable r) {
            //判断是否需要创建核心线程池
            if (num < coreThreadSize) {
                //创建核心线程池执行
                new MyWorker("核心线程池" + num, tasks).start();
                num++;
            } else if (num < maxThreadSize) {
                //创建非核心线程池执行
                new MyWorker("非核心线程池" + num, tasks).start();
                num++;
            } else {
                System.out.println("任务" + r + "被缓存了");
            }
        }
    }
    

    2.4、测试类

    package com.threadpoll;
    
    /**
     * @author zhenghao
     * @description:
     * @date 2020/6/3010:07
     */
    public class MyTask implements Runnable {
        private  int id;
    
        public MyTask(int id) {
            this.id = id;
        }
    
        @Override
        public void run() {
    
            String name = Thread.currentThread().getName();
            System.out.println("线程:" + name + "即将执行任务:" + id);
    
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程:" + name + "完成了任务:" + id);
        }
    
        @Override
        public String toString() {
            return "MyTask{" +
                    "id=" + id +
                    '}';
        }
    }
    

    大家通过调解任务数量,也就是测试类中的循环数量,可以看到不同的效果,大家可以测试一下, 然后通过这个小demo,我们可以更好的了解线程池的执行流程。

     

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 95,200
精华内容 38,080
关键字:

线程池