-
2019-05-30 17:33:14
https://blog.csdn.net/qq924862077/article/details/82595948
原作者的目录
照着这个思路进行编写一篇自己的
PS: 若你觉得可以、还行、过得去、甚至不太差的话,可以“关注”一下,就此谢过!
更多相关内容 -
xxl-job 原理
2022-05-03 14:12:59xxl-job 原理 路由策略 分片原理 故障转移xxl-job 原理
一、xxl-job 架构设计
总体分两个部分:- 调度中心:负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统和任务解耦,提高了系统可用性和稳定性。通调度性能不在受限于任务模块。
- 执行器:负责接收调度中的请求并执行任务逻辑。任务模块专注于任务的执行操作,开发和运维更加简单和高校。
设计思想:
调度和任务两个部分相互解耦,全异步化和轻量化,可以提高系统的稳定性和扩展性。
二、xxl-job原理
2.1、执行器注册
执行器启动主要是把自己注册到调度中心然后保存在数据库(xxl_job_registry表),并定时发送心跳,保持续约。执行器正常关闭,也主动告知调度中心注销,这种是主动注册。
如果执行器网络故障,调度中心就不知道执行器的情况,如果把任务路由给一个不可用的执行器,就会导致任务失败。所以调度中心需要不断的对执行器探活(RocketMQ的NameServer 管理broker一样),调度中心会启动一个后台线程定时调用执行器接口,如果发现异常就下线。
2.2、调度中心和任务执行
- JobRegistryMonitorHelper 不停的更新注册表,把超时的执行器剔除(每隔30s执行一次)
- 创建线程池
- 调度器线程ScheduleThread:计算预读取的任务数(默认6000),然后while 循环不停的获取到期的任务
- 时间轮线程池
- 获取任务锁:第一步获取数据库排它锁,如果没有成功说明其他的调度中心在加载任务
- 查询任务:获取锁后, 查询任务
- 调度任务
- 任务触发,选择执行器:按照配置的路由策略,不通路由策略获取方式也不一样
- 远程执行:拿到执行器之后,runExecutor 触发远程的执行器
- 执行器处理远程调用,回调
2.3、 时间轮
一批任务都是不同的时间执行,执行时间精确到秒,如何实现对所有的任务调度这个就是时间轮。
2.4、任务超时
如果任务在指定的时间范围内没有返回结果,就不在等结果,抛出异常。
FutureTask
2.5、失败重试
如果任务执行失败,会更新在xxl_job_log日志表里。调度中心有个后台线程monitorThread。第一步就是查日志表里结果不是200的任务,为了防止集群下同时处理一个失败任务,用了数据库的乐观锁(版本号),如果失败重试次数>0,代表重试,就要重新触发。
调度器启动: JobFailMonitorHelper.getInstance().start();
2.6、故障转移
如果一个执行器挂了,就找另一个执行器执行,直到找到一个正常的执行器。
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover())
2.7、任务数据分片
这里的是数据分片,需要用到分片参数 sharding param,调度器负责把这个分片参数分发给每个执行器(执行器个数和参数个数相等),怎么根据分片参数对数据分片是Job自己的事情。
XxlJobTrigger#trigger
-
微服务架构-分布式解决方案-107:XXL-JOB原理的分析
2021-03-26 22:09:23107:XXL-JOB原理的分析1 xxl-job分布式任务调度平台演示2 传统的定时任务存在哪些缺陷3 集群中的定时任务如何保证执行幂等性4 传统的定时任务的实现方案5 分布式任务调度平台实现原理6 构建XXL-JOBAdmin调度中心7 ...107:XXL-JOB原理的分析
1 xxl-job分布式任务调度平台演示
课题内容
- 传统的定时任务存在哪些缺陷
- 定时任务有哪些实现方案
- 基于XXL-JOB实现分布式任务调度
- XXL-JOB集群方案部署的原理
2 传统的定时任务存在哪些缺陷
传统的定时任务存在哪些缺陷?
1.定时任务代码和业务逻辑代码放入同一个jar中,如果定时任务挂了也会影响到业务逻辑,需要将定时任务和业务逻辑代码完全分开项目部署;
2.如果服务器集群的情况下,可能存在定时任务会重复触发执行;3 集群中的定时任务如何保证执行幂等性
如何保证定时任务在集群中只会执行一个?
- 将定时任务代码单独部署一个jar包中,不参与业务逻辑服务器集群部署;
- 在jar中开启一个定时任务配置开关,判断是否需要将定时任务类加载到spring容器中;
- 使用分布式锁。项目启动中,只要谁能够拿到分布式锁,谁就能够将定时任务的配置类加载到spring容器中,否则不加载;
- 数据库中插入主键id,只要谁能够往数据库中插入一条相同的主键,插入成功就可以加载定时任务配置类;
以上方案只适合小项目,不适合互联网级别项目 - 采用分布式任务调度平台框架
4 传统的定时任务的实现方案
传统定时任务的实现方案
多线程形式、timertask、线程池、springboot注解形式、quartzMaven依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.11.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- quartz --> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz-jobs</artifactId> <version>2.2.1</version> </dependency> </dependencies>
1.基于多线程方式实现
public class ThreadJob { public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { while(true){ try{ Thread.sleep(1000); System.out.println("定时任务每隔1s触发"); }catch (Exception e){ e.printStackTrace(); } } } }).start(); } }
2.TimerTask
public class TimerTaskDemo { public static void main(String[] args) { TimerTask timerTask = new TimerTask() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "定时任务触发"); } }; Timer timer = new Timer(); // 天数 long delay = 0; // 耗秒数 long period = 1000; timer.scheduleAtFixedRate(timerTask, delay, period); } }
3.线程池
public class ScheduledExecutorServiceDemo { public static void main(String[] args) { Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "定时任务触发.."); } }; ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); scheduledExecutorService.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS); } }
4.SpringBoot注解形式
@Component public class UserScheduled { @Scheduled(cron = "0/1 * * * * *") public void taskUserScheduled() { System.out.println("定时任务触发..."); } }
@SpringBootApplication @EnableScheduling public class App { public static void main(String[] args) { SpringApplication.run(App.class); } }
5.基于Quartz实现
public class QuartzTest { public static void main(String[] args) throws SchedulerException { //1.创建Scheduler的工厂 SchedulerFactory sf = new StdSchedulerFactory(); //2.从工厂中获取调度器实例 Scheduler scheduler = sf.getScheduler(); //3.创建JobDetail JobDetail jb = JobBuilder.newJob(MyJob.class) .withDescription("this is a ram job") //job的描述 .withIdentity("ramJob", "ramGroup") //job 的name和group .build(); //任务运行的时间,SimpleSchedle类型触发器有效 long time = System.currentTimeMillis() + 3 * 1000L; //3秒后启动任务 Date statTime = new Date(time); //4.创建Trigger //使用SimpleScheduleBuilder或者CronScheduleBuilder Trigger t = TriggerBuilder.newTrigger() .withDescription("") .withIdentity("ramTrigger", "ramTriggerGroup") //.withSchedule(SimpleScheduleBuilder.simpleSchedule()) .startAt(statTime) //默认当前时间启动 .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")) //两秒执行一次 .build(); //5.注册任务和定时器 scheduler.scheduleJob(jb, t); //6.启动 调度器 scheduler.start(); } }
public class MyJob implements Job { public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println("quartz MyJob date:" + new Date().getTime()); } }
5 分布式任务调度平台实现原理
常用式任务调度框架
Xxl-job(推荐)、elasticjob、SpringAlibaba Cloud SchedulerX设计分布式任务调度平台
定时任务的项目一定是集群部署,但是最终只会执行一台服务器;
为了能够提高定时任务集群执行的效率,一定是分片执行;原理分析:
- 手动的将定时任务的项目(执行器)服务器ip和端口号统一存放到分布式任务调度平台的注册中心;
- 所有的定时任务触发规则,先在“分布式任务调度中心”先触发,再查询注册中心执行器集群地址,采用负载均衡的算法只会取一个地址;
- 获取该地址之后,再使用rpc远程调用该接口,通知它可以去触发定时任务。
一句话总结分布式任务调度平台的原理:
1 将定时任务项目(执行器)服务ip和端口号统一注册到分布式任务调度平台中,触发定时任务的时候先走分布式任务调度中心;
2 分布式任务调度中心获取执行器集群列表,采用负载均衡算法获取一个地址,采用rpc通知执行器执行定时任务。
6 构建XXL-JOBAdmin调度中心
XXL-JOB任务调度平台
官网:https://www.xuxueli.com/xxl-job/XXL-Job Admin平台搭建 任务调度中心
1、官方下载XXL-Job Admin的源代码
2、导入xxl-job需要依赖的sql(xxl-job\doc\db\tables_xxl_job.sql)
3、在xxl-job jdbc连接配置加上&serverTimezone=UTC否则报错(xxl-job\xxl-job-admin\src\main\resources\application.properties)
4、启动项目XxlJobAdminApplication和XxlJobExecutorApplication
7 运行官方Demo执行器实现定时任务
-
xxl-job 原理:
2021-08-15 10:05:23quarts的缺点: 随机负载(for update );不能分片 阻塞处理策略: ...分片原理: ...在代码中可以通过工具类 获得 当前 是第几个分片 n 执行任务: ...取数据需要自己从SQL 中处理,保证同一...select * from XXL_JOB_Q...quarts的缺点:
随机负载(for update );不能分片
阻塞处理策略:
分片原理:
for (int i = 0; i < group.getRegistryList().size(); i++) { // 同时给多个客户端发送命令 processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); }
在代码中可以通过工具类 获得 当前 是第几个分片 n 执行任务:
取数据需要自己从SQL 中处理,保证同一条数据 不会被不同的执行器取到就可以
在 查询数据的时候,将 n 作为 参数
select * from my_job where mod(id,n) = #{n} 作为当前分片要执行的任务
select * from XXL_JOB_QRTZ_TRIGGER_INFO where mod(sha1(id),3) = 1 ; select * from XXL_JOB_QRTZ_TRIGGER_INFO where mod(id,3) = 1 ;
原理:
执行器 怎么 将 线程的运行日志发送给 调度器? 执行器 会将 执行任务日志放到 自己的队列里,有一个线程会消费这个队列, 并通过 http 请求 传给 调度器,调度器去 更新 日志表。
1.
调度中心(注册中心): 当一个job 可以执行的时候,调度中心 通过 http 请求 将 任务 传给 某个 worker 注册: HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>(); key 是 appName ; value 是: ip 端口 ; while 循环 ------> JobScheduleHelper : select * from xxl_job_lock where lock_name = 'schedule_lock' for update ------> 系统当前时间 大于 jobInfo.getTriggerNextTime() ------> 触发 ------> 更新下一次的触发时间 jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); jobInfo.setTriggerNextTime(nextValidTime.getTime()); UPDATE xxl_job_info SET trigger_last_time = #{triggerLastTime}, trigger_next_time = #{triggerNextTime}, trigger_status = #{triggerStatus} WHERE id = #{id} while 循环 里的伪代码如下: try { Connection conn = null; Boolean connAutoCommit = null; PreparedStatement preparedStatement = null; // 注册中心(调度中心) 有多台机器,防止多个机器 同时给 一个 执行器发送 http 请求,此处需要加 悲观锁, // 同一时刻只能 有 一个 注册中心,给某一个 worker 发送任务 select * from xxl_job_lock where lock_name = 'schedule_lock' for update // 获取 job 集合 for (XxlJobInfo jobInfo: scheduleList) { // 触发 ThreadPoolExecutor 线程池 中 去执行任务(向客户端发送请求)------> 分片的话,一个时间 选择一台机器发送请求 // 更新 xxl_job_info 中 ,下一次任务的执行时间(long 类型的) } } catch(Exception e){ } finally(){ { // commit if (conn != null) { try { // 释放锁 conn.commit(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.setAutoCommit(connAutoCommit); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } // close PreparedStatement if (null != preparedStatement) { try { preparedStatement.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } } } 调度中心: 通过 http 向 客户端发送 请求后(URI 为 /run)
客户端接收到请求后:
客户端在收到 服务端的执行任务指令后如何操作的? ① 客户端引入了 xxl-job-core 包,这个包在 spring bean注册完之后,会有一个回调函数, 将 bean 中 含有 @xxlJob 注解 的Component 和 method 方法 组装成一个 MethodJobHandler 对象 同事以 @xxlJob 上的value 为key ,MethodJobHandler 实例 为value 放到一个 ConcurrentHashMap 中 ② 服务端的执行任务指令过来,会根据 服务端传来的 xxlJob 注解上的value 从 ConcurrentHashMap 获取对应的 Handler 对象 ③ 注册并开启一个工作线程,执行任务,并在 jobHandler 方法里 写入执行结果(不是return success ) ,而是调用官方的方法,通过 InhertableTheadLocal 将执行结果写入到线程私有变量里,并将finally 代码块将执行结果 推到 回调线程里 ④ 回调线程 反馈任务运行结果,服务端接受到结果后根据日志id 更新日志 表
客户端在服务启动的时候,将被xxlJob 标记的method 和相关Component 组成对象,并保存到map里,供调用(有点类似策略模式)
// 伪代码如下: xxl job core 项目 在 bean 注册到容器 里后,有一个回调方法(通过实现 SmartInitializingSingleton 接口实现,重写方法) 回调方法的作用: 获取 容器中 的bean ,主要是 方法被 @XxlJob 注解标记的 bean, // bean spring 中的bean ,executeMethod 被 xxlJob 标注的方法, registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod)); // name 是 Bean 里 加在方法上的 XxlJob 注解 public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); return jobHandlerRepository.put(name, jobHandler); } map 结构如下: private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
反射调用目标方法:
MethodJobHandler extends IJobHandler 中有如下方法: @Override public void execute() throws Exception { Class<?>[] paramTypes = method.getParameterTypes(); if (paramTypes.length > 0) { method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types } else { // 反射调用 被 xxlJob 标注的目标方法 // method 是目标方法,target 是目标方法所在的bean method.invoke(target); } }
③ 注册并开启 工作线程
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ JobThread newJobThread = new JobThread(jobId, handler); // 开启新线程 newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); // 获取上传一次的 旧线程 JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! if (oldJobThread != null) { // 通过变量 让线程 终止(线程执行完也算是终止) oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); } return newJobThread; }
工作线程:
public class JobThread extends Thread{ private static Logger logger = LoggerFactory.getLogger(JobThread.class); private int jobId; // 封装了Bean 和目标方法 ,这样反射就可以 调用目标方法 private IJobHandler handler; private LinkedBlockingQueue<TriggerParam> triggerQueue; private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID // 通过一个变量 让一个线程 结束运行,运行完就算线程销毁了(不会旧线程一直 while 循环) private volatile boolean toStop = false; private String stopReason; private boolean running = false; // if running job private int idleTimes = 0; // idel times public JobThread(int jobId, IJobHandler handler) { this.jobId = jobId; this.handler = handler; // 每个线程 都有自己的私有 队列 this.triggerQueue = new LinkedBlockingQueue<TriggerParam>(); this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>()); } public IJobHandler getHandler() { return handler; } /** * new trigger to queue * * @param triggerParam * @return */ public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) { // avoid repeat if (triggerLogIdSet.contains(triggerParam.getLogId())) { logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId()); return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId()); } triggerLogIdSet.add(triggerParam.getLogId()); triggerQueue.add(triggerParam); return ReturnT.SUCCESS; } /** * kill job thread * * @param stopReason */ public void toStop(String stopReason) { /** * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep), * 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身; * 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式; */ this.toStop = true; this.stopReason = stopReason; } /** * is running job * @return */ public boolean isRunningOrHasQueue() { return running || triggerQueue.size()>0; }
重点看下执行任务的 run 方法:
// 没有终止就一直运行 while(!toStop){ try{ // log filename, like "logPath/yyyy-MM-dd/9999.log" String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId()); XxlJobContext xxlJobContext = new XxlJobContext( triggerParam.getJobId(), triggerParam.getExecutorParams(), logFileName, triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()); // init job context //底层: InheritableThreadLocal<XxlJobContext> contextHolder = new InheritableThreadLocal<XxlJobContext>() // 线程私有变量的传递 ; 在自定义的 jobHandler 方法里写的日志 要用官方的日志 组件 ,会将 日志内容写入到日志文件里 // eg:他获取文件名称的方法 String logFileName = xxlJobContext.getJobLogFileName(); XxlJobContext.setXxlJobContext(xxlJobContext); // execute XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam()); // 有过期时间用 FutureTask (这个可以设置超时时间) if (triggerParam.getExecutorTimeout() > 0) { // limit timeout Thread futureThread = null; try { FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() { @Override public Boolean call() throws Exception { // init job context XxlJobContext.setXxlJobContext(xxlJobContext); // 反射调用 目标方法 handler.execute(); return true; } }); futureThread = new Thread(futureTask); futureThread.start(); Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS); } catch (TimeoutException e) { XxlJobHelper.log("<br>----------- xxl-job job execute timeout"); XxlJobHelper.log(e); // handle result XxlJobHelper.handleTimeout("job execute timeout "); } finally { futureThread.interrupt(); } } else { // 没有过期时间 // just execute // 反射调用 目标方法 handler.execute(); } }catch{ }finally{ if(triggerParam != null) { // callback handler info if (!toStop) { // 往回调线程里 push 消息,通知执行完结果 TriggerCallbackThread.pushCallBack(new HandleCallbackParam( triggerParam.getLogId(), triggerParam.getLogDateTime(), // 这个是在哪里设置的值? 在 自己写的任务 xxJobHandler 方法里(底层 调用 InheritableThreadLocal 获取当前线程的私有属性,然后设置成功吗) // 代码如下: // if (exitValue == 0) { // default success //} else { // XxlJobHelper.handleFail("command exit value("+exitValue+") is failed"); //} XxlJobContext.getXxlJobContext().getHandleCode(), XxlJobContext.getXxlJobContext().getHandleMsg() ) ); } else { // is killed TriggerCallbackThread.pushCallBack(new HandleCallbackParam( triggerParam.getLogId(), triggerParam.getLogDateTime(), XxlJobContext.HANDLE_COCE_FAIL, stopReason + " [job running, killed]" ) ); } } } }
④ 回调线程 通过http 请求 反馈工作 线程 执行结果
说明:没有执行完任务后直接回调,而是 放到一个队列里,批量 反馈,减少了网络开销
主要属性和方法:
LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>() // 添加执行结果 public static void pushCallBack(HandleCallbackParam callback){ getInstance().callBackQueue.add(callback); logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); }
@Override public void run() { // normal callback while(!toStop){ try { HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { // callback list param List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // callback, will retry if error if (callbackParamList!=null && callbackParamList.size()>0) { // 通过 http 进行回调 doCallback(callbackParamList); } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } // last callback try { List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory."); } }); 继续进入: @Override public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class); }
1. 注册中心收到 请求后的操作:
api/callback
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { // 线程池 callbackThreadPool.execute(new Runnable() { @Override public void run() { for (HandleCallbackParam handleCallbackParam: callbackParamList) { ReturnT<String> callbackResult = callback(handleCallbackParam); logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", (callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult); } } }); return ReturnT.SUCCESS; }
callBack 方法如下:
private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) { // valid log item XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); if (log == null) { return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found."); } if (log.getHandleCode() > 0) { return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc } // handle msg StringBuffer handleMsg = new StringBuffer(); if (log.getHandleMsg()!=null) { handleMsg.append(log.getHandleMsg()).append("<br>"); } if (handleCallbackParam.getHandleMsg() != null) { handleMsg.append(handleCallbackParam.getHandleMsg()); } // success, update log ;(说明:insert log 是 在服务端向客户端发送请求的时候插入的) log.setHandleTime(new Date()); log.setHandleCode(handleCallbackParam.getHandleCode()); log.setHandleMsg(handleMsg.toString()); XxlJobCompleter.updateHandleInfoAndFinish(log); return ReturnT.SUCCESS; }
调度日志的显示:
点击 页面 执行日志,执行了两部操作:
首先从 调度器获取 日志id(日志文件的文件名 日志id.log ),日志所在执行器服务器信息 ② 通过页面 ajax 请求 访问执行器 日志文件(配置文件里配置的路径)
代码如下:
① 页面 获取日志id,执行器服务器信息 如下:
说明:
文件名称 和 日志id 有关:(每次执行完一个任务,就生成一个文件,而不是 所有日志文件堆积在一起 )
log filename: logPath/yyyy-MM-dd/9999.log
② 读取日志文件(说明:如果 调度中心和 执行器不在一台服务器上,得修改源码,将本地读取日志文件改为 远程读取日志文件)
@RequestMapping("/logDetailCat") @ResponseBody public ReturnT<LogResult> logDetailCat(String executorAddress, long triggerTime, int logId, int fromLineNum){ try { ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(executorAddress); // 读取日志文件的内容 ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum); // is end if (logResult.getContent()!=null && logResult.getContent().getFromLineNum() > logResult.getContent().getToLineNum()) { XxlJobLog jobLog = xxlJobLogDao.load(logId); if (jobLog.getHandleCode() > 0) { logResult.getContent().setEnd(true); } } return logResult; } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT<LogResult>(ReturnT.FAIL_CODE, e.getMessage()); } }
读取方法:
@Override public ReturnT<LogResult> log(long logDateTim, int logId, int fromLineNum) { //获取日志名称,说明 9999 是 日志id // log filename: logPath/yy yy-MM-dd/9999.log String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logDateTim), logId); // 读取日志 LogResult logResult = XxlJobFileAppender.readLog(logFileName, fromLineNum); return new ReturnT<LogResult>(logResult); }
如何获取日志文件名称?
public static String makeLogFileName(Date triggerDate, int logId) { // filePath/yyyy-MM-dd SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // avoid concurrent problem, can not be static File logFilePath = new File(getLogPath(), sdf.format(triggerDate)); if (!logFilePath.exists()) { logFilePath.mkdir(); } // filePath/yyyy-MM-dd/9999.log String logFileName = logFilePath.getPath() .concat(File.separator) // 拼接 logId 为日志名称 .concat(String.valueOf(logId)) // 日志后缀 .concat(".log"); return logFileName; }
读取日志的方法:
public static LogResult readLog(String logFileName, int fromLineNum){ // valid log file if (logFileName==null || logFileName.trim().length()==0) { return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true); } // 创建文件对象 File logFile = new File(logFileName); if (!logFile.exists()) { return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true); } // read file StringBuffer logContentBuffer = new StringBuffer(); int toLineNum = 0; LineNumberReader reader = null; try { //reader = new LineNumberReader(new FileReader(logFile)); reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), "utf-8")); String line = null; while ((line = reader.readLine())!=null) { toLineNum = reader.getLineNumber(); // [from, to], start as 1 if (toLineNum >= fromLineNum) { // 读完一行就换行 logContentBuffer.append(line).append("\n"); } } } catch (IOException e) { logger.error(e.getMessage(), e); } finally { if (reader != null) { try { reader.close(); } catch (IOException e) { logger.error(e.getMessage(), e); } } } // 返回结果集 LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false); return logResult; }
-
xxl-job原理及介绍
2021-02-18 17:07:002.10 任务依赖 原理:XXL-JOB中每个任务都对应有一个任务ID,同时,每个任务支持设置属性“子任务ID”,因此,通过“任务ID”可以匹配任务依赖关系。 当父任务执行结束并且执行成功时,将会根据“子任务ID”匹配子... -
XXL-JOB原理
2022-03-18 11:18:23XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。在众多XXL-Job平台的特征中,有如下几条需要关注的: 1、使用简单:... -
分布式调度平台XXL-JOB原理分析
2022-02-22 16:45:57分布式调度:Quartz,XXL-Job,Elastic-Job,TBSchedule,DolphinScheduler,…… 调度框架对比:https://www.cnblogs.com/davidwang456/p/9057839.html,https://www.cnblogs.com/ssslinppp/p/12485273.html 1.2... -
3. xxl-job原理-- 执行器注册
2019-05-30 17:36:092. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原... -
XXL-JOB原理--任务执行时间轮(六)
2021-01-13 00:15:13之前 XXL-JOB 任务执行是通过 Quartz来进行任务管理触发的,在之前的博客 《Quartz任务调度框架–任务执行流程》 我们进行了任务执行的流程介绍,目前 XXL-JOB 任务执行已经摒弃 Quartz 框架,目前通过时间轮方式来... -
任务调度系统之XXL-JOB分享.pptx
2020-07-29 17:26:25这个ppt是自己调研任务调度框架,最后选型xxl-job框架编写的。主要是对比了目前主流的几个调度系统框架,各自的优缺点等...最后主要介绍了xxl-job的功能特性、框架原理和注意事项。该ppt总计30页,适合学习和技术分享。 -
XXL-JOB原理--定时任务框架简介(一)
2018-09-10 21:14:01之前在工作中有接触许雪里大神开源的分布式任务调度平台XXL-JOB,最近...XXL-JOB原理--任务调度中心任务管理(四) XXL-JOB原理--任务执行(五) 一、完整介绍地址:官方介绍 二、最新版本架构图: 三、介绍... -
8. xxl-job 原理-- 任务执行或触发
2019-05-30 17:37:172. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原... -
xxl-job集群原理
2021-11-19 15:13:41一、xxl-job集群需要满足的条件 调度中心支持集群部署,提升调度系统容灾和可用性; 调度中心集群部署时,几点要求和建议: 1)DB配置保持一致; 2)集群机器时钟保持一致(单机集群忽视); 官方建议:推荐通过... -
XXL-Job原理分析
2017-10-19 16:38:54最近调研调度工具时, 看到了 Github 上的XXL-JOB,已在美团内部、以及其他公司有生产应用。 特点 整体情况: XXL-JOB是基于开源 Quartz 调度内核的、为方便企业调度场景而开源的一款实用的调度工具。自带任务配置... -
9. xxl-job原理-- jobthread的作用
2019-05-30 17:37:342. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原... -
10. xxl-job原理---回调
2019-05-30 17:38:552. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job原理-- 调度中心注册 7. xxl-job原理-- 任务管理 8. xxl-job原理--... -
xxl-job原理合集
2022-02-15 10:36:142.xxl-job(v2.1.0 Release)执行器端的执行器自动注册原理 3.xxl-job(v2.1.0 Release)调度器端的执行器自动注册原理 4.xxl-job(v2.1.0 Release)任务管理... -
Xxl-Job执行器原理解析
2021-05-31 22:49:20xxl-job版本:2.3.0 Xxl-Job分为执行器、调度器。... 执行器初始化过程步骤如下 ...1通过加了@Conguration注解的XxlJobConfig初始化,并生成beanName=...-扫描所有bean,加载加了@XxlJob注解类,并记录在job... -
xxl-job的使用及简述原理
2020-12-14 21:21:38文章目录前言1. 介绍2. 部署篇2.1. 初始化数据库2.2. 部署调度中心2.2.1 集群部署2.3. 部署执行器2.3.1 集群部署3. 使用篇3.1. 设置执行器3.2....本文章基于xxl-job 2.2.0, jdk8 , springboot 2.2.6.RELEASE -
XXL-JOB原理--执行器注册(二)
2018-10-17 14:46:011、xxl-job添加执行器到任务调度中心有两种方式 (1)客户端执行器自动将名称和机器地址注册到任务调度中心 (2)可以在任务调度中心手动录入执行器名称和相关的机器地址(多个机器地址用逗号隔开) 2、自动... -
4. xxl-job原理-- 执行器注册问题
2019-05-30 17:36:242. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原... -
xxl-job的实现原理(源码分析)
2021-11-26 17:15:07关于使用xxl-job的流程,这里不再赘述。在加入xxl-job依赖后,需要我们自己做的就是创建一个XxlJobSpringExecutor对象交给spring容器管理,因为XxlJobSpringExecutor实现了SmartInitializingSingleton接口,在spring... -
分布式任务调度平台XXL-JOB.pdf
2018-05-06 21:29:58分布式任务调度平台XXL-JOB.pdf,pdf文档来源于官网xxl-job说明 -
5 xxl-job原理-- 执行器注册问题
2019-05-30 17:36:372. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原... -
分布式任务调度平台XXL-Job原理(8_29)
2020-12-23 20:06:26XXL-JOB任务调度平台 官网 https://www.xuxueli.com/xxl-job/ 中文文档 https://www.xuxueli.com/xxl-job/ 源码仓库地址 https://github.com/xuxueli/xxl-job http://gitee.com/xuxueli0323/xxl-job 下载地址 ... -
1. xxl-job原理---定时任务架构
2019-05-30 17:35:092. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原... -
Xxl-Job调度器原理解析
2021-06-01 22:48:10xxl-job版本:2.3.0 Xxl-Job分为执行器、调度器。而我们平时的客户端就属于一个执行器,执行器启动的时候会自动注册到调度器上,然后调度器进行远程调度。 调度器初始化过程步骤如下 1 国际化相关 配置参数... -
XXL-JOB原理--任务调度中心任务管理(四)
2018-09-15 17:10:10在任务调度中心可以进行新建任务,新建任务之后可以在任务列表中查看相关任务,任务可以根据我们配置的cron表达式进行任务调度,或者... 我们了解到xxl-job是基于quartz来实现定时任务的(其实任务调度中心任务执行... -
XXL-JOB原理--任务调度中心执行器注册(三)
2018-09-15 14:33:16在上一篇博客XXL-JOB学习--执行器注册(一)中我们介绍了xxl-job执行器注册到任务调度中心的流程及相关注册信息,接下来我们看看任务调度中心接受任务注册后做了哪些事情。...1、xxl-job admin通过JobApiCo...