-
2021-01-13 00:15:13
1、介绍
之前 XXL-JOB 任务执行是通过 Quartz来进行任务管理触发的,在之前的博客 《Quartz任务调度框架–任务执行流程》 我们进行了任务执行的流程介绍,目前 XXL-JOB 任务执行已经摒弃 Quartz 框架,目前通过时间轮方式来管理任务触发任务。
2、任务执行
XXL-JOB 任务执行中启动了两个线程:
(1)线程 scheduleThread 运行中不断的从任务表中查询 查询近 5000 毫秒(5秒)中要执行的任务,如果当前时间大于任务接下来要执行的时间则立即执行,否则将任务执行时间除以 1000 变为秒之后再与 60 求余添加到时间轮中。
(2)XXL-JOB 时间轮实现方式比较简单,就是一个 Map 结构数据,key值0-60,value是任务ID列表
Map<Integer, List> ringData
(3)线程 ringThread 运行中不断根据当前时间求余从 时间轮 ringData 中获取任务列表,取出任务之后执行任务。public class JobScheduleHelper { private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class); private static JobScheduleHelper instance = new JobScheduleHelper(); public static JobScheduleHelper getInstance(){ return instance; } // 任务间隔大小 public static final long PRE_READ_MS = 5000; // pre read private Thread scheduleThread; private Thread ringThread; private volatile boolean scheduleThreadToStop = false; private volatile boolean ringThreadToStop = false; // 时间轮,key 0-60,value 任务ID列表,两个线程同时处理这个对象 private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>(); public void start(){ // schedule thread scheduleThread = new Thread(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>> init xxl-job admin scheduler success."); // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) // 每次读取到任务个数 int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; while (!scheduleThreadToStop) { // Scan Job long start = System.currentTimeMillis(); Connection conn = null; Boolean connAutoCommit = null; PreparedStatement preparedStatement = null; boolean preReadSuc = true; try { conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); // 分布式下获取锁 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); // tx start // 1、pre read long nowTime = System.currentTimeMillis(); // 查询 当前时间 + 5000 毫秒,就是接下来 5 秒要执行到任务 List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList!=null && scheduleList.size()>0) { // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { // time-ring jump // 如果当前时间大于要接下来执行到时间 + 5 秒 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 2.1、trigger-expire > 5s:pass && make next-trigger-time logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); // 1、misfire match // 如果每次都有执行则立即触发执行 MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING); if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) { // FIRE_ONCE_NOW 》 trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); } // 2、fresh next // 刷新接下来要执行时间 refreshNextValidTime(jobInfo, new Date()); } else if (nowTime > jobInfo.getTriggerNextTime()) { // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time // 1、trigger // 如果当前时间大于接下来要执行到时间则立即触发执行 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); // 2、fresh next // 刷新下次执行时间 refreshNextValidTime(jobInfo, new Date()); // next-trigger-time in 5s, pre-read again if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1、make ring second // 如果接下来 5 秒内还执行则直接放到时间轮中 int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next // 刷新下次执行时间 refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); } } else { // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time // 1、make ring second // 任务还没有到执行时间则直接放到时间轮中 int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next // 刷新下次执行时间 refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); } } // 3、update trigger info for (XxlJobInfo jobInfo: scheduleList) { // 更新任务信息 XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } } else { preReadSuc = false; } // tx stop } catch (Exception e) { if (!scheduleThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); } } finally { // commit if (conn != null) { try { conn.commit(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.setAutoCommit(connAutoCommit); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } // close PreparedStatement if (null != preparedStatement) { try { preparedStatement.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } } long cost = System.currentTimeMillis()-start; // Wait seconds, align second if (cost < 1000) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop"); } }); scheduleThread.setDaemon(true); scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread"); scheduleThread.start(); // ring thread ringThread = new Thread(new Runnable() { @Override public void run() { // align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } while (!ringThreadToStop) { try { // second data List<Integer> ringItemData = new ArrayList<>(); int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; // 获取最近1秒和 2秒要执行到任务 for (int i = 0; i < 2; i++) { List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { ringItemData.addAll(tmpData); } } // ring trigger logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) ); if (ringItemData.size() > 0) { // do trigger for (int jobId: ringItemData) { // do trigger // 执行任务 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } // clear ringItemData.clear(); } } catch (Exception e) { if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); } } // next second, align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop"); } }); ringThread.setDaemon(true); ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread"); ringThread.start(); } private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception { Date nextValidTime = generateNextValidTime(jobInfo, fromTime); if (nextValidTime != null) { jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); jobInfo.setTriggerNextTime(nextValidTime.getTime()); } else { jobInfo.setTriggerStatus(0); jobInfo.setTriggerLastTime(0); jobInfo.setTriggerNextTime(0); logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}", jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf()); } } private void pushTimeRing(int ringSecond, int jobId){ // push async ring List<Integer> ringItemData = ringData.get(ringSecond); if (ringItemData == null) { ringItemData = new ArrayList<Integer>(); ringData.put(ringSecond, ringItemData); } ringItemData.add(jobId); logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) ); } public void toStop(){ // 1、stop schedule scheduleThreadToStop = true; try { TimeUnit.SECONDS.sleep(1); // wait } catch (InterruptedException e) { logger.error(e.getMessage(), e); } if (scheduleThread.getState() != Thread.State.TERMINATED){ // interrupt and wait scheduleThread.interrupt(); try { scheduleThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } // if has ring data boolean hasRingData = false; if (!ringData.isEmpty()) { for (int second : ringData.keySet()) { List<Integer> tmpData = ringData.get(second); if (tmpData!=null && tmpData.size()>0) { hasRingData = true; break; } } } if (hasRingData) { try { TimeUnit.SECONDS.sleep(8); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } // stop ring (wait job-in-memory stop) ringThreadToStop = true; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } if (ringThread.getState() != Thread.State.TERMINATED){ // interrupt and wait ringThread.interrupt(); try { ringThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop"); } // ---------------------- tools ---------------------- public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception { ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null); if (ScheduleTypeEnum.CRON == scheduleTypeEnum) { Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime); return nextValidTime; } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) { return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 ); } return null; } }
通过以上分析,我们发现 XXL-JOB 通过一个简单到时间轮就可以完成一个任务到倒计时执行操作。
更多相关内容 -
恒压供水与轮巡工作机制,S7200PLC程序
2018-05-30 12:14:13两用一备水泵供水控制,最大保证有一台备用,保证一台设备处于变频状态,系统达到50HZ后自动切换到工频工作,系统频率小于30HZ时自动减去一台工频泵,下次增加泵时起用最长时间未使用的水泵。水泵采用轮流工作机制。... -
【不一样的面经】阿里面试,我挂在了第四轮……
2019-05-27 18:56:35阿里社招一般有四到五轮,我这次的流程是第一轮技术面、第二轮写代码、第三轮boss面、第四轮boss面、第五轮HR面。然而我没能和HR聊上一句。 当时我在上海法院出差,第一轮是在周一晚上8点钟左右,我一看是...可能每个技术人都有个阿里梦,我也不例外。最近准备跳槽,前一阵子在准备各种面试,也面了几个大厂,包括阿里。是的,最后我挂在了第四轮。这篇文章来总结一下我在阿里四轮的面试经历,希望能对读者有所启发。
阿里社招一般有四到五轮,我这次的流程是第一轮技术面、第二轮写代码、第三轮boss面、第四轮boss面、第五轮HR面。然而我没能和HR聊上一句。
当时我在上海法院出差,第一轮是在周一晚上8点钟左右,我一看是杭州打过来的,就知道是他们了。和面试官聊了有将近两个小时,整体感觉还不错。
第一轮的面试官,如果你能进,基本上都是你后面的同事,和面试官聊天的过程,给我的感受就是一个字:舒服。我之前面过某站,完全是两种不同的体验。第一轮主要都是围绕着技术来展开,一般把自己的项目经验总结好,把自己掌握的技术栈复习好,基本上问题不大。
技术面大概进行了1个半小时,主要涉及到的有并发问题、消息中间件问题、JVM、项目的技术细节讨论等等。还有半小时,完全是闲聊了,因为我之前在华为做过5G,面试官对这个比较感兴趣,就问了一些关于5G未来的发展趋势,所以整体聊下来,感觉OK,第一轮问题不大,中规中矩。
随后几天,就继续在上海法院现场解决问题,周三的时候,还是那个面试官,跟我约了周六来做几道题,周六我已经出差回去了,中午面试官给我打电话,并且发了个链接,我在这边写代码可以实时同步到他那边。
有三道题,第一题和并发有关,第二题是一道算法题,第三题是一个设计模式,但是要支持可扩展。题目看起来也算中规中矩,面试官要求三选二,我选择了第一个和第三个,顺利做完了。(如果对题目细节感兴趣的读者,可以在文章下面给我留言)
过了2天,还是这个面试官,跟我约了后面会有两轮boss面,说句实话,我心里还是蛮虚的,为什么呢?因为我当前一直在做政府项目,对互联网电商这块的经验比较欠缺。这是短板,也必须得克服。
周四的时候,第三轮开始了,打过来的是一个女boss,一开始问了我一些问题让我觉得很像是HR哈,比如你为什么要离职?比如你觉得当前的项目对你成长最大的地方在哪?比如你觉得自己的优势和缺点是什么?这些问题一般都是HR喜欢问的。
所以我中途来了一句,您应该是HR吧?哈哈,她说她是做技术的,我里个神,感觉自己被打脸。随后就是一连串的技术问题了,这个女boss貌似是想证明她是搞技术的。问的问题都是有点偏解决方案类的,比如给你一个场景,如果是你,你该如何解决?主要涉及的有分布式、缓存、消息队列等内容,答的还可以。
这期间有个小插曲,也是我这次面试最记忆深刻的,面试官问我那个优缺点时,我对自己缺点的回答是:目前对互联网电商这块的技术落地或者相关解决方案还仅限于理论层面,也需要一个这样的平台。然后这位boss冒出了一句很经典的话:来阿里你不缺挑战,可能你更多需要关注第一年你能否活得下来。这句话很经典,瞬间我对阿里人又多了层敬佩。
当然,我的回答就比较喜剧了:哈哈哈,扶我起来,我还可以再写几行代码!
面试官:呵呵呵…… 所以我猜她应该是80后。
第三轮整体感觉还OK,问题基本上也回答的还可以,过了两天,这位boss跟我约第四轮,因为到了五一放假了,所以第四轮的boss面要再多等一个周,所以整个面试下来,横跨了一个月,其实战线蛮长的,确实有点累。
第四轮是部门技术leader,约的是视频面试,这一轮明显感觉气场更加强大了,比第三轮的boss更加难以hold住。问的问题主要围绕两大块,第一是根据我的项目来提出漏洞,让我解决;第二是他自己设定场景,让我给出解决方案。
第一种问题其实还好,毕竟是拿现有的项目作为背景,回答的还可以,但是有个比较尴尬的是,面试官听完了我的项目介绍之后来了句:感觉你们的项目没啥技术含量啊!场面极度尴尬……
第二种问题其实蛮被动的,因为电商这块的经验不是很足,虽然我之前看了一些东西,但感觉回答的一般,果不其然,最后挂在了这一轮。最后我看到的邮件评语是:您的职业经历与该职位的要求略有差异等等。
这次内推的这个岗位是经济体解决方案,后来跟内推的阿里大佬沟通是,他们这个岗位计划是招P7的,所以要求高是正常的,虽然我把这些当作是安慰自己的话,但是我还是当真了,哈哈!
以上给大家复盘了我这次整个阿里的一个面试经历,整体来说,对我本人的成长蛮大的,通过我的文字,大家也可以感受到每一轮的面试,压力是递增的,当然如果技术水平和项目经验都OK,那也是没有问题的。
最后再给大家简单总结一下:
-
社招面试,技术问的相对来说更加深入,所以对有些源码还是要了解点,比如并发相关的原理,是经常被问到的。JVM就更不用说了,几乎是必问到的。
-
所有的问题都是围绕具体场景,一般大厂都会结合具体场景来问你问题,所以你会demo是肯定不行的,你还要知道为什么要用这个技术,以及如何做到高可用等等。
-
以后面试,基本上都会让你写代码的,招软件开发工程师,光会嘴论是不行的,这就要平时抽空多练习了,我指的是练练算法题,而不是那种业务代码。
关注我的公众号【武哥聊编程】,回复关键词:BAT,即可领取一波非常优质的学习资源,专为进军BAT量身打造。
- 这次非常感谢阿里雷哥,内推的过程给了我很多的帮助,还有其他几位阿里的大佬,包括我师弟,也跟他们咨询了很多问题,人要有感恩的心,不管结果如何,下次去杭州和北京,一顿饭是少不了的!
说真的,其实挂在第四面,心里还是蛮可惜的,不过还好,在阿里第三轮的时候,拼多多第一轮也开始了,后面拿到了拼多多的offer,过几天我继续跟大家分享一下拼多多的面试经历。
欢迎大家关注我的微信公众号【武哥聊编程】,回复 “笔记” 可以领取我自己写的5万字SpringBoot学习笔记。大家共同进步。
-
-
时间轮算法
2021-01-25 19:21:04 其实这两者是可以互相转换的,比如现在有一个定时任务是12点执行,当前时间是9点,那就可以认为这个任务是3小时后执行。同样,现在又有一个任务,是3小时后执行,那也可以认为这个任务12点执行。 假设我们...时间轮算法
最近工作中使用了Xxl-Job框架来做分布式调度,内部采用了时间轮做整体调度,顺便学习并总结一下。
绝对时间和相对时间
定时任务一般有两种:
1. 约定一段时间后执行。 2. 约定某个时间点执行。
其实这两者是可以互相转换的,比如现在有一个定时任务是12点执行,当前时间是9点,那就可以认为这个任务是3小时后执行。同样,现在又有一个任务,是3小时后执行,那也可以认为这个任务12点执行。
假设我们现在有3个定时任务A、B、C,分别需要在3点、4点和9点执行,我们把它们都转换成绝对时间。
只需要把任务放到它需要被执行的时刻,然后等到时针转到相应的位置时,取出该时刻放置的任务,执行就可以了。这就是时间轮算法的核心思想。重复执行
多数定时任务是需要重复执行,比如每天上午9点执行生成报表的任务。对于重复执行的任务,其实我们需要关心的只是下次执行时间,并不关心这个任务需要循环多少次,还是那每天上午9点的这个任务来说。
- 比如现在是下午4点钟,我把这个任务加入到时间轮,并设定当时针转到明天上午九点(该任务下次执行的时间)时执行。
- 时间来到了第二天上午九点,时间轮也转到了9点钟的位置,发现该位置有一个生成报表的任务,拿出来执行。
- 同时时间轮发现这是一个循环执行的任务,于是把该任务重新放回到9点钟的位置。
- 重复步骤2和步骤3。
如果哪一天这个任务不需要再执行了,那么直接通知时间轮,找到这个任务的位置删除掉就可以了。由上面的过程我们可以看到,时间轮至少需要提供4个功能:
- 加入任务
- 执行任务
- 删除任务
- 沿着时间刻度前进
时间轮的数据结构
时钟可以使用数组来表示,那么时钟的每一个刻度就是一个槽,槽用来存在该刻度需要被执行的定时任务。正常业务中,同一时刻中是会存在多个定时任务的,所以每个槽中放一个链表或者队列就可以了,执行的时候遍历一遍即可。
时间刻度不够用
增加时间轮的刻度
现在有我有2个定时任务,一个任务每周一上午9点执行,另一个任务每周三上午9点执行。最简单的办法就是增大时间轮的长度,可以从12个加到168 (一天24小时,一周就是168小时),那么下周一上午九点就是时间轮的第9个刻度,这周三上午九点就是时间轮的第57个刻度。
这样做的缺点:
- 时间刻度太多会导致时间轮走到的多数刻度没有任务执行,比如一个月就2个任务,我得移动720次,其中718次是无用功。
2. 时间刻度太多会导致存储空间变大,利用率变低,比如一个月就2个任务,我得需要大小是720的数组,如果我的执行时间的粒度精确到秒,那就更恐怖了。
任务增加round属性
现在时间轮的刻度还沿用24,但是槽中的每个任务增加一个round属性,代表时钟转过第几圈之后再次转到这个槽的时候执行。
上图代表任务三在指针下一圈移动时执行,整体流程就是时间轮没移动一个刻度的时候都要遍历槽中所有任务,对每个任务的round属性减1,并取出round为0的任务调度,这样可以解决增加时间轮带来的空间浪费。但是这样带来的问题时,每次移动刻度的耗时会增加,当时间刻度很小(秒级甚至毫秒级),任务列表有很长,这种方案是不能接受的。
分层时间轮
分层时间轮是这样一种思想:
1. 针对时间复杂度的问题:不做遍历计算round,凡是任务列表中的都应该是应该被执行的,直接全部取出来执行。
2. 针对空间复杂度的问题:分层,每个时间粒度对应一个时间轮,多个时间轮之间进行级联协作。假设现在有3个定时任务:
1. 任务一每天上午9点执行 2. 任务二每周2上午9点执行 3. 任务三每月12号上午9点执行。
根据这三个任务的调度粒度,可以划分为3个时间轮,月轮、周轮和天轮,初始添加任务时,任务一被添加到天轮上,任务二被添加到周轮,任务三被添加到月轮上。三个时间轮按各自的刻度运转,当周轮移动到刻度2时,取出任务二丢到天轮上,当天轮移动到刻度9时执行。同样任务三在移动到刻度12时,取出任务三丢给月轮。以此类推。
-
Kafka的TimingWheel(时间轮)算法
2022-01-18 11:08:10Kafka中存在大量的延迟操作,比如延迟生产、延迟拉取和延迟删除等,Kafka并没有使用JDK自带的Timer或DelayQueue来实现延时的功能,而是基于时间轮算法自定义实现了一个用于延迟功能的定时器(SystemTimer)。...Kafka中存在大量的延迟操作,比如延迟生产、延迟拉取和延迟删除等,Kafka并没有使用JDK自带的Timer或DelayQueue来实现延时的功能,而是基于时间轮算法自定义实现了一个用于延迟功能的定时器(SystemTimer)。
JDK中的Timer和DelayQueue单个任务的插入和删除的平均时间复杂度为O(logN),并不能满足Kafka的高性能要求,而基于时间轮可以将任务的插入和删除操作的时间复杂度降为O(1)。
下图即为Kafka的时间轮结构:
Kafka的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务链表(TimerTaskList),或者称之为任务槽。TimerTaskList是一个环形的双向链表,链表中的每一项表示的均是定时任务(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。
时间轮由多个时间格组成, 每个时间格代表当前时间轮的基本时间跨度(tickMs) 。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式tickMs × wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。
下面我们通过Kafka的源代码来具体讲解一下时间轮算法。
任务添加
def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { // Cancelled false } else if (expiration < currentTime + tickMs) { // Already expired false } else if (expiration < currentTime + interval) { // Put in its own bucket val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry) // Set the bucket expiration time if (bucket.setExpiration(virtualId * tickMs)) { // The bucket needs to be enqueued because it was an expired bucket // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle // will pass in the same value and hence return false, thus the bucket with the same expiration will not // be enqueued multiple times. queue.offer(bucket) } true } else { // Out of the interval. Put it into the parent timer if (overflowWheel == null) addOverflowWheel() overflowWheel.add(timerTaskEntry) } }
以222任务为例,讲解一下任务添加到时间轮的过程:
SystemTimer的addTimerTaskEntry方法调用的是TimeingWheel的add方法,若任务添加失败,则证明当前任务已到期,直接将该任务交给工作线程来执行;
TimeingWheel的add方法首先获取任务的过期时间expiration,这里为222;下面走到判断逻辑:
- 若expiration < currentTime + tick,证明当前任务已到期,则直接返回fasle,将该任务交给工作线程来执行;
假设SystemTimer的创建时间为0,则SystemTimer创建的TimeingWheel的currentTime也为0,由于222 > 0+1,所以不符合第1个判断,进入第2个判断。
- 若expiration < currentTime + interval,证明当前层次的时间轮就可以容纳该任务,将任务放入该时间轮的对应槽;
由于222 > 0+10,所以不符合第2个判断,进入第3个判断。
- 若expiration >= currentTime + interval,证明该层次的时间轮不可以容纳该任务,需要往上尝试上一层时间轮;
获取到上一层时间轮后,直接在上一层时间轮上继续执行add方法。
第2层时间轮的tick=10,interval=100,由于222 > 0+100,所以还是进入到第3个判断,继续获取上一层时间轮。
第3层时间轮的tick=100,interval=1000,由于222 < 0+1000,所以进入到第2个判断,执行任务的添加过程。
下面接着看任务的添加过程:
(1) 首先是计算槽位;
val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) virtualId = 222 / 100 = 2 bucket = 2 % 10 = 2 即第2个槽位,对应[200-300]的范围
(2) 获取到该槽位上的任务链表,并将任务添加到链表里;
bucket.add(timerTaskEntry)
(3) 若该链表是首次添加任务,则需要设置链表的过期时间expiration,并将该链表添加到SystemTimer的DelayQueue中。
// Set the bucket expiration time if (bucket.setExpiration(virtualId * tickMs)) { queue.offer(bucket) } 过期时间为2*100=200
可以看一下timerTaskList.setExpiration方法:
def setExpiration(expirationMs: Long): Boolean = { expiration.getAndSet(expirationMs) != expirationMs }
可以发现,链表的过期时间若与之前设置的相同,则直接返回False,避免重复将链表添加到Timer的DelayQueue中。
时间轮推动
接下来,我们看一下如何推动时间轮,假设我们创建了1个SystemTimer,并添加了过期时间为9、88、222、520、521、522等6个定时任务,分别编号①到⑥。
任务添加后的时间轮示意图如下:
SystemTimer构造器如下:
@threadsafe class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20, startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer { // timeout timer private[this] val taskExecutor = Executors.newFixedThreadPool(1, (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable)) private[this] val delayQueue = new DelayQueue[TimerTaskList]() private[this] val taskCounter = new AtomicInteger(0) private[this] val timingWheel = new TimingWheel( tickMs = tickMs, wheelSize = wheelSize, startMs = startMs, taskCounter = taskCounter, delayQueue )
SystemTimer是依靠DelayQueue来进行时间轮推进的。
def advanceClock(timeoutMs: Long): Boolean = { // 获取DelayQueue首元素(最快过期的任务槽) var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) if (bucket != null) { writeLock.lock() try { // 若任务槽不为null,则不断循环(while保证了时间轮的不断推进) while (bucket != null) { // 级联更新各层级的时间轮currentTime为时间槽的过期时间 timingWheel.advanceClock(bucket.getExpiration) // 删除该槽,并将时间槽中的任务重新添加到时间轮 bucket.flush(addTimerTaskEntry) // 继续获取DelayQueue首元素(最快过期的任务槽) bucket = delayQueue.poll() } } finally { writeLock.unlock() } true } else { false } }
- 任务1所在槽为DelayQueue首元素,其过期时间为9,然后,各级时间轮的currentTime更新为9;
- 对任务1所在槽中的各元素执行flush操作;
// Remove all task entries and apply the supplied function to each of them def flush(f: TimerTaskEntry => Unit): Unit = { synchronized { var head = root.next while (head ne root) { // 删除槽中的各元素 remove(head) // 执行传入的function f(head) head = root.next } // 将原有时间槽的过期时间设置为-1 expiration.set(-1L) } }
flush传入的函数为SystemTimer的addTimerTaskEntry:
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { // 尝试往时间轮中添加任务TimerTaskEntry if (!timingWheel.add(timerTaskEntry)) { // Already expired or cancelled // 若添加失败,则证明该任务被取消或者已经过期 if (!timerTaskEntry.cancelled) // 过期任务,直接提交给工作线程执行 taskExecutor.submit(timerTaskEntry.timerTask) } }
任务1重新添加到时间轮,此时:
currentTime=9 expiration=9 tick=1 interval=10 expiration < currentTime + tick,证明当前任务已到期,则直接返回fasle,将该任务交给工作线程来执行
-
继续执行delayQueue.poll()方法,此时返回任务②所在的槽,其过期时间为80,然后,各级时间轮的currentTime更新为80;
-
任务②重新添加到时间轮,此时:
currentTime=80 expiration=88 tick=1 interval=10 expiration < currentTime + interval,证明当前层次的时间轮就可以容纳该任务,将任务放入该时间轮的对应槽; virtualId = expiration / tickMs = 88 / 1 = 88 bucket = virtualId % wheelSize = 88 % 10 = 8 即第8个槽位,将槽位的过期时间设置为88,并添加到延迟队列delayQueue中
-
继续执行delayQueue.poll()方法,此时返回任务②所在的槽,其过期时间为88,然后,各级时间轮的currentTime更新为88;
-
任务②重新添加到时间轮,此时:
currentTime=88 expiration=88 tick=1 interval=10 expiration < currentTime + tick,证明当前任务已到期,则直接返回fasle,将该任务交给工作线程来执行
其他任务以此类推。
重点理解2点:
(1) currentTime是如何演进的;
(2) 任务是如何从时间大轮向小轮降级的。任务槽
SystemTimer是依靠DelayQueue来进行时间轮推进的,而DelayQueue中的元素则为时间轮中的槽TimerTaskList。
添加到延迟队列的元素必须实现Delayed接口的getDelay和compareTo方法:
def getDelay(unit: TimeUnit): Long = { unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS) } def compareTo(d: Delayed): Int = { val other = d.asInstanceOf[TimerTaskList] java.lang.Long.compare(getExpiration, other.getExpiration) }
当任务槽的delay<=0时,该任务槽会被从延迟队列中poll出来,然后遍历槽中的元素,依次执行重新添加到时间轮的操作;
将时间槽中的任务重新添加到时间轮时,会发生任务槽降级或者任务直接提交给工作线程执行。
每次重新添加槽,均是从最小的时间轮开始尝试的:
比如任务③,其初始槽位在第3层时间轮的第2个槽位,当其被取出重新添加到时间轮时,首先从第1层时间轮尝试:
currentTime=200 expiration=222 tick=1 interval=10 expiration >= currentTime + interval,证明该层次的时间轮不可以容纳该任务,需要往上尝试上一层时间轮
接着尝试第2层时间轮:
currentTime=200 expiration=222 tick=10 interval=100 expiration < currentTime + interval,证明当前层次的时间轮就可以容纳该任务,将任务放入该时间轮的对应槽; virtualId = 222 / 10 = 22 bucket = 22 % 10 = 2 即第2个槽位,并设置该槽位的过期时间为virtualId * tickMs = 22*10 = 220
可以发现,任务③从第3层时间轮的第2个时间槽(过期时间为200)降级到第2层时间轮的第2个时间槽(过期时间为220)。
依次推导,下次降级会从第2层时间轮的第2个时间槽(过期时间为220)降级到第1层时间轮的第2个时间槽(过期时间为222)。
接着再次降级,此时,已没有更低精度的时间轮了,expiration < currentTime + tick,表明当前任务已到期,将该任务交给工作线程来执行。
综上,随着时间轮的不断推进,任务会被反复重新添加到时间轮,其槽位会不断降级,且过期时间的精度也会逐步提高,直至精度到达最小时间轮的精度,表明任务真正到期,提交执行。
分析到这里可以发现,Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的操作,而DelayQueue专门负责时间推进的任务。试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue 的队头只需要O(1)的时间复杂度(获取之后DelayQueue内部才会再次切换出新的队头)。如果采用每毫秒定时推进,那么获取第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取第二个超时任务时又需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓“知人善用”,用TimjngWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,两者相辅相成。
参考文献:
[1] https://github.com/apache/kafka/tree/trunk/core/src/main/scala/kafka/utils/timer
[2] 《深入理解Kafka:核心设计与实践原理》,作者:朱忠华,出版社:电子工业出版社 -
xxl-job 执行器时间轮
2021-01-29 17:43:19什么是时间轮 时间轮出自Netty中的HashedWheelTimer,是一个环形结构,可以用时钟来类比,钟面上有很多bucket,每一个bucket上可以存放多个任务,使用一个List保存该时刻到期的所有任务,同时一个指针随着时间流逝一... -
1、时间轮
2020-07-25 14:15:44时间轮上每个格子储存了一个双向链表,用于记录定时任务,当指针转到对应的格子的时候,会检查对应的任务 是否到期,如果到期就会执行链条上的任务。 二、为什么使用时间轮? 我认为这个世界上任何事物的出现都有它... -
智能对话机器人之多轮对话工作机制 | Chatopera
2021-08-19 19:26:45目录https://bot.chatopera.comChatopera 多轮对话工作机制多轮对话的定义什么场景下使用多轮对话Chatopera 多轮对话知识库意图识别脚本对话多轮对话的检索模块间检索创建脚本对话的话题话题检索顺序对话状态机擦除... -
任务调度:时间轮算法经典案例解析及应用实现
2020-09-18 16:20:00当任务量大的时候,频繁的入堆出堆性能有待考虑 单线程执行,如果一个任务执行的时间过久则会影响下一个任务的执行时间(当然你任务的run要是异步执行也行) 从代码中可以看到对异常没有做什么处理,那么一个任务出错... -
阿里面试,我挂在了第四轮……
2019-05-31 12:00:00可能每个技术人都有个阿里梦,我也不例外。最近准备跳槽,前一阵子在准备各种面试,也面了几个大厂,包括阿里。是的,最后我挂在了第四轮。这篇文章来总结一下我在阿里四轮的面试经历... -
扫地机器人速解左/右轮不转、清洁差故障维修!
2020-12-23 15:27:48扫地机器人的出现,解决了懒人的忧愁,不用在为打卫生而烦恼,可以好好还享受休息时间,但这次出现了点小故障,就是扫地机器人左/右轮不转了,不知如何检修。想找扫地机器人故障维修方法,就要从现象开始入手,这样... -
你是怎么变自律的?
2019-11-29 07:30:59很多小伙伴咨询我, ...别人的时间表,真的就适合你?你真的需要这样吗? 先写结论吧:自律是假相,你要能找到自己的驱动器,而不是自律本身,有些自律,其实是没有任何意义的。记得点赞呀。 目录... -
基于stm32四轮小车简易PID控制
2021-03-31 20:19:23例如生活中,一个加热器需要对某个物体进行恒温控制,但是由于某种原因导致温度过高或者过低,这时候传感器会返回相应的数据,告诉控制器应当作出相应的调整,降温或者是加热,这就完成了一个简单的PID的闭环控制。... -
ODrive实例 #2 电机配置实例(小米9号平衡车轮毂电机)
2020-12-21 13:23:30True,表示编码器已校准下次重新启动后可以直接使用本次校准的结果 4.2 保存校准结果 odrv0.save_configuration() odrv0.reboot() 将自动检测的电机参数保存并重启 等待 ODrive 重新连接到 odrivetool 5. 上电后控制... -
那些惊艳的算法们(三)—— 时间轮
2019-01-10 11:26:56还需执行数据分析的任务等等,于是你刚才可能就比较好奇的时间轮的数据结构到现在可能更加好奇了,那我们先来说说时间轮的数据结构吧。 ### 时间轮的数据结构 首先,时钟可以用数组或者循环链表表示,这个每个时钟的... -
Netty之时间轮
2019-02-16 00:53:40假设一个格子是1秒,则整个wheel能表示的时间段为8s,假如当前指针指向2,此时需要调度一个3s后执行的任务,显然应该加入到(2+3=5)的方格中,指针再走3次就可以执行了;如果任务要在10s后执行,应该等指针走完一个... -
爱奇艺面试Python,竟然挂在第5轮……
2019-05-23 06:42:18今天给大家分享我曾经在爱奇艺的面试,过程还是比较有意思的,可以给大家一些参考 ...我:(我去,真的好酥啊)啊听到了,嗯,最近有在看 (不看机会,就没机会聊下去了啊) 嗲妹妹:嗯嗯,那简单的介绍一下... -
「面试」到阿里第五轮后我才知道所谓的URL是什么
2020-10-04 10:05:19就是经常使用的以“Http://”开头的那一串东东,其实常用的还有很多,比如 “FTP” , "FILE"等,我们所访问的目标网站不同,网址开头的写法也就不同,下面列出常见的几种URL。 从上图可知,URL 中可以包含服务器的... -
代码轮子
2019-03-06 14:36:14PPRows for Mac- 在Mac上优雅的计算你写了多少行代码。open-source-ios-apps- iOS开源App集合,分:swift与Objective-C--国外人整理。NewsBlur作者独自一个人 Samuel Clay 做出来的一款名为 NewsBlur 的新闻阅读器, ... -
基于STM32的二轮自平衡小车
2021-03-23 21:44:33二轮自平衡机器人正是在这一背景下提出来的,对于制作此种类型的自平衡小车无疑对我stm32的学习有莫大的好处,一方面深入了解stm32以及学习串级PID的运用,另一方面也是对我近期学习的硬件方面知识一个检验。... -
Spring前一次定时任务没执行完,下次任务要求再次执行
2021-11-10 13:43:54第一步在springboot启动类上添加@EnableAsync注解启动异步任务 第二步在实际的定时任务方法上添加@Async()注解 这样的话当当前任务正在执行时,即还未执行完毕时,当下次1分钟的时候也会再次扫描该定时任务。... -
akshare改写公募基金轮动策略
2022-03-14 20:49:49下次加上日期这些数据,做成df格式,然后用pyfolio进行查看。 导包: import akshare as ak import pandas as pd import numpy as np import matplotlib 换成周线(也可以换成月线,年线): #日线换为周线... -
简单时间轮算法
2018-02-18 23:31:00概念分析简单时间轮算法是时间轮算法的入门内容。笔者暂时研究到这块,下面做下详细的分享。时间轮算法:是指有一条时间...如果这个指针一直转动,那么每节中的每个任务就可以按照恒定的时间周期执行。实际当中,这... -
时间轮(Timing Wheel)算法-高性能定时器策略笔记
2019-05-17 20:58:28但是这个前提是所有输入条件都拿到手的情况下,如果有些输入条件 A 你并不知道什么时候能符合,那怎么办?写一个whlie循环一直检查?这样无疑很浪费CPU,显然行不通。 有经验程序员可以已经想到办法,把这... -
3 万字 + 100 张图带你彻底搞懂 TCP 面试题(强烈建议收藏)
2021-04-19 12:22:38过去不会没关系,今天就让我们来消除这份恐惧,微笑着勇敢的面对它吧! 所以小林整理了关于 TCP 的面试题型,全文共 3 万字 + 100 张图,跟大家一起探讨探讨。 1、 TCP 基本认识 2、TCP 连接建立 3、 TCP 连接断开... -
时间轮分析
2022-01-25 20:01:22在HeaderExchangeClient启动的时候调用startHeartBeatTask()开启心跳任务,创建了一个HeartbeatTimerTask任务然后扔到了HashedWheelTimer时间轮类里面去,我们看见是先构造了一个时间轮对象,我们进去看看是啥 ... -
面试官:你背了几道面试题就敢说熟悉Java源码?对不起,我们不招连源码都不会看的人
2020-03-30 18:47:49你看源码么? 你会看源码么? 你从源码中有收获么? -
你不好奇 CPU 是如何执行任务的吗?
2020-11-06 17:43:44你清楚下面这几个问题吗? 有了内存,为什么还需要 CPU Cache? CPU 是怎么读写数据的? 如何让 CPU 能读取数据更快一些? CPU 伪共享是如何发生的?又该如何避免? CPU 是如何调度任务的?如果你的任务对响应要求... -
【定时任务】时间轮算法
2020-04-08 12:25:43还需执行数据分析的任务等等,于是你刚才可能就比较好奇的时间轮的数据结构到现在可能更加好奇了,那我们先来说说时间轮的数据结构吧。 6 时间轮的数据结构 首先,时钟可以用数组或者循环链表表示,这个每个时钟的... -
Webots学习笔记—四轮小车的模型搭建和简单控制
2020-06-29 01:32:26至此,我们第一个项目到此就圆满完成啦! 写在最后 这次我们学习了四轮小车的模型搭建和简单的控制,下次我将给大家介绍距离传感器并且实现小车的避障。 由于我也是刚刚入门Webots的小白,可能有一些地方写的不太...