精华内容
下载资源
问答
  • 制造一个轮子线程池

    2019-12-12 10:24:20
    这次为了帮助自己深入理解线程池,决定手动写一个极简(陋)的线程池,顺便记录思考和造过程。 虽然不太可能和jdk自带的那么完美,但是该有的功能还是要有: - 新建线程池,有核心线程数和最大线程数,线程存活...

    很早之前就看过线程池源码(知道大概的运行原理),但是只是知道怎么用,并没有深究。这次为了帮助自己深入理解线程池,决定手动写一个极简(陋)的线程池,顺便记录思考和造轮过程。

    虽然不太可能和jdk自带的那么完美,但是该有的功能还是要有:

    - 新建线程池,有核心线程数和最大线程数,线程存活时间,队列

    - 在线程池加入线程,当前线程数不超过核心线程数就新建线程,超过核心放队列,队列满了再新建线程,达到最大线程

    - 全部线程运行完成后会保留核心线程数,支持线程存活时间

    - 立即关闭线程池

    - 优雅的关闭线程池

    1.新建一个轮子线程池类,就一个构造函数,把需要的参数都传进来

    2.用ThreadPoolExecutor的时候,新建了线程池就会往里面提交线程了,我们写的也一样,而且往线程池里面加线程的时候就会判断:当前运行线程数是否大于核心线程数,是否大于最大线程数等,这里需要一个当前运行线程数的变量。

    所以这里增加一个成员变量activeCount,初始值为0,运行一个线程就加1,线程运行结束就减1,这里面减的时候是在不同线程里面,所以为了线程安全用AtomicInteger类型

        /** 当前活动线程数 */
        private AtomicInteger activeCount = new AtomicInteger(0);

    提交线程的方法,看过ThreadPoolExecutor源码的应该知道,里面每个线程都是包装了一个Worker类,新增线程的时候就会新建一个worker,为什么要这么做呢?我第一次看的时候也不理解,想着如果直接把传进来的线程start一下怎么样,如果直接新建线程会立马就发现问题,怎么知道线程什么运行完,怎么把activeCount减1?所以这里不能直接start,必须新建一个线程(异步执行,废话),这个线程必须要运行参数的线程run方法(废话,不然参数还有啥用,我们的业务逻辑咋执行),线程运行完成后activeCount减1。

    所以这里有了worker类,worker本身也是一个线程。顺便把线程名字也解决了,新建一个threadNum从0开始,新建一个worker就 1

    提交线程

    3.现在已经有点样子了,但是问题还很多,主要的一个问题:比如我建一个core=2,max=5,queue=5的线程池,假设往里面放了8个线程,会出现只运行了3个线程,跑完也就结束了,剩下的5个会在队列里面没处理,而且也不会保留2个核心线程

    WheelThreadPool2的运行结果

    现在想想怎么把队列的线程拿出来运行,没看ThreadPoolExecutor源码前我第一次想着是不是在创建线程池的时候默认启动一个线程去队列里面获取并执行,然而一想立马否定了,因为线程池是多线程运行的,队列里面的线程需要max(参数最大线程数)个线程同时执行。所以在我们新建的worker里面要能不断的循环获取队列的线程去执行,如果队列为空了,则退出循环,让线程结束

    改造一下worker的run方法,在execute方法创建的worker线程执行完通过参数传进来的runnable之后,循环获取队列并执行队列线程的run方法

    这样还有点问题,如果try里面出现异常,比如runnable.run异常或者r.run异常,这个线程就退出了,不能保持max个线程并行执行

    所以如果异常了需要重新创建一个线程继续跑循环,改造后

    这样改造后如果队列空了会把所有线程都结束掉,所以现在要解决执行完队列后保留core个线程的问题,怎么保留线程其实是通过阻塞队列实现的,

    当队列为空时,通过queue.take()方法阻塞住当前线程,直到又有线程提交。如果当前活动线程超出core,结束当前线程

    这样改造后大概轮廓出来了,因为queue是阻塞队列,而且各个方法都加了线程锁,所以本身也是线程安全的,这部分代码貌似不需要加锁,跑个测试用例试下,貌似很合理

    WheelThreadPoolTest3结果

    先放着,再看看下个功能,要支持线程存活时间,这个存活时间的意思是:比如上面WheelThreadPoolTest3里面的线程池运行了10个线程,跑完之后剩余2个线程,3个消亡了(完成任务了)。后面再提交10个线程,又新建3个线程(从控制台的线程名字可以看出),如果我们设置一个存活时间,让第一批的10个跑完后的那3个线程不消亡,比如存活5秒,第二批的10个跑的时候就可以复用,不需要重新创建线程。因为线程是稀缺资源,能复用就复用,新建线程也影响效率

    目前的代码线程消亡的标记是因为queue.poll获取到了null,导致循环退出,线程完成。而阻塞队列的poll方法还有一个多态方法E poll(long timeout, TimeUnit unit),可以在一定时间内poll,在时间内获取到了就会返回,这个时间刚好用于是线程的存活时间(死亡倒计时??)。

    构造方法已经传了存活时间和单位,直接加上这两个参数

    再来测试下,存活时间设置为5秒,那样第二批只能提交5个线程,否则会导致线程池慢

    结果

    发生一个大问题,最后线程都没了,而且主线程也退出了

    原因:第一批的10个线程执行完后因为线程存活5秒,所以都保留了

    堆栈打印出来也证实了,都在poll里面阻塞,然后第二批5个线程已提交,这存活的5个线程就会立马开始执行,执行完后再次阻塞再poll,等过了存活时间,线程全部结束!

    在第二批执行结束后再次打印堆栈,结果果然是这样

    问题知道了,解决方法

    这里的lock是一个线程锁,防止多个线程同时判断(同时判断了写这个还有意义么。。)

        /** 线程锁 */
        private Lock lock = new ReentrantLock();

    至此,写好了主要部分,测试下

    不过有个地方我自己代码走读,感觉是有问题

    感觉这里会存在线程安全问题,假设线程池队列为空,当前activeCount大于core,并发情况下,多个线程同时满足activeCount.get()>coreCount,之后所有线程都会走queue.poll分支,因为队列为空,所有线程queue.poll返回为null,所有线程全部结束掉,这样和保留core个线程冲突了。纠结许久后来发现这个假设不成立,要在没有进入while之前就出现队列为空,且activeCount>core,这种情况不会出现,因为在提交线程的方法(execute)已经限制了这种情况,但是这个代码看起来会有歧义,还是决定改造改造

    这回走读一次,感觉好多了,终于把主要功能写完了,也能正常跑了

    4.立即关闭线程池

    线程池里面的线程跑完了,但是还有core个线程阻塞着,这么一直阻塞着也不是办法,所以要有个关闭的方法,先写暴力关闭,当前运行的线程中断,队列抛弃

    思考时间:中断线程肯定是调用Thread.interrupt方法,这样我得拿到正在运行得线程才行,所以在新增线程的时候得保存在一个集合里,而且线程执行得时候异常了,也会新增线程,所以这个保存集合要线程安全,而且存取速度要快

    这里需要一个线程安全得set集合,ConcurrentHashMap里面有个newKeySet方法,看了下源码是通过ConcurrentHashMap的key来的,是线程安全的,直接用

    `

    /** 保存正在运行的线程 */

    private Set workers = ConcurrentHashMap.newKeySet();

    `

    还需要一个状态标识当前线程池是否关闭,这个状态要在线程并发(获取队列线程)情况下可以判断,所以用volatile修饰,默认正在运行

    `

    /** 线程池状态,-1:正在运行,0:暴力关闭,1:优雅关闭 */

    private volatile int status = -1;

    `

    新建暴力关闭方法stopNow

    在新建worker的时候都加到workers里面去,这时候一想,activeCount和workers.size是不是重复了,顺带把activeCount删掉,用workers.size代替,线程执行完成workers.remove掉

    新增方法addWorker

    提交线程方法改造,删掉了activeCount,用workers.size代替

    worker的run方法改造,删掉了activeCount,用workers.size代替,activeCount-1用workers.remove(this)代替

    改造完了,查看一下,暴力关闭需要立即中断线程,抛弃队列,所以在while获取队列那里要增加判断

    提交线程方法增加状态判断

    改造完成,测试下暴力关闭

    结果,整个程序也退出了,线程池结束了,队列只运行了一个线程

    5.优雅关闭线程池

    优雅的关闭线程池,是要让所有的线程和队列都运行完毕再关闭所有线程,这样就不能直接interrupt线程了,先设置status=1未优雅关闭

    新增优雅关闭方法

    提交线程方法增加状态,限制提交

    这里调用stop方法时分情况:

    - 1.存在线程还在执行(队列或者当前线程),执行完成后调用workers所有线程的interrupt,防止存在线程在queue.take处阻塞

    - 2.不存在线程还在执行(队列或者当前线程),调用workers所有线程的interrupt,防止存在线程在queue.take处阻塞

    所以这里需要一个标记,是否还存在线程在执行,我们可以用一个数字标识当前还需要执行的线程数量,执行完一个线程就-1

    增加成员变量remainingCount,标识剩余线程数

        /** 剩余线程数 */
        private AtomicInteger remainingCount = new AtomicInteger(0); 

    每次提交一个线程 1

    每次执行完一个线程-1

    第一种情况,存在线程还在执行,在执行完成后判断remainingCount是否为0

    第二种情况,不存在线程还在执行,在stop的时候增加判断

    抽象封装下方法

    顺便把worker的run方法也优化下,一个屏幕都截不下了

    跑个第一种情况的测试用例

    结果出乎意料,居然没有结束所有线程

    加日志调试

    出现了更夸张的错误

    根据控制台信息可以想象,原本的5个线程全部被interrupt,又不断地创建线程,又不断的被interrupt。这里会创建线程的地方只有在worker的run方法异常,finally代码段里面,而且没加日志的时候没有出现这种情况,加了日志就出现了。多次代码走读后,发现一种可能,在最初5个线程同时将队列消耗完后,2个线程进入take阻塞,3个线程开始进入interruptWorkers方法,导致那2个线程出现异常,异常后会退出线程,再次创建新线程,并且interrupt新线程,由此陷入死循环

    改造getQueueTask方法,不抛出异常,出现异常返回null。顺便走读一下status=0的情况,发现不影响

    再次运行测试用例,结果符合预期了,这里的异常堆栈可以忽略

    再测试优雅关闭的第二种情况

    结果正常

    去掉调试日志,至此,这个轮子线程池完成,具备线程池基础功能

    总结

    写这个线程池过程曲折,各种问题不断出现,特别时两种关闭方法,判断比较烦,代码走读和调试良久,才堪堪解决,由此联想ThreadPoolExecutor是多么强大,多么不简单

    图片较多,代码可以在Github上找到

    参考资料:ThreadPoolExecutor源码

    感谢crossoverjie的文章:https://crossoverjie.top/2019/05/20/concurrent/threadpool-01/

    本文来自chentiefeng的博客

    展开全文
  • Java线程池基础

    2021-03-16 02:39:04
    目录:一、线程池概述1、线程池类目前线程池类一般有两个,一个来自于Spring,一个来自于JDK:来自Spring的线程池:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor来自JDK的线程池:java.util....

    目录:

    一、线程池概述

    1、线程池类

    目前线程池类一般有两个,一个来自于Spring,一个来自于JDK:

    来自Spring的线程池:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

    来自JDK的线程池:java.util.concurrent.ThreadPoolExecutor

    说明:两个线程池类中的参数【线程池最大线程数】写法不同,在Spring线程池中为maxPoolSize,在JDK线程池中为maximumPoolSize,等价。

    两个线程池类的配置差不多,Spring的做了一些配置参数的简化,最终调用JDK的API。

    在执行并发任务时,我们可以把任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程,只要线程池里有空闲的线程,任务就会分配给一个线程执行。在线程池的内部,当线程数量达到线程池核心线程数时,后续的任务被插入一个阻塞队列(BlockingQueue)进行等待,线程池里的空闲线程会去取这个队列里的任务。

    利用线程池的三个好处:

    降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗

    提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行

    提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控

    2、相关概念比喻

    线程池(thread pool)  工厂

    线程(thread)         工人,属于某个工厂,被工厂所管理

    任务(task)             等待工人处理的事情,即实现Runnable或Callable的类

    3、线程池行为比喻

    小赵(任务)去银行(线程池)办理业务,银行刚开始营业,此时窗口工作人员还未就位(初始线程数为0)

    于是经理(线程池管理者)催促1号正式员工到1号窗口接待小赵(创建线程),于是小赵被安排到1号窗口办理业务(执行线程任务)

    接着小钱(任务)也来到银行(线程池)办理业务,此时小赵还没有办理完业务,1号窗口轮不到小钱。该银行总共有2个窗口(corePoolSize为2),于是经理又催促2号正式员工到2号窗口接待小钱(又创建线程),小钱也开始办理业务(执行线程任务)

    紧接着小孙(又一个任务)也来到银行办理业务,此时前面两人还未办理完业务。在银行等待区有一张座位(缓存队列size为1)空着,于是小孙被经理安排到座位上等待并被告知:当1、2号窗口有空闲时,小孙就可以去窗口办理业务。此时,窗口满了,等待区也满了。

    这时小李也来到银行办理业务,于是经理安排临时工(corePoolSize之外的线程)在大堂手持移动设备为小李办理业务

    银行业务很繁忙,窗口满了、等待区满了、临时工也用上了(线程数达到maxPoolSize)。此时小周来到银行办理业务,于是经理只能按照《超出银行最大接待能力处理办法》(拒绝策略)拒绝小周办理业务

    随后,小赵、小钱、小孙、小李陆续办完业务离开银行。忙碌了大半天,来办理业务的人终于少了,此时临时工已经闲置了2个小时(keepAliveTime),2个窗口可处理之后并不繁忙的业务,经理见临时工没事做就让他下班,以避免造成不必要的资源浪费

    根据银行《正式员工空闲时处理办法》(是否清理corePoolSize线程开关),即使正式员工闲着也不得提前下班。所以,1号、2号窗口的正式员工继续等待接待客户(线程池内保持corePoolSize个线程)

    二、线程池参数

    ThreadPoolExecutor类的构造函数如下:

    1 public ThreadPoolExecutor(intcorePoolSize,2 intmaximumPoolSize,3 longkeepAliveTime,4 TimeUnit unit,5 BlockingQueueworkQueue,6 ThreadFactory threadFactory,7 RejectedExecutionHandler handler) {8 if (corePoolSize < 0 ||

    9 maximumPoolSize <= 0 ||

    10 maximumPoolSize < corePoolSize ||

    11 keepAliveTime < 0)12 throw newIllegalArgumentException();13 if (workQueue == null || threadFactory == null || handler == null)14 throw newNullPointerException();15 this.acc = System.getSecurityManager() == null ?

    16 null:17 AccessController.getContext();18 this.corePoolSize =corePoolSize;19 this.maximumPoolSize =maximumPoolSize;20 this.workQueue =workQueue;21 this.keepAliveTime =unit.toNanos(keepAliveTime);22 this.threadFactory =threadFactory;23 this.handler =handler;24 }

    构造函数的参数含义如下:

    corePoolSize:指定了线程池中核心线程的大小。它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去。当提交一个任务到线程池,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建新的线程,等到需要执行的任务数大于corePoolSize时就不再创建。(1、在创建线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务。除非调用了prestartAllCoreThreads()方法或prestartCoreThread()方法,在任务没有到来之前就预创建corePoolSize个线程或一个线程。2、在创建线程池后,默认情况下,线程池中的线程数为0,当有任务到来时线程池就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把后续到达的任务放到缓存任务队列当中。核心线程在allowCoreThreadTimeout被设置为true时会超时并被回收,默认情况下不会被回收)

    maxPoolSize/maximumPoolSize:指定了线程池中最大线程数量,即线程池允许创建的最大线程数。这个参数会根据使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量。如果任务队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。(当线程数大于等于corePoolSize,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maximumPoolSize。如果线程已等于maximumPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会按照一定的处理策略处理)

    keepAliveTime:线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间之后被销毁。(1、当线程空闲时间达到keepAliveTime,该线程会退回,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会陆续退出直到线程数量为0)

    unit:线程活动保持时间的单位。常用取值如下:

    TimeUnit.DAYS; //天

    TimeUnit.HOURS; //小时

    TimeUnit.MINUTES; //分钟

    TimeUnit.SECONDS; //秒

    TimeUnit.MILLISECONDS; //毫秒

    TimeUnit.MICROSECONDS; //微秒

    TimeUnit.NANOSECONDS; //纳秒

    workQueue:阻塞队列,用来存储等待执行的任务。

    阻塞队列有以下几种选择:

    1、ArrayBlockingQueue:一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序

    2、LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO(先进先出)排序元素,吞吐量通常要高于              ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列

    3、SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状       态,吞吐量通常要高于 LinkedBlockingQueue。静态工厂方法Executors.newCachedThreadPool使用了这个队列

    4、PriorityBlockingQueue:一个具有优先级的无限阻塞队列

    threadFactory:线程工厂,用于设置创建线程,可以通过线程工厂给每个创建出来的线程设置更有意义的名字

    handler:饱和策略(拒绝策略),当线程池和阻塞队列都满了,说明线程池处于饱和状态,必须采取一种策略处理提交的新任务。

    当线程数量达到maximumPoolSize时的处理策略有以下几种:

    1)ThreadPoolExecutor.AbortPolicy:丢弃任务,并抛出RejectedExecutionException异常

    2)ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常

    3)ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的等待时间最久的任务,然后重新尝试执行任务(重复此过程)

    4)ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,谁调用返回给谁

    注意:Spring的线程池和JDK线程池中的拒绝策略默认值都是ThreadPoolExecutor.AbortPolicy

    三、线程池的执行过程

    执行流程图:

    b214f150e4fcc3fae9ab7c6580347a0d.png

    1、当线程池中线程数小于corePoolSize时,对于新提交的任务,线程池将创建一个新线程来执行任务,即使此时线程池中存在空闲线程

    2、当线程池中线程数达到corePoolSize时,新提交的任务将会被线程池放入workQueue队列中,等待线程池中任务调度执行

    3、当workQueue已满,且corePoolSize < maximumPoolSize时,对于新提交的任务,线程池将创建新线程来执行任务

    4、当提交的任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理

    5、当线程池中线程数超过corePoolSize时,空闲线程的空闲时间达到keepAliveTime时,空闲线程会被关闭

    6、当设置allowCoreThreadTimeOut(true)时,线程池中的核心线程空闲时间达到keepAliveTime时也将被关闭

    线程池的工作顺序:corePoolSize -> 任务队列 -> maximumPoolsize -> 拒绝策略

    四、线程池的主要实现

    通过调用Executors类中的静态工厂方法可创建不同的线程池,这些线程池的内部实现原理都是相同的,仅仅是使用了不同的工作队列或线程池大小,如下:

    1、newFixedThreadPool:创建一个固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变。如果某个线程因为执行异常而结束,线程池会补充一个新的线程。

    构造函数如下:

    1 //第一个构造函数,参数只有线程数量,核心线程数与最大线程数一致

    2 public static ExecutorService newFixedThreadPool(intnThreads) {3 return newThreadPoolExecutor(nThreads, nThreads,4 0L, TimeUnit.MILLISECONDS,5 new LinkedBlockingQueue());6 }7

    8 //第二个构造函数,参数包含核心线程数和线程工厂,核心线程数与最大线程数一致

    9 public static ExecutorService newFixedThreadPool(intnThreads, ThreadFactory threadFactory) {10 return newThreadPoolExecutor(nThreads, nThreads,11 0L, TimeUnit.MILLISECONDS,12 new LinkedBlockingQueue(),13 threadFactory);14 }

    2、newSingleThreadExecutor:创建一个单线程的线程池,这个线程池只有一个线程在工作,也就是串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    构造函数如下:

    1 //构造函数,默认核心线程数和最大线程数都是1

    2 public staticExecutorService newSingleThreadExecutor() {3 return newFinalizableDelegatedExecutorService4 (new ThreadPoolExecutor(1, 1,5 0L, TimeUnit.MILLISECONDS,6 new LinkedBlockingQueue()));7 }

    3、newCachedThreadPool:创建一个可缓存的线程池,如果线程池的大小超过了处理任务所需要的线程数,那么就会回收部分空闲(60秒不执行任务)的线程;当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或JVM)能够创建的最大线程大小。

    构造函数如下:

    1 //构造函数,核心线程数为0,最大线程数为Integer.MAX_VALUE,空闲线程超时时间为60秒

    2 public staticExecutorService newCachedThreadPool() {3 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,4 60L, TimeUnit.SECONDS,5 new SynchronousQueue());6 }

    4、newScheduledThreadPool:创建一个固定长度的线程池,支持定时的以及周期性的任务执行,类似于Timer

    构造函数如下:

    1 //第一个构造函数,指定核心线程数大小

    2 public static ScheduledExecutorService newScheduledThreadPool(intcorePoolSize) {3 return newScheduledThreadPoolExecutor(corePoolSize);4 }5

    6 //第二个构造函数,指定核心线程数大小及线程工厂

    7 public staticScheduledExecutorService newScheduledThreadPool(8 intcorePoolSize, ThreadFactory threadFactory) {9 return newScheduledThreadPoolExecutor(corePoolSize, threadFactory);10 }

    注意:线程池一般不允许使用Executors去创建,而要通过ThreadPoolExecutor方法创建,一方面是由于Executors框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活。另外,由于前面几种方法内部都是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

    五、线程池的使用

    1、向线程池提交任务方式

    使用execute向线程池提交任务

    1 public classExecuteTest {2

    3 public static voidmain(String[] args) {4

    5 BlockingQueue workQueue = new LinkedBlockingQueue<>();6 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 3, 60, TimeUnit.SECONDS, workQueue);7 poolExecutor.execute(newTaskOne());8 poolExecutor.execute(newTaskTwo());9 poolExecutor.shutdown();10 }11 }12

    13 class TaskOne implementsRunnable{14

    15 @Override16 public voidrun() {17 System.out.println("正在执行任务1...");18 }19 }20

    21 class TaskTwo implementsRunnable{22

    23 @Override24 public voidrun() {25 System.out.println("正在执行任务2...");26 }27 }

    执行结果:

    正在执行任务1...

    正在执行任务2...

    使用submit方法向线程池提交任务,返回一个Future对象。可通过这个Future对象来判断任务是否执行成功,通过get()方法获取返回值,get()方法会阻塞直到任务完成

    1 public classSubmitTest {2

    3 public static void main(String[] args) throwsExecutionException, InterruptedException {4

    5 ExecutorService executorService =Executors.newCachedThreadPool();6 List> resultList = new ArrayList>();7 //创建10个任务并执行

    8 for(int i = 0;i < 10;i++){9 //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中

    10 Future future = executorService.submit(newTaskWithResult(i));11 resultList.add(future);12 }13 //遍历结果集

    14 for(Futurefuture : resultList){15 //Future返回如果没有完成,则一直循环等待,直到Future返回完成

    16 while(!future.isDone());{17 //打印各个线程(任务)执行的结果

    18 System.out.println(future.get());19 }20 }21 executorService.shutdown();22 }23 }24

    25 class TaskWithResult implements Callable{26

    27 private intid;28 public TaskWithResult(intid){29 this.id =id;30 }31

    32 @Override33 public String call() throwsException {34 return "执行结果" +id;35 }36 }

    执行结果:

    执行结果0

    执行结果1

    执行结果2

    执行结果3

    执行结果4

    执行结果5

    执行结果6

    执行结果7

    执行结果8

    执行结果9

    2、执行定时及周期性任务

    Timer工具管理定时及周期性任务。示例代码如下:

    1 public classTimerTest {2

    3 static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");4

    5 public static voidmain(String[] args) {6

    7 TimerTask timerTaskOne = newTimerTask() {8 @Override9 public voidrun() {10 System.out.println("任务1执行时间:" + sdf.format(newDate()));11 try{12 //模拟任务1执行时间3秒

    13 Thread.sleep(3000);14 }catch(InterruptedException ex){15 ex.printStackTrace();16 }17 }18 };19

    20 System.out.println(String.format("当前时间:" + sdf.format(newDate())));21 Timer timer = newTimer();22 //间隔4秒钟周期性执行任务1

    23 timer.schedule(timerTaskOne, new Date(), 4000);24 }25 }

    执行结果:

    当前时间:2019-09-24 16:35:39

    任务1执行时间:2019-09-24 16:35:39

    任务1执行时间:2019-09-24 16:35:43

    任务1执行时间:2019-09-24 16:35:47

    任务1执行时间:2019-09-24 16:35:51

    任务1执行时间:2019-09-24 16:35:55

    上述任务1以4秒为间隔周期性执行。但是Timer存在一些缺陷,主要是两方面的问题:

    缺陷1:Timer只能创建一个唯一的线程来执行所有的TimerTask任务,如果一个TimerTask任务的执行很耗时,会导致其他的TimerTask的准确性出现问题。代码如下:

    1 public classTimerDefectTestOne {2

    3 static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");4

    5 public static voidmain(String[] args) {6

    7 TimerTask timerTaskOne = newTimerTask() {8 @Override9 public voidrun() {10 System.out.println(String.format("任务1执行时间:" + sdf.format(newDate())));11 try{12 Thread.sleep(10000);13 }catch(InterruptedException ex){14 ex.printStackTrace();15 }16 }17 };18

    19 TimerTask timerTaskTwo = newTimerTask() {20 @Override21 public voidrun() {22 System.out.println(String.format("任务2执行时间:" + sdf.format(newDate())));23 }24 };25

    26 System.out.println("当前时间:" + sdf.format(newDate()));27 Timer timer = newTimer();28 //间隔1秒周期性执行任务1

    29 timer.schedule(timerTaskOne, new Date(), 1000);30 //间隔4秒周期性执行任务2

    31 timer.schedule(timerTaskTwo, new Date(), 4000);32 }33 }

    执行结果:

    当前时间:2019-09-24 16:40:51

    任务1执行时间:2019-09-24 16:40:51

    任务2执行时间:2019-09-24 16:41:01

    任务1执行时间:2019-09-24 16:41:01

    任务1执行时间:2019-09-24 16:41:11

    任务2执行时间:2019-09-24 16:41:21

    任务1执行时间:2019-09-24 16:41:21

    任务1执行时间:2019-09-24 16:41:31

    任务2执行时间:2019-09-24 16:41:41

    任务1执行时间:2019-09-24 16:41:41

    由执行结果可看出任务2的执行周期并不是4秒,与缺陷1内容描述符合。

    缺陷2:如果TimerTask抛出未检查的异常,Timer将产生无法预料的行为。Timer线程并不捕获异常,所有TimerTask抛出的未检查的异常都会终止Timer线程。代码如下:

    1 public classTimerDefectTestTwo {2

    3 static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");4

    5 public static voidmain(String[] args) {6

    7 TimerTask timerTaskOne = newTimerTask() {8 @Override9 public voidrun() {10 System.out.println(String.format("任务1执行时间:" + sdf.format(newDate())));11 throw newRuntimeException();12 }13 };14

    15 TimerTask timerTaskTwo = newTimerTask() {16 @Override17 public voidrun() {18 System.out.println(String.format("任务2执行时间:" + sdf.format(newDate())));19 }20 };21

    22 System.out.println("当前时间:" + sdf.format(newDate()));23 Timer timer = newTimer();24 //间隔1秒周期性执行任务1

    25 timer.schedule(timerTaskOne, new Date(), 1000);26 //间隔4秒周期性执行任务2

    27 timer.schedule(timerTaskTwo, new Date(), 4000);28 }29 }

    执行结果:

    当前时间:2019-09-24 16:48:27

    任务1执行时间:2019-09-24 16:48:27

    Exception in thread "Timer-0" java.lang.RuntimeException

    at com.aisino.threadPool.TimerDefectTestTwo$1.run(TimerDefectTestTwo.java:22)

    at java.util.TimerThread.mainLoop(Timer.java:555)

    at java.util.TimerThread.run(Timer.java:505)

    Timer缺陷的解决方法:使用ScheduledThreadPoolExecutor替换Timer

    针对缺陷1,使用ScheduledThreadPoolExecutor的替换Timer。代码如下:

    1 public classScheduledThreadPoolExecutorTestOne {2

    3 static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");4

    5 public static voidmain(String[] args) {6

    7 TimerTask timerTaskOne = newTimerTask() {8 @Override9 public voidrun() {10 System.out.println(String.format("任务1执行时间:" + sdf.format(newDate())));11 try{12 Thread.sleep(10000);13 }catch(InterruptedException ex){14 ex.printStackTrace();15 }16 }17 };18

    19 TimerTask timerTaskTwo = newTimerTask() {20 @Override21 public voidrun() {22 System.out.println(String.format("任务2执行时间:" + sdf.format(newDate())));23 }24 };25

    26 System.out.println("当前时间:" + sdf.format(newDate()));27 ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(2);28 poolExecutor.scheduleAtFixedRate(timerTaskOne, 0, 1000, TimeUnit.MILLISECONDS);29 poolExecutor.scheduleAtFixedRate(timerTaskTwo, 0, 4000, TimeUnit.MILLISECONDS);30 }31 }

    执行结果:

    当前时间:2019-09-24 16:52:05

    任务1执行时间:2019-09-24 16:52:05

    任务2执行时间:2019-09-24 16:52:05

    任务2执行时间:2019-09-24 16:52:09

    任务2执行时间:2019-09-24 16:52:13

    任务1执行时间:2019-09-24 16:52:15

    任务2执行时间:2019-09-24 16:52:17

    任务2执行时间:2019-09-24 16:52:21

    任务1执行时间:2019-09-24 16:52:25

    任务2执行时间:2019-09-24 16:52:25

    任务2执行时间:2019-09-24 16:52:29

    根据执行结果可看出,任务1以10秒为间隔执行,任务2以4秒为间隔周期性执行,解决缺陷1。

    针对缺陷2,使用ScheduledThreadPoolExecutor的替换Timer。代码如下:

    1 public classScheduledThreadPoolExecutorTestTwo {2

    3 static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");4

    5 public static voidmain(String[] args) {6

    7 TimerTask timerTaskOne = newTimerTask() {8 @Override9 public voidrun() {10 System.out.println(String.format("任务1执行时间:" + sdf.format(newDate())));11 throw newRuntimeException();12 }13 };14

    15 TimerTask timerTaskTwo = newTimerTask() {16 @Override17 public voidrun() {18 System.out.println(String.format("任务2执行时间:" + sdf.format(newDate())));19 }20 };21

    22 System.out.println("当前时间:" + sdf.format(newDate()));23 ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(2);24 poolExecutor.scheduleAtFixedRate(timerTaskOne, 0, 1000, TimeUnit.MILLISECONDS);25 poolExecutor.scheduleAtFixedRate(timerTaskTwo, 0, 4000, TimeUnit.MILLISECONDS);26 }27 }

    执行结果:

    当前时间:2019-09-24 16:56:42

    任务1执行时间:2019-09-24 16:56:42

    任务2执行时间:2019-09-24 16:56:42

    任务2执行时间:2019-09-24 16:56:46

    任务2执行时间:2019-09-24 16:56:50

    任务2执行时间:2019-09-24 16:56:54

    任务2执行时间:2019-09-24 16:56:58

    任务2执行时间:2019-09-24 16:57:02

    任务2执行时间:2019-09-24 16:57:06

    由执行结果可看出,当任务1因异常而停止时,任务2仍正常以4秒为间隔周期性执行,解决缺陷2。

    3、关闭线程池

    关闭线程池可通过调用的shutdown()方法或shutdownNow()方法来实现,两个方法的实现原理不同。shutdown()方法的原理是将线程池的状态由RUNNING转变为SHUTDOWN状态,SHUTDOWN状态下线程池不再接受新任务,但是会将工作队列中的任务执行结束,然后中断空闲线程。shutdownNow()方法的原理是遍历线程池中的所有线程,然后逐个调用线程的interrupt方法来中断线程。shutdownNow()方法会首先将线程池的状态设置为STOP,然后尝试中断所有线程(包括工作线程和空闲线程),并返回工作队列中所有未完成任务的列表。

    只要调用了两个方法中的任意一个,isShutdown()方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminated()方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown()来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow()方法。

    六、线程池的正确关闭方式

    应用停机时,需要释放资源,关闭连接。对于一些定时任务或者网络请求服务将会使用线程池,当应用停机时需要正确安全的关闭线程池,如果处理不当,可能造成数据丢失,业务请求结果不正确等问题。

    关闭线程池我们可以选择什么都不做,JVM关闭时会自然的清除线程池对象。这么做存在很大的弊端,线程池中正在执行的线程以及队列中还未执行的任务将会变得不可控。所以我们需要想办法控制这些正在执行的线程以及未执行的任务。

    ThreadPoolExecutor类中提供了两个主动关闭的方法:shutdown()和shutdownNow(),这两个方法都可以用于关闭线程池,但是具体效果不一样。

    1、线程池的状态

    线程池状态关系图如下:

    a1d150b8d73c43869443682bdafa280e.png

    线程池总共存在5种状态,分别为:

    RUNNING:线程创建之后的初始状态,这种状态下可以执行任务。

    SHUTDOWN:该状态下的线程池不再接受新任务,但是会将工作队列中的任务执行结束。

    STOP:该状态下线程池不再接受新任务,但是不会处理工作队列中的任务,并且将会中断线程。

    TIDYING:该状态下所有任务都已终止,将会执行terminated()钩子方法。

    TERMINATED:执行完terminated()钩子方法之后。

    当执行shutdown()方法时将会使线程池状态从RUNNING转变为SHUTDOWN,而调用shutdownNow()方法之后线程池状态将会从RUNNING转变为STOP。从上图可看出,当线程池处于SHUTDOWN状态,还可以继续调用shutdownNow()方法,将其状态转变为STOP。

    2、shutdown()方法

    shutdown()方法源码如下:

    1 public voidshutdown(){2 final ReentrantLock mainLock = this.mainLock;3 mainLock.lock();4 try{5 //检查权限

    6 checkShutdownAccess();7 //设置线程池状态

    8 advanceRunState(SHUTDOWN);9 //中断空闲线程

    10 interruptIdleWorkers();11 //钩子函数,主要用于清理一些资源

    12 onShutdown();13 } finally{14 mainLock.unlock();15 }16 tryTerminate();17 }

    shutdown()方法首先加锁,其次检查系统安装状态,接着将线程池状态转变为SHUTDOWN,在这之后线程池不再接收提交的新任务。此时如果继续向线程池提交任务,将会使用线程池拒绝策略响应,默认情况下将会使用ThreadPoolExecutor.AbortPolicy,抛出RejectedExecutionException异常。

    interruptIdleWorkers()方法只会中断空闲的线程,不会中断正在执行任务的线程。空闲的线程将会阻塞在线程池的阻塞队列上。

    3、shutdownNow()方法

    shutdownNow()方法源码如下:

    1 public ListshutdownNow(){2 Listtasks;3 final ReentrantLock mainLock = this.mainLock;4 mainLock.lock();5 try{6 //检查状态

    7 checkShutdownAccess();8 //将线程池状态转变为STOP

    9 advanceRunState(STOP);10 //中断所有线程,包括工作线程以及空闲线程

    11 interruptWorkers();12 //丢弃工作队列中的存量任务

    13 tasks =drainQueue();14 } finally{15 mainLock.unlock();16 }17 tryTerminate();18 returntasks;19 }

    shutdownNow()方法将会把线程池状态设置为STOP,然后中断所有线程,最后取出工作队列中所有未完成的任务返回给调用者。

    对比shutdown()方法,shutdownNow()方法比较粗暴,直接中断工作线程。不过需要注意:中断线程并不代表线程立刻结束。这里需要线程主动配合线程中断响应。

    线程池的shutdown()方法与shutdownNow()方法都不会主动等待执行任务的结束,如果需要等到线程池任务执行结束,需要调用awaitTermination主动等待任务调用结束。

    调用方法如下:

    1 poolExecutor.shutdown();2 try{3 while(!poolExecutor.awaitTermination(60, TimeUnit.SECONDS)){4 System.out.println("线程池任务还未执行结束");5 }6 }catch(InterruptedException ex){7 ex.printStackTrace();8 }

    如果线程池任务执行结束,awaitTermination()方法将会返回true,否则当等待时间超过指定时间后将会返回false。如果需要使用这种机制,建议在上面的基础上增加一定重试次数。

    线程中断机制:线程中的interrupt()方法只是设置一个中断标志,不会立即中断正常的线程。如果想让中断立即生效,必须在线程内调用Thread.interrupted()判断线程的中断状态。对于阻塞的线程,调用中断时,线程将会立即退出阻塞状态并抛出InterruptedException异常。所以对于阻塞线程需要正确处理InterruptedException异常。

    4、优雅关闭线程池

    由线程池状态关系图可知,处于SHUTDOWN状态下的线程池依旧可以调用shtudownNow()方法,所以可以结合shutdown、shutdownNow、awaitTermination,更加优雅地关闭线程池。

    1 //调用shutdown()方法关闭线程池

    2 poolExecutor.shutdown();3 try{4 //等待60秒

    5 if (!poolExecutor.awaitTermination(60, TimeUnit.SECONDS)){6 //调用shutdownNow取消正在执行的任务

    7 poolExecutor.shutdownNow();8 //再次等待60秒,如果还未结束,可以再次尝试,或者直接放弃

    9 if(!poolExecutor.awaitTermination(60, TimeUnit.SECONDS)){10 System.err.println("线程池任务未正常执行结束");11 }12 }13 }catch(InterruptedException ex){14 //重新调用shutdownNow

    15 poolExecutor.shutdownNow();16 }

    七、线程池参数调优

    参数如何设置跟系统的负载有直接的关系,假设下面的参数表示目前的系统负载。

    tasks:每秒需要处理的最大任务数量

    tasktime:处理一个任务所需要的时间

    responsetime:系统允许任务最大的响应时间,比如每个任务的响应时间不得超过2秒

    1、corePoolSize

    每个任务需要tasktime秒处理,则每个线程每秒可以处理1/tasktime个任务。系统每秒有tasks个任务需要处理,则需要的线程数为:tasks/(1/tasktime),即tasks*tasktime个线程。假设系统每秒任务数范围为100至1000,每个任务耗时0.1秒,则需要的线程数为100*0.1至1000*0.1,即10至100。那么corePoolSize应该设置为大于10,corePoolSize可设置为20。

    2、workQueue

    任务队列的长度与核心线程数以及系统对任务响应时间的要求有关。队列长度可设置为(corePoolSize/tasktime)*responsetime,如(20/0.1)*2=400,即队列长度可设置为400。

    注意:队列长度设置过大,会导致任务响应时间过长,切忌使用new LinkedBlockingQueue(),队列LinkedBlockingQueue将队列长度设置为Integer.MAX_VALUE,将会导致线程数永远为corePoolSize,再也不会增加。当任务数量陡增时,任务响应时间也将随之陡增。

    3、maximumPoolSize

    当系统负载达到最大值时,核心线程数已无法按时处理完所有任务,这时就需要增加线程。每秒200个任务需要20个线程,那么当每秒任务达到1000个任务时,则需要(1000 - workQueue)*(20/200),即60个线程,可将maximumPoolSize设置为60。

    4、keepAliveTime

    线程数量不能只增加不减少。当负载降低时,可减少线程数量,如果一个线程空闲时间达到keepAliveTime,该线程就该退出,默认情况下线程池最少会保持corePoolSize个线程(allowCoreThreadTimeout设置为false),keepAliveTime可设置为0。

    5、allowCoreThreadTimeout

    默认情况下,核心线程不会退出。可将allowCoreThreadTimeout设置为true,让核心线程也退出。

    以上关于线程数量的计算并没有考虑CPU的情况。若结合CPU的情况,比如,当线程数量达到50时,CPU达到100%,则将maxPoolSize设置为60也不合适,此时若系统负载长时间维持在每秒1000个任务,则超出线程池处理能力,应设法降低每个任务的处理时间(tasktime)。

    展开全文
  • 设为“星标”,好文章不错过!1 定时任务Netty、Quartz、Kafka 以及 Linux 都有定时任务功能...在任务量大、性能要求高的场景,为了将任务存取及取消操作时间复杂度降为 O(1),会采用时间轮算法。2 时间轮模型及其应...

    设为“星标”,好文章不错过!

    1 定时任务

    Netty、Quartz、Kafka 以及 Linux 都有定时任务功能。

    JDK 自带的java.util.Timer和DelayedQueue可实现简单的定时任务,底层用的是堆,存取复杂度都是 O(nlog(n)),但无法支撑海量定时任务。

    在任务量大、性能要求高的场景,为了将任务存取及取消操作时间复杂度降为 O(1),会采用时间轮算法。

    2 时间轮模型及其应用

    一种高效批量管理定时任务的调度模型。一般会实现成一个环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务。

    指针周期性跳动,跳动到一个槽位,就执行该槽位的定时任务。

    Hashed Timing Wheel 结构示意图

    适用场景

    故障恢复

    流量控制

    调度算法

    控制网络中的数据包生命周期

    计时器维护代价高,如果

    处理器在每个时钟滴答声中都会中断

    使用精细粒度计时器

    未完成的计时器很多

    需要高效的定时器算法以减少总体中断的开销。

    单层时间轮的容量和精度都是有限的,对于精度要求特别高、时间跨度特别大或是海量定时任务需要调度的场景,通常会使用多级时间轮以及持久化存储与时间轮结合的方案。

    模型和性能指标

    模型中的规则

    客户端调用:

    START_TIMER(时间间隔,Request_ID,Expiry_Action)

    STOP_TIMER(Request_ID)

    计时器tick调用:

    PER_TICK_BOOKKEEPING

    EXPIRY_PROCESSING

    性能指标

    空间

    数据结构使用的内存

    延迟

    开始和结束上述任何例程所需的时间

    3 Dubbo的时间轮结构

    核心接口

    TimerTask

    在 Dubbo 中,所有定时任务都要实现 TimerTask 接口。只定义了一个 run() 方法,入参是一个 Timeout 接口对象。

    Timeout

    Timeout 对象与 TimerTask 对象一一对应,类似线程池返回的 Future 对象与提交到线程池中的任务对象之间的关系。

    通过 Timeout 对象,不仅可以查看定时任务的状态,还可以操作定时任务(例如取消关联的定时任务)。

    Timeout 接口中的方法:

    Timer 接口定义了定时器的基本行为,核心是 :提交一个定时任务(TimerTask)并返回关联的 Timeout 对象,类似于向线程池提交任务。

    HashedWheelTimeout

    HashedWheelTimeout 是 Timeout 接口的唯一实现,是 HashedWheelTimer 的内部类。HashedWheelTimeout 扮演了两个角色:

    时间轮中双向链表的节点,即定时任务 TimerTask 在 HashedWheelTimer 中的容器

    定时任务 TimerTask 提交到 HashedWheelTimer 之后返回的句柄(Handle),用于在时间轮外部查看和控制定时任务

    核心字段

    prev、next。通过双向链表被用来在HashedWheelTimerBucket链timeouts(定时任务),由于只在WorkerThread上行动,没有必要进行同步/volatile。

    task,实际被调度的任务

    deadline,定时任务执行的时间。在创建 HashedWheelTimeout 时指定

    计算公式:,ns

    state,定时任务当前所处状态

    可选状态:

    STATE_UPDATER 用于实现 state 状态变更的原子性。

    remainingRounds,当前任务剩余的时钟周期数。时间轮所能表示的时间长度有限,在任务到期时间与当前时刻的时间差,超过时间轮单圈能表示时长,就出现套圈,需要该字段值表示剩余的时钟周期。

    核心API

    isCancelled()

    isExpired()

    state()

    检查当前 HashedWheelTimeout 状态

    cancel() 方法

    expire() 方法

    remove()

    HashedWheelBucket

    时间轮中的一个槽。

    时间轮中的槽实际上就是一个用于缓存和管理双向链表的容器,双向链表中的每一个节点就是一个 对象,也就关联了一个 定时任务。

    HashedWheelBucket 持有双向链表的首尾两个节点 - head 和 tail,再加上每个 HashedWheelTimeout 节点均持有前驱和后继引用,即可正、逆向遍历整个链表。

    核心API

    addTimeout()

    pollTimeout()

    remove()

    从双向链表中移除指定的 HashedWheelTimeout 节点。

    clearTimeouts()

    循环调用 pollTimeout() 方法处理整个双向链表,并返回所有未超时或者未被取消的任务。

    expireTimeouts()

    遍历双向链表中的全部 HashedWheelTimeout 节点。在处理到期的定时任务时,会通过 remove() 方法取出,并调用其 expire() 方法执行;对于已取消的任务,通过 remove() 方法取出后直接丢弃;对于未到期的任务,会将 remainingRounds 字段(剩余时钟周期数)减一。

    HashedWheelTimer

    接口的实现,通过时间轮算法实现了一个定时器。

    职能

    根据当前时间轮指针选定对应 槽,从链表头部开始迭代,计算每个 定时任务:

    属于当前时钟周期则取出运行

    不属于则将其剩余的时钟周期数减一

    核心域

    workerState

    时间轮当前所处状态,三个可选值,由 实现其原子地修改。

    startTime

    当前时间轮的启动时间,提交到该时间轮的定时任务的 deadline 字段值均以该时间戳为起点进行计算。

    wheel

    时间轮环形队列,每个元素都是一个槽。当指定时间轮槽数为 n 时,会向上取最靠近 n 的 2 次幂值

    timeouts、cancelledTimeouts

    HashedWheelTimer 会在处理 HashedWheelBucket 的双向链表前,先处理这俩队列的数据:

    timeouts 队列

    缓冲外部提交时间轮中的定时任务

    cancelledTimeouts 队列

    暂存取消的定时任务

    tick

    位于 ,时间轮的指针,步长为 1 的单调递增计数器

    mask

    掩码, ,执行 便能定位到对应的时钟槽

    ticksDuration

    时间指针每次加 1 所代表的实际时间,单位为纳秒。

    pendingTimeouts

    当前时间轮剩余的定时任务总数。

    workerThread

    时间轮内部真正执行定时任务的线程。

    worker

    真正执行定时任务的逻辑封装这个 Runnable 对象中。

    newTimeout()

    提交定时任务,在定时任务进入到 timeouts 队列之前会先调用 start() 方法启动时间轮,其中会完成下面两个关键步骤:

    确定时间轮的 startTime 字段

    启动 workerThread 线程,开始执行 worker 任务。

    之后根据 startTime 计算该定时任务的 deadline,最后才能将定时任务封装成 并添加到 队列。

    4 时间轮指针一次转动的执行流程

    时间轮指针转动,时间轮周期开始

    清理用户主动取消的定时任务,这些定时任务在用户取消时,记录到 队列中。在每次指针转动的时候,时间轮都会清理该队列

    将缓存在 timeouts 队列中的定时任务转移到时间轮中对应的槽中

    根据当前指针定位对应槽,处理该槽位的双向链表中的定时任务

    检测时间轮的状态。如果时间轮处于运行状态,则循环执行上述步骤,不断执行定时任务。如果时间轮处于停止状态,则执行下面的步骤获取到未被执行的定时任务并加入 队列:遍历时间轮中每个槽位,并调用 () 方法;对 timeouts 队列中未被加入槽中循环调用 poll()

    最后再次清理 cancelledTimeouts 队列中用户主动取消的定时任务。

    5 定时任务应用

    并不直接用于周期性操作,而是只向时间轮提交执行单次的定时任务,在上一次任务执行完成的时候,调用 newTimeout() 方法再次提交当前任务,这样就会在下个周期执行该任务。即使在任务执行过程中出现了 GC、I/O 阻塞等情况,导致任务延迟或卡住,也不会有同样的任务源源不断地提交进来,导致任务堆积。

    Dubbo 时间轮应用主要在如下方面:

    失败重试, 例如,Provider 向注册中心进行注册失败时的重试操作,或是 Consumer 向注册中心订阅时的失败重试等

    周期性定时任务, 例如,定期发送心跳请求,请求超时的处理,或是网络连接断开后的重连机制

    参考

    https://zhuanlan.zhihu.com/p/32906730

    展开全文
  • 参考文献 附录 time_holder.hpp 源码 #pragma once #include <thread> #include <chrono>... time_holder() : t(std::... // 自类创建开始,需要保持多久时间,如果不足保持时间,则阻塞等待到期,若已经.

    0. 定时器简介

    定时器通常包括至少两个成员:一个超时时间(通常采用相对时间或者超时时间)和一个超时时间到达后的一个回调函数。

    有时候还可能包括回调函数被运行时须要传入的参数,以及是否又一次启动定时器,更改定时器的超时时间等。

    假设使用链表作为容器来串联全部的定时器。则每一个定时器还要包括指向下一个定时器的指针成员。进一步,假设链表是双向的,则每一个定时器还须要包括指向前一个定时器的指针成员。

    0.1 排序链表的弊端

    基于排序链表的定时器使用唯一的一条链表来管理所有的定时器,所以插入操作的效率随着定时器的数目增多而降低。而时间轮使用了哈希表处理冲突的思想,将定时器散列到不同的链表上。这样每条链表上的定时器数目都将明显少于原来的排序链表上的定时器数目,插入操作的效率基本不受定时器数目的影响。

    1. 时间轮计时器简介

    1.1 简单时间轮简介

    简单时间轮

    在这个时间轮中,实线指针指向轮子上的一个槽(slot)。它以恒定的速度顺时针转动,每转动一步就指向下一个槽(slot)。每次转动称为一个滴答(tick)。一个tick时间间隔为时间轮的si(slot interval)。该时间轮共有N个槽,因此它转动一周的时间是Nsi.每个槽指向一条定时器链表,每条链表上的定时器具有相同的特征:它们的定时时间相差Nsi的整数倍。时间轮正是利用这个关系将定时器散列到不同的链表中。假如现在指针指向槽cs,我们要添加一个定时时间为ti的定时器,则该定时器将被插入槽ts(timer slot)对应的链表中:

    ts=(cs+(ti/si))%N

    对于时间轮而言,要提高精度,就要使si的值足够小; 要提高执行效率,则要求N值足够大,使定时器尽可能的分布在不同的槽。

    但是简单时间轮存在一个明显的弊端是,在一个轮子的情况下,可跨越的最大计时时长(一个轮子的刻度数总数 * si)和时间轮的精度难以达到平衡,这个时候,多级时间轮即可在不丢失精度的前提下,加大时间轮计时器可计时的跨度。

    1.2 多级时间轮简介

    时间轮分成多个层级,每一层是一个圈,和时钟类似,和水表更像,如下面图:
    水表

    当个位的指针转完一圈到达0这个刻度之后,十位的指针转1格;当十位的转完一圈,百位的转1格,以此类推。这样就能表示很长的度数。

    时间轮能表达的时间长度,和圈的数量以及每一圈的长度成正比。假设有4圈,每个圈60个刻度,每个刻度表示1毫秒,那么这个时间轮可以表示这么长:

    60 x 60 x 60 x 60 = 12,960,000‬(ms) = 3.6 小时

    2. 多级时间轮代码实现分析

    2.1 时间轮进制的本质分析

    考虑到多级时间轮的进位思想,本质是一种计数进制的体现,上面例子中的4个轮子,每个轮子60刻度其实就是60进制。无论采用多少进制,计数的本质都是连续的。所以无论是用60进制,61进制,10进制都是一样的。

    2.2 利用位运算的取巧进制实现

    对于计算机而言,人类看到的10,20的整数,对它们并不友善。它们更喜欢二进制数,这样它们能够处理的更快。

    基于上述前提,这里采用了一个32位的无符号整型作为5级时间轮计时器的计时辅助,此32位无符号整型自程序运行开始一直递增,每一个tick自增1,采用这样的方式计数,可以通过这一个32位无符号整型来同时表示5个轮子的时间刻度位置。

    一级轮二级轮三级轮四级轮五级轮
    6bit6bit6bit6bit8bit
    11111111111111111111111111111111

    这么表示的好处是,每个轮子的指针位数可以通过位运算(左右移和位与)来迅速获得,参考这样的宏:

    // 第1个轮占的位数
    #define TVR_BITS 8
    // 第1个轮的长度
    #define TVR_SIZE (1 << TVR_BITS)
    // 第n个轮占的位数
    #define TVN_BITS 6
    // 第n个轮的长度
    #define TVN_SIZE (1 << TVN_BITS)
    // 掩码:取模或整除用
    #define TVR_MASK (TVR_SIZE - 1)
    #define TVN_MASK (TVN_SIZE - 1)
    

    想取某一个圈的当前指针位置是:

    // 第1个圈的当前指针位置
    #define FIRST_INDEX(v) ((v) & TVR_MASK)
    // 后面第n个圈的当前指针位置
    #define NTH_INDEX(v, n) (((v) >> (TVR_BITS + (n - 1) * TVN_BITS)) & TVN_MASK)
    

    2.3 主动轮与从动轮

    虽然是多级时间轮,但是每个轮子的作用并不相同

    轮子的类型主要有两类:工作轮与从动轮。以本例实现为例子进行讲解,如下图所示:

    在这里插入图片描述

    进位逻辑

    主动轮:即上图中序号最大的5号轮,每一个定时周期指针移动一格。当5轮转动一圈,4轮进一格,4轮转一圈3轮进一格。

    从动轮:1-4轮,他们无法自己主动移动,需要被动等待低级轮进位才能移动。

    进位逻辑由2.2中分析的位运算天然保证,无需额外加代码实现

    执行定时逻辑

    主动轮:当刻度指针指向当前槽的时候,槽内的任务被顺序执行。这里采用的STL的std::list,每次新增的任务都插入到任务链的链头。这里选择插入链头和链尾的复杂度其实没本质区别。和C语言版本的list不一样的是,STL中的头插和尾插都是常数复杂度,详见list::emplace_backlist::emplace_front中的Complexity

    从动轮:当对应轮的刻度指针指向当前槽的时候,槽内的任务链依次向低级轮(序号较高的轮)转移,特别注意,从动轮没有执行任务权限,只是对任务进行记录与缓存。

    这里需要指出的是,定时任务在添加进入时间轮的时候,任务的跳转路径已经可以计算出来。例如插入的时候,处于2号轮的第3个刻度,下一次跳到4号轮的第4个刻度这些信息在插入的那一刻已经可以得到。每个任务插入的时候,立即算好跳转路径,后续转移查表即可,无需重新计算

    删除定时器逻辑

    若无需涉及删除定时器的功能,那么数据结构可能直接将回调函数对象std::function放在槽中即可,这也是为什么很多开源定时器不提供删除插入定时器中任务的功能,简单好用。

    但如果需要能够删除定时器的功能,那么数据结构就需要变化。这里采用了std::shared_ptrstd::unordered_map的方式实现删除功能。

    具体实现方式

    1. 使用智能指针(std::shared_ptr)保存定时任务对象的方式。std::shared_ptr对象在时间轮的对应槽中保存一份,同样在std::unordered_map也保存一份。

    2. 每次在主动轮执行任务之前,使用一下std::shared_ptrunique接口判断std::shared_ptr的引用计数,如果是独占的,那么说明这个任务已经被删除,是过期任务。那么不执行直接跳过。

    3. 由于主动轮每次是弹出所有待执行任务,如果任务被跳过不执行,相当于已经被删除。

    PS. 由于任务实际执行不在主动轮运行过程中进行,而是由一个异步任务线程池执行,所以这里选择使用std::shared_ptr的好处在于,对于异步任务线程池而言,std::shared_ptr能够确保执行任务的时候,对象是存活的,防止出现意外。

    3. 基于ASIO的异步线程池

    这里简单的实现了一个基于ASIO的异步线程池,用于接受主动轮槽中到期的任务,是定时器任务的实际执行者。由于需要开发时间轮类的初衷是希望能够监控一系列服务的存活情况,所以这里的异步任务需要获取任务运行结果,这里采用了std::packaged_task的方式,将异步执行任务的结果用std::future传递出来。

    设计目的

    这样的方式,能够在定时器需要执行的任务无论耗时长短均不会影响到主动轮的嘀嗒(tick)动作,确保每个(tick)的动作与任务执行耗时解耦。

    4. time_holder类

    这个类很简单,只是为了保持固定的时长,因为即便每次(tick)动作的耗时已经和任务解耦,但是依旧无法保证每次tick的时长是一个常数,所以在每次tick之后,time_holder的时长实际是按照tick之后的时间点与下次tick时间点的一个动态值。这里主要完成这个类的核心接口依旧是标准库中的:std::this_thread::sleep_until

    5. 复杂度分析

    实现方式StartTimerStopTimerPerTickBookkeeping
    基于链表O(1)O(n)O(n)
    基于排序链表O(n)O(1)O(1)
    基于最小堆O(lgn)O(1)O(1)
    基于时间轮O(1)O(1)O(1)

    通过复杂度比较,也可以发现时间轮对比其他方式显现的定时器,具有各方面复杂度均为恒定的效果,但万事万物都是均衡的,这样的常数时间复杂度是用较大的空间消耗换来的。可幸的是,这份空间开销目前在承受范围内。

    6. 精度分析

    与精度相关的代码其实就这么几行。

    std::thread th([this](){
        time_holder holder;
        uint32_t times = 1;    
        while (alive) {
            if (times == 0) {  
                    // uint32次后再重新定位原点,频繁定位起点会造成累计误差
                    // 重定位理论上越少越好,这里不用uint64是因为uint64加法
                    // 32位板子上指令较多,如果是64位cpu,可以采用uint64
                holder.reset();
                times = 1;
            }
            tick();
            holder.hold(unit * times++);
        }
    });
    

    可以看到主要精度还是由holder.hold保证的,而这又只是一个标准库的简单封装,所以精度依旧是由std::chrono保证的,实测起来准度还是不错的。

    参考文献

    1. 知乎:时间轮定时器
    2. 高性能计时器Timer的设计(时间轮和时间堆两种方式)
    3. C++基础-map与unordered_map

    附录

    time_holder.hpp 源码
    
    
    #pragma once
    #include <thread>
    #include <chrono>
    
    namespace monitor 
    {
    	class time_holder
    	{
    	public:
    		time_holder() : t(std::chrono::steady_clock::now()) {}
            // 自类创建开始,需要保持多久时间,如果不足保持时间,则阻塞等待到期,若已经超时,不阻塞直接退出
    		void hold(std::chrono::milliseconds msec) {
    			std::this_thread::sleep_until(msec + t);
    		}
    		// 重置time_holder计时时间
    		void reset(){
    			t = std::chrono::steady_clock::now();
    		}
    	private:
    		std::chrono::steady_clock::time_point t;
    	};
    }
    
    time_wheel.hpp 源码
    
    
    #pragma once
    #include <thread>
    #include <chrono>
    #include <memory>
    #include <array>
    #include <list>
    #include <functional>
    #include <mutex>
    #include <algorithm>
    #include <unordered_map>
    
    #ifndef USE_BOOST_FUTURE
    #include <future>
    #else
    //#define BOOST_THREAD_PROVIDES_FUTURE
    #define BOOST_THREAD_VERSION 4
    #include "boost/thread/future.hpp"
    #endif
    
    #include "asyncthreadpool.hpp"
    #include "time_holder.hpp"
    
    namespace monitor 
    {
        class AsyncThreadPool;
        // 第1个轮占的位数
        #define TVR_BITS 8
        // 第1个轮的长度
        #define TVR_SIZE (1 << TVR_BITS)
        // 第n个轮占的位数
        #define TVN_BITS 6
        // 第n个轮的长度
        #define TVN_SIZE (1 << TVN_BITS)
        // 掩码:取模或整除用
        #define TVR_MASK (TVR_SIZE - 1)
        #define TVN_MASK (TVN_SIZE - 1)
        // 第1个圈的当前指针位置
        #define FIRST_INDEX(v) ((v) & TVR_MASK)
        // 后面第n个圈的当前指针位置
        #define NTH_INDEX(v, n) (((v) >> (TVR_BITS + (n - 1) * TVN_BITS)) & TVN_MASK)
        // 最小刻度单位的指数
        #define TIME_UNIT       50
    
        class TwTimer;
    
        using MilliSec  = std::chrono::milliseconds;
        using TaskRB    = std::function<bool(void)>;
        using TmPoint   = std::chrono::steady_clock::time_point;
        using ListShptr = std::list<std::shared_ptr<TwTimer>>;
        using DealFunc  = std::function<void(future<bool>&&)>;
        
        const MilliSec  unit(TIME_UNIT);       // 定时器槽之间时间间隔,即最小刻度,单位:毫秒
    
    #ifndef USE_BOOST_FUTURE
    	using namespace std;
    #else
    	using namespace boost;
    #endif
        // 任务类,定时任务
        // 只移不可复制类型
        // 复制主要考虑到
        class TwTimer {
        public:
            TwTimer(const MilliSec& msec, TaskRB&& func, bool _loop, 
                    const TmPoint& epoch, std::array<uint8_t, 5>&& cur_slots) : ahp(AsyncThreadPool::GetInstance()),
                                                callback(std::move(func)),
                                                t(std::chrono::steady_clock::now() + msec),
                                                timeout(msec),
                                                loop_flag(_loop),
                                                cur_wheel(0),
                                                start_time(epoch)
            {
                init_slot_seq_cur_wheel(std::move(cur_slots));
            }
            // 任务执行函数
            void execute()
            {
                future_bool = ahp.add(callback);
            }
            future<bool>& get() { return future_bool;}
            // 超时标记点往后挪一个timeout
            void do_loop(std::array<uint8_t, 5>&& cur_slots) 
            { 
                init_slot_seq_cur_wheel(std::move(cur_slots)); 
            }
            // 返回此任务是否是循环任务
            bool is_loop()  const {return loop_flag;}
            // 获取此任务的到期时刻
            std::pair<uint8_t, uint8_t> get_wheel_slot() const
            {
                return std::make_pair(cur_wheel, slot_seq[cur_wheel]);
            }
            // 返回tick后需要跳到的轮子序号和对应轮子的槽号
            std::pair<uint8_t, uint8_t> tick() 
            {
                if (cur_wheel == 0)
                    return std::make_pair(0, slot_seq[0]);
                uint8_t next_wheel = 0;
                for (int i = cur_wheel - 1; i >= 0; i--)
                {
                    if (slot_seq[i] != 0) {
                        next_wheel = i;
                        break;
                    } //如果一个if都没进去,说明cur_wheel在最低环,和初值0一致,并且是整数倍,应该是马上要执行
                }      
                cur_wheel = next_wheel;
                return std::make_pair(next_wheel, slot_seq[next_wheel]);
            }
            uint32_t  get_loop_time() const {return timeout.count();}
            // 只移不可复制类型
            TwTimer(const TwTimer& orgin) = delete;
            TwTimer& operator=(const TwTimer & orgin) = delete;
            TwTimer(TwTimer&&) = default;
            TwTimer& operator=(TwTimer&&) = default;
        private:
            void init_slot_seq_cur_wheel(const std::array<uint8_t, 5>& cur_slots)
            {
                TmPoint end_epoch = timeout + std::chrono::steady_clock::now();
                uint32_t gap = std::chrono::duration_cast<MilliSec>(end_epoch - start_time).count() / TIME_UNIT ;
                slot_seq[0] = FIRST_INDEX(gap);
                for (int i = 1; i < 5 ; i++)
                    slot_seq[i] = NTH_INDEX(gap, i);
                for (int i = 4; i >= 0; i--)
                {
                    if (slot_seq[i] != 0 && cur_slots[i] != slot_seq[i]) {
                        cur_wheel = i;
                        break;
                    } //如果一个if都没进去,说明cur_wheel在最低环,和初值0一致,并且是整数倍,应该是马上要执行
                } 
            }
            AsyncThreadPool&        ahp;        //异步线程池句柄
            TaskRB                  callback;   //超时后的处理函数
            TmPoint                 t;          //任务到期应该执行时候的时刻点
            const MilliSec          timeout;    //超时时长
            const bool              loop_flag;  //是否循环任务
            std::array<uint8_t, 5>  slot_seq;   //记录此任务在5个环中的位置值
            uint8_t                 cur_wheel;  //记录当前处于的环数
            const TmPoint&          start_time; //总时间轮开始时间
            future<bool>            future_bool;
        };
    
        
        class Task_Result
        {
        public:
            explicit Task_Result(std::shared_ptr<TwTimer> _tw) : is_loop(_tw->is_loop())
            {
                if (is_loop)     //如果是循环任务,则保留弱指针,以防循环任务后续被删除的时候,这里还有备份
                    w_tw = _tw;
                else             //如果是非循环任务,则保留shared指针,确保任务被删除后结果仍旧可以保留
                    s_tw = _tw;
            }
    
            bool get_result(uint32_t miiliseconds) const
            { 
                auto span = chrono::milliseconds(miiliseconds);
                std::shared_ptr<TwTimer> sp;
                if (is_loop) {
                    sp = w_tw.lock();
                } else {
                    sp = s_tw;
                }
                if (sp) {
                    future<bool>& ret = sp->get();
                    if (ret.valid())
                    {
                        if (ret.wait_for(span) != future_status::timeout) {
                            try {
                                return ret.get();
                            }
                            catch (std::exception&) {
                                // printf("[exception caught]");
                                return false;
                            }
                        }
                    }
                }
                return false;
            }
    
            // 禁止复制,赋值,移动,移动赋值构造
            // 由于C++11不支持lambda移动捕获(C++14才支持),为了模拟移动捕获,只能开启拷贝构造函数可用
            // 但此类的意图并不是允许拷贝的
    		Task_Result(const Task_Result &) = default;
    		Task_Result& operator=(const Task_Result &) = default;
    		Task_Result(Task_Result&&) = default;
    		Task_Result& operator=(Task_Result&&) = default;
        private:
            std::weak_ptr<TwTimer>      w_tw;
            std::shared_ptr<TwTimer>    s_tw;        
            bool                        is_loop;
        };
        
    
        template<int SLOT>
        class SingleWheel
        {
        public:
            SingleWheel() : cur_slot(0) {}
            // 有新任务加入时的接口
            void add(uint8_t insert_slot_seq, std::shared_ptr<TwTimer>&& sp_tw)
            {
                if (insert_slot_seq >= SLOT)
                    return;
                task_list[insert_slot_seq].emplace_front(std::move(sp_tw));
            }
            // 任务弹出
            // @return first:弹出的任务列表
            //         second:当前轮是否触发下一个轮的tick,即是否进位
            std::pair<ListShptr, bool> tick()
            {
                auto&& list = cascade(cur_slot);
                cur_slot = cur_slot + 1 == SLOT ?  0 : cur_slot + 1;
                return std::make_pair(list, cur_slot == 0);
            }
            std::pair<ListShptr, bool> degrade()
            {
                cur_slot = cur_slot + 1 == SLOT ?  0 : cur_slot + 1;
                auto&& list = cascade(cur_slot);
                return std::make_pair(list, cur_slot == 0);            
            }
    
            // @return 返回此轮当前slot槽号,从0开始
            uint8_t curslot() const { return cur_slot;}
        private:
            // @param remove_slot_seq 需要弹出任务的槽
            // @return 弹出的槽中任务列表
            ListShptr cascade(uint8_t remove_slot_seq)
            {
                ListShptr tmp;
                if (remove_slot_seq >= SLOT)
                    return tmp;
                task_list[remove_slot_seq].swap(tmp);
                return tmp;
            }    
            uint8_t                     cur_slot;
            std::array<ListShptr, SLOT> task_list;
        };
    
        //实现5级时间轮 范围为0~ (2^8 * 2^6 * 2^6 * 2^6 *2^6)=2^32
        class TimeWheel {
        public:
            static TimeWheel& GetInstance()
            {
                static TimeWheel t;
                return t;
            }
    
            // 禁止复制,赋值,移动,移动赋值构造
    		TimeWheel(const TimeWheel &) = delete;
    		TimeWheel& operator=(const TimeWheel &) = delete;
    		TimeWheel(TimeWheel&&) = delete;
    		TimeWheel& operator=(TimeWheel&&) = delete;
    
            void destory() {
                alive = false;
                task_set.clear();
            }
    
            ~TimeWheel() {
                alive = false;
            }
            // 根据定时值创建定时器,并插入槽中
            Task_Result add_loop_timer(const std::string& name, uint32_t milliseconds_timeout, std::function<bool()>&& func)
            {
                return add_timer(name, milliseconds_timeout, std::move(func), true);
            }
    
            Task_Result add_once_timer(const std::string& name, uint32_t milliseconds_timeout, std::function<bool()>&& func)
            {
                return add_timer(name, milliseconds_timeout, std::move(func), false);
            }
            
            void del_timer(const std::string& name)
            {
                std::lock_guard<std::recursive_mutex>  g(mtx);
                task_set.erase(name);
            }
            
        private: 
            TimeWheel() :   ahp(AsyncThreadPool::GetInstance()), 
                            epoch(std::chrono::steady_clock::now()),
                            alive(true),
                            first_tw(new SingleWheel<TVR_SIZE>()),
                            last_tws()
            {
                std::thread th([this](){
                    time_holder holder;
                    uint32_t times = 1;    
                    while (alive) {
                        if (times == 0) {  
                             // uint32次后再重新定位原点,频繁定位起点会造成累计误差
                             // 重定位理论上越少越好,这里不用uint64是因为uint64加法
                             // 32位板子上指令较多,如果是64位cpu,可以采用uint64
                            holder.reset();
                            times = 1;
                        }
                        tick();
                        holder.hold(unit * times++);
                    }
                });
                th.detach();
    
                for (auto& item : last_tws)
                {
                    item = std::unique_ptr<SingleWheel<TVN_SIZE>>(new SingleWheel<TVN_SIZE>());
                }
            }
    
            Task_Result add_timer(const std::string& name, uint32_t milliseconds_timeout, std::function<bool()>&& func, bool is_loop)
            {
                std::lock_guard<std::recursive_mutex>  g(mtx);
                auto tw = std::make_shared<TwTimer>(MilliSec(milliseconds_timeout), std::move(func), is_loop, epoch, cur_slots());
                if (is_loop)
                    task_set.emplace(name, tw);
                Task_Result ret = Task_Result(tw);
                load_timer(std::move(tw));
                return ret;
            }
    
            // 此接口给循环任务使用,循环任务不必进行重新构建任务,只需要修改重新算slot和wheel即可
            void add_timer(std::shared_ptr<TwTimer>&& tw)
            {
                tw->do_loop(cur_slots());
                // printf( "tw:%d, first_tw->curslot():%d, last_tws[i]->curslot():%d\n", tw->get_loop_time(), first_tw->curslot(), last_tws[0]->curslot());
                load_timer(std::move(tw));
            }
            // 此接口为将任务装在入对应时间轮
            void load_timer(std::shared_ptr<TwTimer>&& p_tw)
            {
                // first: wheel数,从0开始,second,slot数,从0开始
                std::pair<uint8_t, uint8_t>&& wheel_slot = p_tw->get_wheel_slot(); 
                if (wheel_slot.first > 4)   //大于4理论上不可能出现,退出以防下面array越界
                    return;
                // printf( "p_tw:%d, wheel_slot.first:%d, wheel_slot.second:%d\n", p_tw->get_loop_time(), wheel_slot.first, wheel_slot.second);
                if (wheel_slot.first == 0)
                    first_tw->add(wheel_slot.second, std::move(p_tw));
                else
                    last_tws[wheel_slot.first - 1]->add(wheel_slot.second, std::move(p_tw));
            }
            std::array<uint8_t, 5> cur_slots() const
            {
                std::array<uint8_t, 5> cur_slot;
                cur_slot[0] = first_tw->curslot();
                for (int i = 0; i < 4; i++)
                {
                    cur_slot[i + 1] = last_tws[i]->curslot();
                }
                return cur_slot;
            }
            void tick()
            {
                std::lock_guard<std::recursive_mutex>  g(mtx);
                auto&& result = first_tw->tick();
                auto& task_list = result.first;
                for (auto& p_tw : task_list)
                {
                    // 这里不会出现判断了unique后,引用次数变化的情况
                    // 外层的add_timer和del_timer都被阻塞住
                    bool is_loop = p_tw->is_loop();
                    if (is_loop && p_tw.unique())  
                        continue;
                    p_tw->execute();
                    if (is_loop)
                        add_timer(std::move(p_tw));
                }
                if (result.second) // 如果低级轮子进位了
                {
                    // printf("jinweile!\n");
                    for (int i = 0; i < 4; i++)
                    {
                        auto&& res = last_tws[i]->degrade();
                        ListShptr& tl = res.first;
                        for (auto & tw : tl)
                        {
                            std::pair<uint8_t, uint8_t> tik = tw->tick();
                            // printf("进位到哪个轮子:%d, 第几个槽:%d", tik.first, tik.second);
                            if (tik.first > 4)   //大于4理论上不可能出现,退出以防下面array越界
                                continue; 
                            if (tik.first == 0)
                                first_tw->add(tik.second, std::move(tw));
                            else
                                last_tws[tik.first - 1]->add(tik.second, std::move(tw));
                        }
                        if (!res.second)
                            break;
                    }
                }
            }
            #if 0
            // 将计时器重新分配轮上的位置
            void realloc(ListShptr&& tl)
            {
                for (auto & tw : tl)
                {
                    std::pair<uint8_t, uint8_t> tik = tw->tick();
                    if (tik.first > 4)   //大于4理论上不可能出现,退出以防下面array越界
                        continue; 
                    if (tik.first == 0)
                        first_tw->add(tik.second, std::move(tw));
                    else
                        last_tws[tik.first - 1]->add(tik.second, std::move(tw));
                }
            }
            #endif
            AsyncThreadPool&                        ahp;        
            const TmPoint                           epoch;      // 类开始时候的时间标记点
            std::atomic_bool                        alive;      // tick线程结束标志位
            std::unique_ptr<SingleWheel<TVR_SIZE>>  first_tw;   // 第一个轮 
            std::array<std::unique_ptr<SingleWheel<TVN_SIZE>>, 4>    last_tws;   // 后四个轮 
            // TwTimer的二次计数,使用TwTimer之前,判断unique,如果为真,表示这里没有了,要删除。
            // 这里不建议适用map,unordered_map查找效率比map高好几个数量级
            std::unordered_map<std::string, std::shared_ptr<TwTimer>>      task_set;
            std::recursive_mutex                               mtx;
        };
    }
    
    asyncthreadpool.hpp 源码
    
    
    /*
     * @Author: Adam Xiao
     * @Date: 2020-12-10 11:34:09
     * @LastEditors: Adam Xiao
     * @LastEditTime: 2020-12-17 19:09:11
     * @FilePath: ./asyncthreadpool.hpp
     * @Description: 时间轮类辅助异步线程池类,由于之前已有的taskqueque不能取消提交的定时任务,将taskqueque拆分,然后重新写
     *               编译时,CXXFLAGS里要加-DASIO_DISABLE_STD_FUTURE -DASIO_DISABLE_STD_EXCEPTION_PTR -DASIO_DISABLE_STD_NESTED_EXCEPTION
     *              
     *              
     */
    #pragma once
    #include <thread>
    #include <chrono>
    #include <memory>
    #include <array>
    #include <list>
    #include <functional>
    
    #ifndef USE_BOOST_FUTURE
    #include <future>
    #else
    //#define BOOST_THREAD_PROVIDES_FUTURE
    #define BOOST_THREAD_VERSION 4
    #include "boost/thread/future.hpp"
    #endif
    
    #ifndef ASIO_STANDALONE
    #define ASIO_STANDALONE
    #define ASIO_NO_DEPRECATED
    #endif
    
    #include "asio/io_context.hpp" 	//io_context use
    #include "asio/post.hpp"
    
    #include "time_holder.hpp"
    
    namespace monitor 
    {
    #ifndef USE_BOOST_FUTURE
    	using namespace std;
    #else
    	using namespace boost;
    #endif
    
        // 不可移动不可复制类型,异步线程池
        class AsyncThreadPool
        {
        public:
            //直接添加任务
            future<bool> add(std::function<bool()> f)
            {
                packaged_task<bool(void)> pt(std::move(f));
                future<bool> fu = pt.get_future();
                asio::post(ioctx, std::move(pt));
                return fu;
            }
             // 只能单例模式生成
            static AsyncThreadPool& GetInstance()
            {
                static AsyncThreadPool tq;
                return tq;
            }
            // 禁止复制,赋值,移动,移动赋值构造
            AsyncThreadPool(const AsyncThreadPool &) = delete;
            AsyncThreadPool& operator=(const AsyncThreadPool &) = delete;
    		AsyncThreadPool(AsyncThreadPool&&) = delete;
    		AsyncThreadPool& operator=(AsyncThreadPool&&) = delete;
        private:
            asio::io_context ioctx;
            asio::executor_work_guard<asio::io_context::executor_type> work;
    
            AsyncThreadPool() : work(ioctx.get_executor())
            {
                int thread_num = 8;
                std::vector<std::thread> processors;
                for (int i = 0; i < thread_num; i++) {
                    processors.emplace_back(std::thread([this]{
                        ioctx.run();
                    }));
                }
                for (auto &th : processors) {
                    th.detach();
                }
            }
        };
    }
    
    展开全文
  • 时间轮算法 简单学习

    2021-03-06 19:18:31
    时间轮为什么要用时间轮实现通常用于实现linux内核任务、游戏类的buf计时。单个时间轮局限性:保存的任务数量少,不能超过当前时间轮。多层时间轮,典型:日-时-分-秒传统java实现定时:Timer,只能单线程,会阻塞;...
  • 线程池

    2021-01-09 20:39:11
    文章目录线程池一,java线程的理解一,线程模型分类:**二,使用线程池的优势:****三,线程池的五种状态:**二,线程池的的源码:**一,线程池的创建方式:****二,参数介绍:****三,线程的执行流程:** ...
  • 3-scheduleThread scheduleThread主要作用就是从数据库中获取最近5s需要执行的任务,进行判断该 任务是否需要立刻执行、下次执行、还是放入时间轮中等待执行。 由于在xxl-job中master是没有中心节点的,所以调度可以...
  • 本篇通过定时任务、时间轮模型等等介绍了Java框架中Dubbo中的时间轮算法应用,希望对JAVA 的学习有所帮助。1 定时任务Netty、Quartz、Kafka 以及 Linux 都有定时任务功能。JDK 自带的 java.util.Timer 和 ...
  • 1 定时任务Netty、Quartz、Kafka 以及 Linux 都有定时任务功能。...在任务量大、性能要求高的场景,为了将任务存取及取消操作时间复杂度降为 O(1),会采用时间轮算法。2 时间轮模型及其应用一种高效批量管理定...
  • 为什么要用时间轮算法来实现延迟操作? 延时操作 Java 不是提供了 Timer 么? 还有 DelayQueue 配合线程池或者 ScheduledThreadPool 不香吗? 我们先来简单看看 Timer、DelayQueue 和 ScheduledThreadPool 的相关实现...
  • 线程池的参数动态调整

    千次阅读 2021-02-12 18:54:13
    先劝退一波 为了不浪费你的时间,先检测一下你是否有阅读本文的基础知识储备: 首先,我们先自定义一个线程池: 拿着这个线程池,当这个线程池在正常工作的前提下,我先问你两个问题: 1.如果这个线程池接受到了 30...
  • 一种简单的观点是:产生的线程越多,每个线程花费在实际工作上的时间就越少。 线程池模式有助于节省多线程应用程序中的资源,并且还可以将并行性包含在某些预定义的限制中。 使用线程池时,您以并行任务的形式编写...
  • 线程池编程实验

    2021-05-04 16:52:16
    线程池编程实验
  • 线程池底层原理

    2021-06-03 09:49:41
    当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取...
  • “池化”思想的设计减少了线程创建与销毁的资源损耗,提高了任务的响应速度,同时还可以提供一些额外的功能,例如定制线程池和阻塞队列都已满时的拒绝策略。 ThreadPoolExecutor做为Java线程池的核心实现类,继承...
  • java/android线程池详解

    2021-03-13 12:26:52
    一,简述线程池线程池是如何工作的:一系列任务出现后,根据自己的线程池安排任务进行。如图:线程池的好处:重用线程池中的线程,避免因为线程的创建和销毁所带来的性能开销。能有效控制线程池的最大并发数,避免...
  • 通过线程池进行轮询

    2021-04-22 21:09:24
    首先应该注入ThreadPoolTaskScheduler,通过这个线程池开启一个线程然后调用这个线程池中的schedule方法,该方法会在规定的时间段中创建线程,通过在这个方法中重写runnable,创建线程,schedule方法其实会返回一个...
  • 1 业务背景 ...由于目前的数据量在9800多万,如果一条一条的进行同步计算,耗费的时间比较长,所以就用异步的方式,同时去处理不同业务线的数据。 采用for循环的方式一次从数据库中查出500条数据进行处理
  • ③ keepAliveTime 顾名思义,其指代线程活动保持时间,即当线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率,不然线程刚执行完一个...
  • 如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值.超过最大值的线程可以排队,但他们要等到其他线程完成后才启动
  • kafka时间轮解析

    2021-05-12 03:35:48
    概述这篇博文的起源在于阿里的公众号里面有一篇文章讲菜鸟的同学在造一个关于时间轮定时器的文章,然后在网上搜索资料发现其实在好多开源的软件里面已经有了,最后选择了kafka里面的定时器实现来加深自己的理解。...
  • 注:本文基于ScheduledThreadPoolExecutor定时线程池类。简介前面我们一起学习了普通任务、未来任务的执行流程,今天我们再来学习一种新的任务——定时任务。定时任务是我们经常会用到的一种任务,它表示在未来某个...
  • 这里写目录标题18.2 线程池18.2.1 理解线程池1.线程池大小2.队列3.任务拒绝策略4.线程工厂5.关于核心线程的特殊配置18.2.2 工厂类Executors18.2.3 线程池的死锁18.2.4 小结参考目录 18.2 线程池    &...
  • springboot整合线程池

    千次阅读 2019-11-15 15:37:34
    ... 版权声明:本文为博主原创文章,遵循 ...版权协议,转载请附上原文出处链接和本声明。... ,0 0,2.5 5,5z" id="raphael-marker-...都并发循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在...
  • 线程池原生任务调度执行策略有缺陷,看我如何优化任务调度。
  • 开始《异步编程:使用线程池管理线程》 示例程序:异步编程:使用线程池管理线程.rar 如今的应用程序越来越复杂,我们常常需要使用《异步编程:线程概述及使用》中提到的多线程技术来提高应用程序的响应速度。这时...
  • Java线程池原理解析

    2021-01-20 15:15:09
    线程的生命周期耗时包括「线程创建时间」,「线程执行任务时间」,「线程销毁时间」,创建和销毁都需要导致系统调用; 2、每个 Thread 都需要有一个内核线程的支持,也就意味着每个 Thread 都需要消耗一定的内核资源...
  • Springboot之多任务并行+线程池处理

    千次阅读 2020-02-18 19:33:00
    点击上方“后端技术精选”,选择“置顶公众号”技术文章第一时间送达!作者:Mi_Chonghttps://blog.csdn.net/qq_31673689最近项目中做到一个关于批量发短信...
  • JAVA线程池原理剖析

    千次阅读 2018-05-02 22:36:09
    一般情况下,我们经常会创建大量的线程为我们完成分配的工作,但是有时候频繁的创建销毁线程也会给系统带来很大的损耗,因此,为了避免带给系统不必要的损耗和重复造轮子,我们引入了线程池的概念。本篇文章主要讲解...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 17,648
精华内容 7,059
关键字:

时间轮线程池