精华内容
下载资源
问答
  • xxl-job原理
    千次阅读
    2019-05-30 17:33:14
    更多相关内容
  • xxl-job 原理

    2022-05-03 14:12:59
    xxl-job 原理 路由策略 分片原理 故障转移

    xxl-job 原理

    一、xxl-job 架构设计

    在这里插入图片描述
    总体分两个部分:

    • 调度中心:负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统和任务解耦,提高了系统可用性和稳定性。通调度性能不在受限于任务模块。
    • 执行器:负责接收调度中的请求并执行任务逻辑。任务模块专注于任务的执行操作,开发和运维更加简单和高校。

    设计思想:

    调度和任务两个部分相互解耦,全异步化和轻量化,可以提高系统的稳定性和扩展性。

    二、xxl-job原理

    2.1、执行器注册

    执行器启动主要是把自己注册到调度中心然后保存在数据库(xxl_job_registry表),并定时发送心跳,保持续约。执行器正常关闭,也主动告知调度中心注销,这种是主动注册。

    如果执行器网络故障,调度中心就不知道执行器的情况,如果把任务路由给一个不可用的执行器,就会导致任务失败。所以调度中心需要不断的对执行器探活(RocketMQ的NameServer 管理broker一样),调度中心会启动一个后台线程定时调用执行器接口,如果发现异常就下线。

    2.2、调度中心和任务执行

    1. JobRegistryMonitorHelper 不停的更新注册表,把超时的执行器剔除(每隔30s执行一次)
    2. 创建线程池
      1. 调度器线程ScheduleThread:计算预读取的任务数(默认6000),然后while 循环不停的获取到期的任务
      2. 时间轮线程池
    3. 获取任务锁:第一步获取数据库排它锁,如果没有成功说明其他的调度中心在加载任务
    4. 查询任务:获取锁后, 查询任务
    5. 调度任务
    6. 任务触发,选择执行器:按照配置的路由策略,不通路由策略获取方式也不一样
    7. 远程执行:拿到执行器之后,runExecutor 触发远程的执行器
    8. 执行器处理远程调用,回调

    2.3、 时间轮

    一批任务都是不同的时间执行,执行时间精确到秒,如何实现对所有的任务调度这个就是时间轮

    2.4、任务超时

    如果任务在指定的时间范围内没有返回结果,就不在等结果,抛出异常。

    FutureTask
    

    2.5、失败重试

    如果任务执行失败,会更新在xxl_job_log日志表里。调度中心有个后台线程monitorThread。第一步就是查日志表里结果不是200的任务,为了防止集群下同时处理一个失败任务,用了数据库的乐观锁(版本号),如果失败重试次数>0,代表重试,就要重新触发。

    调度器启动:
    JobFailMonitorHelper.getInstance().start();
    

    2.6、故障转移

    如果一个执行器挂了,就找另一个执行器执行,直到找到一个正常的执行器。

    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover())
    

    2.7、任务数据分片

    这里的是数据分片,需要用到分片参数 sharding param,调度器负责把这个分片参数分发给每个执行器(执行器个数和参数个数相等),怎么根据分片参数对数据分片是Job自己的事情。

    XxlJobTrigger#trigger
    
    展开全文
  • 107:XXL-JOB原理的分析1 xxl-job分布式任务调度平台演示2 传统的定时任务存在哪些缺陷3 集群中的定时任务如何保证执行幂等性4 传统的定时任务的实现方案5 分布式任务调度平台实现原理6 构建XXL-JOBAdmin调度中心7 ...

    1 xxl-job分布式任务调度平台演示

    课题内容

    1. 传统的定时任务存在哪些缺陷
    2. 定时任务有哪些实现方案
    3. 基于XXL-JOB实现分布式任务调度
    4. XXL-JOB集群方案部署的原理

    2 传统的定时任务存在哪些缺陷

    传统的定时任务存在哪些缺陷?
    1.定时任务代码和业务逻辑代码放入同一个jar中,如果定时任务挂了也会影响到业务逻辑,需要将定时任务和业务逻辑代码完全分开项目部署;
    2.如果服务器集群的情况下,可能存在定时任务会重复触发执行;

    3 集群中的定时任务如何保证执行幂等性

    如何保证定时任务在集群中只会执行一个?

    1. 将定时任务代码单独部署一个jar包中,不参与业务逻辑服务器集群部署;
    2. 在jar中开启一个定时任务配置开关,判断是否需要将定时任务类加载到spring容器中;
    3. 使用分布式锁。项目启动中,只要谁能够拿到分布式锁,谁就能够将定时任务的配置类加载到spring容器中,否则不加载;
    4. 数据库中插入主键id,只要谁能够往数据库中插入一条相同的主键,插入成功就可以加载定时任务配置类;
      以上方案只适合小项目,不适合互联网级别项目
    5. 采用分布式任务调度平台框架

    4 传统的定时任务的实现方案

    传统定时任务的实现方案
    多线程形式、timertask、线程池、springboot注解形式、quartz

    Maven依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.11.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- quartz -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz-jobs</artifactId>
            <version>2.2.1</version>
        </dependency>
    </dependencies>
    

    1.基于多线程方式实现

    public class ThreadJob {
        public static void main(String[] args) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while(true){
                        try{
                            Thread.sleep(1000);
                            System.out.println("定时任务每隔1s触发");
                        }catch (Exception e){
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }
    }
    

    2.TimerTask

    public class TimerTaskDemo {
        public static void main(String[] args) {
            TimerTask timerTask = new TimerTask() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "定时任务触发");
                }
            };
            Timer timer = new Timer();
            // 天数
            long delay = 0;
            // 耗秒数
            long period = 1000;
            timer.scheduleAtFixedRate(timerTask, delay, period);
        }
    }
    

    3.线程池

    public class ScheduledExecutorServiceDemo {
        public static void main(String[] args) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "定时任务触发..");
                }
            };
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
            scheduledExecutorService.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
        }
    }
    

    4.SpringBoot注解形式

    @Component
    public class UserScheduled {
        @Scheduled(cron = "0/1 * * * * *")
        public void taskUserScheduled() {
            System.out.println("定时任务触发...");
        }
    }
    
    @SpringBootApplication
    @EnableScheduling
    public class App {
        public static void main(String[] args) {
            SpringApplication.run(App.class);
        }
    }
    

    5.基于Quartz实现

    public class QuartzTest {
        public static void main(String[] args) throws SchedulerException {
            //1.创建Scheduler的工厂
            SchedulerFactory sf = new StdSchedulerFactory();
            //2.从工厂中获取调度器实例
            Scheduler scheduler = sf.getScheduler();
    
    
            //3.创建JobDetail
            JobDetail jb = JobBuilder.newJob(MyJob.class)
                    .withDescription("this is a ram job") //job的描述
                    .withIdentity("ramJob", "ramGroup") //job 的name和group
                    .build();
    
            //任务运行的时间,SimpleSchedle类型触发器有效
            long time = System.currentTimeMillis() + 3 * 1000L; //3秒后启动任务
            Date statTime = new Date(time);
    
            //4.创建Trigger
            //使用SimpleScheduleBuilder或者CronScheduleBuilder
            Trigger t = TriggerBuilder.newTrigger()
                    .withDescription("")
                    .withIdentity("ramTrigger", "ramTriggerGroup")
                    //.withSchedule(SimpleScheduleBuilder.simpleSchedule())
                    .startAt(statTime)  //默认当前时间启动
                    .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")) //两秒执行一次
                    .build();
    
            //5.注册任务和定时器
            scheduler.scheduleJob(jb, t);
    
            //6.启动 调度器
            scheduler.start();
        }
    }
    
    public class MyJob implements Job {
        public void execute(JobExecutionContext context) throws JobExecutionException {
           System.out.println("quartz MyJob date:" + new Date().getTime());
       }
    }
    

    5 分布式任务调度平台实现原理

    常用式任务调度框架
    Xxl-job(推荐)、elasticjob、SpringAlibaba Cloud SchedulerX

    设计分布式任务调度平台
    定时任务的项目一定是集群部署,但是最终只会执行一台服务器;
    为了能够提高定时任务集群执行的效率,一定是分片执行;

    原理分析:

    1. 手动的将定时任务的项目(执行器)服务器ip和端口号统一存放到分布式任务调度平台的注册中心;
    2. 所有的定时任务触发规则,先在“分布式任务调度中心”先触发,再查询注册中心执行器集群地址,采用负载均衡的算法只会取一个地址;
    3. 获取该地址之后,再使用rpc远程调用该接口,通知它可以去触发定时任务。
      在这里插入图片描述
      一句话总结分布式任务调度平台的原理:
      1 将定时任务项目(执行器)服务ip和端口号统一注册到分布式任务调度平台中,触发定时任务的时候先走分布式任务调度中心;
      2 分布式任务调度中心获取执行器集群列表,采用负载均衡算法获取一个地址,采用rpc通知执行器执行定时任务。

    6 构建XXL-JOBAdmin调度中心

    XXL-JOB任务调度平台
    官网:https://www.xuxueli.com/xxl-job/

    XXL-Job Admin平台搭建 任务调度中心
    1、官方下载XXL-Job Admin的源代码
    2、导入xxl-job需要依赖的sql(xxl-job\doc\db\tables_xxl_job.sql)
    3、在xxl-job jdbc连接配置加上&serverTimezone=UTC否则报错(xxl-job\xxl-job-admin\src\main\resources\application.properties)
    4、启动项目XxlJobAdminApplication和XxlJobExecutorApplication
    在这里插入图片描述

    7 运行官方Demo执行器实现定时任务

    在这里插入图片描述

    展开全文
  • xxl-job 原理

    2021-08-15 10:05:23
    quarts的缺点: 随机负载(for update );不能分片 阻塞处理策略: ...分片原理: ...在代码中可以通过工具类 获得 当前 是第几个分片 n 执行任务: ...取数据需要自己从SQL 中处理,保证同一...select * from XXL_JOB_Q...

     quarts的缺点:

    随机负载(for  update );不能分片

    阻塞处理策略: 

    分片原理:

     
    
    for (int i = 0; i < group.getRegistryList().size(); i++) {
                    // 同时给多个客户端发送命令
                    processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
                }

    在代码中可以通过工具类 获得 当前 是第几个分片 n  执行任务:

    取数据需要自己从SQL 中处理,保证同一条数据 不会被不同的执行器取到就可以

    在 查询数据的时候,将  n 作为 参数 

    select * from my_job  where  mod(id,n) = #{n}  作为当前分片要执行的任务

    select  * from   XXL_JOB_QRTZ_TRIGGER_INFO  where mod(sha1(id),3) = 1  ;
    
    select  * from   XXL_JOB_QRTZ_TRIGGER_INFO  where mod(id,3) = 1  ;

    原理:

     执行器 怎么 将  线程的运行日志发送给 调度器? 执行器 会将 执行任务日志放到 自己的队列里,有一个线程会消费这个队列,
        并通过 http 请求 传给 调度器,调度器去 更新 日志表。

    1.

     调度中心(注册中心): 当一个job 可以执行的时候,调度中心 通过 http 请求 将 任务 传给 某个 worker
    
    
    
      注册:
      
      HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
      
      key  是 appName ;
      value  是: ip 端口 ;
      
      
      
      
      while 循环  
                 ------> JobScheduleHelper : select * from xxl_job_lock where lock_name = 'schedule_lock' for update
    			 
    			 ------> 系统当前时间 大于  jobInfo.getTriggerNextTime()
    			          ------>  触发
                          ------>   更新下一次的触发时间
                                      jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                      jobInfo.setTriggerNextTime(nextValidTime.getTime());	
                                          UPDATE xxl_job_info
    		                                      SET
    												trigger_last_time = #{triggerLastTime},
    												trigger_next_time = #{triggerNextTime},
    												trigger_status = #{triggerStatus}
    		                                      WHERE id = #{id}	
         
    
    
    
    
        while 循环 里的伪代码如下:
    
             try  {
    		 
    		 Connection conn = null;
                        Boolean connAutoCommit = null;
                        PreparedStatement preparedStatement = null;
    		 
    		 // 注册中心(调度中心) 有多台机器,防止多个机器 同时给 一个 执行器发送 http 请求,此处需要加 悲观锁,
    		 // 同一时刻只能 有 一个 注册中心,给某一个  worker 发送任务
    		 select * from xxl_job_lock where lock_name = 'schedule_lock' for update 
    		 
    		 // 获取 job 集合
    		 
    		  for (XxlJobInfo jobInfo: scheduleList) {
    		       // 触发   ThreadPoolExecutor 线程池 中 去执行任务(向客户端发送请求)------>   分片的话,一个时间 选择一台机器发送请求
    			   
    			   // 更新 xxl_job_info 中 ,下一次任务的执行时间(long 类型的)
    		  
    		  }
    		 
    		
    		 
    		 
    		 } catch(Exception e){
    		 
    		 } finally(){
    		    {
    
                            // commit
                            if (conn != null) {
                                try {
    							// 释放锁
                                    conn.commit();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                                try {
                                    conn.setAutoCommit(connAutoCommit);
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                                try {
                                    conn.close();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                            }
    
                            // close PreparedStatement
                            if (null != preparedStatement) {
                                try {
                                    preparedStatement.close();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                            }
                        }
    		 }										  
      
      
      
      
      
       调度中心:  通过 http 向 客户端发送 请求后(URI 为  /run)

    客户端接收到请求后:

    
      客户端在收到 服务端的执行任务指令后如何操作的?
      
        ① 客户端引入了  xxl-job-core 包,这个包在 spring bean注册完之后,会有一个回调函数,
    	   将 bean 中 含有 @xxlJob 注解  的Component 和 method 方法 组装成一个 MethodJobHandler 对象
    	   同事以 @xxlJob 上的value 为key ,MethodJobHandler 实例 为value 放到一个 ConcurrentHashMap 中
    	   
    	② 服务端的执行任务指令过来,会根据 服务端传来的 xxlJob 注解上的value 从 ConcurrentHashMap 获取对应的 Handler 对象
    	
    	③ 注册并开启一个工作线程,执行任务,并在 jobHandler 方法里 写入执行结果(不是return success ) ,而是调用官方的方法,通过 InhertableTheadLocal 
          将执行结果写入到线程私有变量里,并将finally 代码块将执行结果 推到  回调线程里	
    	
    	④  回调线程 反馈任务运行结果,服务端接受到结果后根据日志id 更新日志 表

    客户端在服务启动的时候,将被xxlJob 标记的method 和相关Component 组成对象,并保存到map里,供调用(有点类似策略模式)

    // 伪代码如下:
      xxl job  core  项目 在 bean 注册到容器 里后,有一个回调方法(通过实现 SmartInitializingSingleton 接口实现,重写方法)
       回调方法的作用:
         获取 容器中 的bean ,主要是  方法被   @XxlJob 注解标记的  bean, 
    
    
    
     // bean  spring 中的bean ,executeMethod 被 xxlJob 标注的方法,
                    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
    
      // name  是  Bean 里 加在方法上的 XxlJob 注解
      public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
            logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
            return jobHandlerRepository.put(name, jobHandler);
        }
    	
      map 结构如下:
         private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

    反射调用目标方法:

    MethodJobHandler extends IJobHandler 中有如下方法:
       
        @Override
        public void execute() throws Exception {
            Class<?>[] paramTypes = method.getParameterTypes();
            if (paramTypes.length > 0) {
                method.invoke(target, new Object[paramTypes.length]);       // method-param can not be primitive-types
            } else {
    		    // 反射调用  被 xxlJob 标注的目标方法
    			// method 是目标方法,target 是目标方法所在的bean
                method.invoke(target);
            }
        }

     ③ 注册并开启 工作线程 

     public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
            JobThread newJobThread = new JobThread(jobId, handler);
    		 // 开启新线程 
            newJobThread.start();
            logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
             
    		 // 获取上传一次的 旧线程
            JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
            if (oldJobThread != null) {
    		    // 通过变量 让线程 终止(线程执行完也算是终止)
                oldJobThread.toStop(removeOldReason);
                oldJobThread.interrupt();
            }
    
            return newJobThread;
        }
    

    工作线程:

    public class JobThread extends Thread{
    	private static Logger logger = LoggerFactory.getLogger(JobThread.class);
    
    	private int jobId;
    	 // 封装了Bean 和目标方法 ,这样反射就可以 调用目标方法
    	private IJobHandler handler;  
    	private LinkedBlockingQueue<TriggerParam> triggerQueue;
    	private Set<Long> triggerLogIdSet;		// avoid repeat trigger for the same TRIGGER_LOG_ID
    
        // 通过一个变量 让一个线程 结束运行,运行完就算线程销毁了(不会旧线程一直 while 循环)
    	private volatile boolean toStop = false;
    	private String stopReason;
    
        private boolean running = false;    // if running job
    	private int idleTimes = 0;			// idel times
    
    
    	public JobThread(int jobId, IJobHandler handler) {
    		this.jobId = jobId;
    		this.handler = handler;
    		// 每个线程 都有自己的私有 队列
    		this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
    		this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
    	}
    	public IJobHandler getHandler() {
    		return handler;
    	}
    
        /**
         * new trigger to queue
         *
         * @param triggerParam
         * @return
         */
    	public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
    		// avoid repeat
    		if (triggerLogIdSet.contains(triggerParam.getLogId())) {
    			logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
    			return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
    		}
    
    		triggerLogIdSet.add(triggerParam.getLogId());
    		triggerQueue.add(triggerParam);
            return ReturnT.SUCCESS;
    	}
    
        /**
         * kill job thread
         *
         * @param stopReason
         */
    	public void toStop(String stopReason) {
    		/**
    		 * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),
    		 * 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;
    		 * 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;
    		 */
    		this.toStop = true;
    		this.stopReason = stopReason;
    	}
    
        /**
         * is running job
         * @return
         */
        public boolean isRunningOrHasQueue() {
            return running || triggerQueue.size()>0;
        }

    重点看下执行任务的 run 方法:

    // 没有终止就一直运行
    	 while(!toStop){
    	 
    	 
    	   try{
    	        
    			
    			      // log filename, like "logPath/yyyy-MM-dd/9999.log"
    					String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
    					XxlJobContext xxlJobContext = new XxlJobContext(
    							triggerParam.getJobId(),
    							triggerParam.getExecutorParams(),
    							logFileName,
    							triggerParam.getBroadcastIndex(),
    							triggerParam.getBroadcastTotal());
    
    					// init job context
    					//底层:  InheritableThreadLocal<XxlJobContext> contextHolder = new InheritableThreadLocal<XxlJobContext>()
    					// 线程私有变量的传递  ; 在自定义的  jobHandler 方法里写的日志 要用官方的日志 组件  ,会将 日志内容写入到日志文件里
    					
    					// eg:他获取文件名称的方法  String logFileName = xxlJobContext.getJobLogFileName();
    					XxlJobContext.setXxlJobContext(xxlJobContext);
    
    					// execute
    					XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
    
    	   
    	   
    	         // 有过期时间用 FutureTask  (这个可以设置超时时间)
    	      if (triggerParam.getExecutorTimeout() > 0) {
    						// limit timeout
    						Thread futureThread = null;
    						try {
    							FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
    								@Override
    								public Boolean call() throws Exception {
    
    									// init job context
    									XxlJobContext.setXxlJobContext(xxlJobContext);
                                         // 反射调用 目标方法
    									handler.execute();
    									return true;
    								}
    							});
    							futureThread = new Thread(futureTask);
    							futureThread.start();
    
    							Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
    						} catch (TimeoutException e) {
    
    							XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
    							XxlJobHelper.log(e);
    
    							// handle result
    							XxlJobHelper.handleTimeout("job execute timeout ");
    						} finally {
    							futureThread.interrupt();
    						}
    					} else {
    					     // 没有过期时间
    						// just execute
    						//  反射调用 目标方法
    						handler.execute();
    					}
    	   
    	   }catch{
    	   
    	   
    	   
    	   
    	   }finally{
    	   
    	          if(triggerParam != null) {
                        // callback handler info
                        if (!toStop) {
                            //  往回调线程里 push 消息,通知执行完结果
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            		triggerParam.getLogId(),
    								triggerParam.getLogDateTime(),
    								 // 这个是在哪里设置的值? 在  自己写的任务 xxJobHandler   方法里(底层 调用  InheritableThreadLocal 获取当前线程的私有属性,然后设置成功吗)
    								 // 代码如下:
    								 //  if (exitValue == 0) {
                                        // default success
    									//} else {
    									//	XxlJobHelper.handleFail("command exit value("+exitValue+") is failed");
    									//}
    								XxlJobContext.getXxlJobContext().getHandleCode(),
    								XxlJobContext.getXxlJobContext().getHandleMsg() )
    						);
                        } else {
                            // is killed
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            		triggerParam.getLogId(),
    								triggerParam.getLogDateTime(),
    								XxlJobContext.HANDLE_COCE_FAIL,
    								stopReason + " [job running, killed]" )
    						);
                        }
                    }
    
    	 }
    	 
    	 
    	 }

     ④ 回调线程 通过http 请求 反馈工作 线程 执行结果

    说明:没有执行完任务后直接回调,而是  放到一个队列里,批量 反馈,减少了网络开销

    主要属性和方法:

     LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>()
    	 
    	 // 添加执行结果
    	 public static void pushCallBack(HandleCallbackParam callback){
            getInstance().callBackQueue.add(callback);
            logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
        }
     @Override
                public void run() {
    
                    // normal callback
                    while(!toStop){
                        try {
                            HandleCallbackParam callback = getInstance().callBackQueue.take();
                            if (callback != null) {
    
                                // callback list param
                                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                                int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                                callbackParamList.add(callback);
    
                                // callback, will retry if error
                                if (callbackParamList!=null && callbackParamList.size()>0) {
    							    // 通过 http 进行回调
                                    doCallback(callbackParamList);
                                }
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
    
                    // last callback
                    try {
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        if (callbackParamList!=null && callbackParamList.size()>0) {
                            doCallback(callbackParamList);
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
    
                }
            });
    		
    		
    		继续进入:
    		
    		@Override
         public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
            return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
        }

    1.  注册中心收到 请求后的操作:

    api/callback 

    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
            // 线程池
    		callbackThreadPool.execute(new Runnable() {
    			@Override
    			public void run() {
    				for (HandleCallbackParam handleCallbackParam: callbackParamList) {
    					ReturnT<String> callbackResult = callback(handleCallbackParam);
    					logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
    							(callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
    				}
    			}
    		});
    
    		return ReturnT.SUCCESS;
    	}

    callBack 方法如下:

    private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
    		// valid log item
    		XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
    		if (log == null) {
    			return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
    		}
    		if (log.getHandleCode() > 0) {
    			return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc
    		}
    
    		// handle msg
    		StringBuffer handleMsg = new StringBuffer();
    		if (log.getHandleMsg()!=null) {
    			handleMsg.append(log.getHandleMsg()).append("<br>");
    		}
    		if (handleCallbackParam.getHandleMsg() != null) {
    			handleMsg.append(handleCallbackParam.getHandleMsg());
    		}
    
    		// success, update	log  ;(说明:insert log 是 在服务端向客户端发送请求的时候插入的)
    		log.setHandleTime(new Date());
    		log.setHandleCode(handleCallbackParam.getHandleCode());
    		log.setHandleMsg(handleMsg.toString());
    		XxlJobCompleter.updateHandleInfoAndFinish(log);
    
    		return ReturnT.SUCCESS;
    	}

    调度日志的显示:

     点击 页面 执行日志,执行了两部操作:

    首先从 调度器获取 日志id(日志文件的文件名 日志id.log ),日志所在执行器服务器信息  ② 通过页面 ajax 请求 访问执行器 日志文件(配置文件里配置的路径)

    代码如下:

    ① 页面 获取日志id,执行器服务器信息 如下:

     说明:

    文件名称 和  日志id 有关:(每次执行完一个任务,就生成一个文件,而不是 所有日志文件堆积在一起 )

    log filename: logPath/yyyy-MM-dd/9999.log

    ② 读取日志文件(说明:如果 调度中心和 执行器不在一台服务器上,得修改源码,将本地读取日志文件改为 远程读取日志文件)

    @RequestMapping("/logDetailCat")
    	@ResponseBody
    	public ReturnT<LogResult> logDetailCat(String executorAddress, long triggerTime, int logId, int fromLineNum){
    		try {
    			ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(executorAddress);
               // 读取日志文件的内容
    			ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum);
    
    			// is end
                if (logResult.getContent()!=null && logResult.getContent().getFromLineNum() > logResult.getContent().getToLineNum()) {
                    XxlJobLog jobLog = xxlJobLogDao.load(logId);
                    if (jobLog.getHandleCode() > 0) {
                        logResult.getContent().setEnd(true);
                    }
                }
    
    			return logResult;
    		} catch (Exception e) {
    			logger.error(e.getMessage(), e);
    			return new ReturnT<LogResult>(ReturnT.FAIL_CODE, e.getMessage());
    		}
    	}

    读取方法:

        @Override
        public ReturnT<LogResult> log(long logDateTim, int logId, int fromLineNum) {
            //获取日志名称,说明  9999 是 日志id    
            // log filename: logPath/yy yy-MM-dd/9999.log
            String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logDateTim), logId);
    
           // 读取日志 
            LogResult logResult = XxlJobFileAppender.readLog(logFileName, fromLineNum);
            return new ReturnT<LogResult>(logResult);
        }

    如何获取日志文件名称?

    	public static String makeLogFileName(Date triggerDate, int logId) {
    
    		// filePath/yyyy-MM-dd
    		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");	// avoid concurrent problem, can not be static
    		File logFilePath = new File(getLogPath(), sdf.format(triggerDate));
    		if (!logFilePath.exists()) {
    			logFilePath.mkdir();
    		}
    
    		// filePath/yyyy-MM-dd/9999.log
    		String logFileName = logFilePath.getPath()
    				.concat(File.separator)
                    // 拼接  logId 为日志名称
    				.concat(String.valueOf(logId))
                    //  日志后缀
    				.concat(".log");
    		return logFileName;
    	}

    读取日志的方法:

    	public static LogResult readLog(String logFileName, int fromLineNum){
    
    		// valid log file
    		if (logFileName==null || logFileName.trim().length()==0) {
                return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true);
    		}
            // 创建文件对象
    		File logFile = new File(logFileName);
    
    		if (!logFile.exists()) {
                return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true);
    		}
    
    		// read file
    		StringBuffer logContentBuffer = new StringBuffer();
    		int toLineNum = 0;
    		LineNumberReader reader = null;
    		try {
    			//reader = new LineNumberReader(new FileReader(logFile));
    			reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), "utf-8"));
    			String line = null;
    
    			while ((line = reader.readLine())!=null) {
    				toLineNum = reader.getLineNumber();		// [from, to], start as 1
    				if (toLineNum >= fromLineNum) {
                        // 读完一行就换行
    					logContentBuffer.append(line).append("\n");
    				}
    			}
    		} catch (IOException e) {
    			logger.error(e.getMessage(), e);
    		} finally {
    			if (reader != null) {
    				try {
    					reader.close();
    				} catch (IOException e) {
    					logger.error(e.getMessage(), e);
    				}
    			}
    		}
    
    		// 返回结果集
    		LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false);
    		return logResult;
    
    	}

    展开全文
  • xxl-job原理及介绍

    千次阅读 2021-02-18 17:07:00
    2.10 任务依赖 原理XXL-JOB中每个任务都对应有一个任务ID,同时,每个任务支持设置属性“子任务ID”,因此,通过“任务ID”可以匹配任务依赖关系。 当父任务执行结束并且执行成功时,将会根据“子任务ID”匹配子...
  • XXL-JOB原理

    千次阅读 2022-03-18 11:18:23
    XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。在众多XXL-Job平台的特征中,有如下几条需要关注的: 1、使用简单:...
  • 分布式调度:Quartz,XXL-Job,Elastic-Job,TBSchedule,DolphinScheduler,…… 调度框架对比:https://www.cnblogs.com/davidwang456/p/9057839.html,https://www.cnblogs.com/ssslinppp/p/12485273.html 1.2...
  • 2. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原...
  • XXL-JOB原理--任务执行时间轮(六)

    千次阅读 2021-01-13 00:15:13
    之前 XXL-JOB 任务执行是通过 Quartz来进行任务管理触发的,在之前的博客 《Quartz任务调度框架–任务执行流程》 我们进行了任务执行的流程介绍,目前 XXL-JOB 任务执行已经摒弃 Quartz 框架,目前通过时间轮方式来...
  • 这个ppt是自己调研任务调度框架,最后选型xxl-job框架编写的。主要是对比了目前主流的几个调度系统框架,各自的优缺点等...最后主要介绍了xxl-job的功能特性、框架原理和注意事项。该ppt总计30页,适合学习和技术分享。
  • XXL-JOB原理--定时任务框架简介(一)

    万次阅读 多人点赞 2018-09-10 21:14:01
    之前在工作中有接触许雪里大神开源的分布式任务调度平台XXL-JOB,最近...XXL-JOB原理--任务调度中心任务管理(四) XXL-JOB原理--任务执行(五) 一、完整介绍地址:官方介绍 二、最新版本架构图: 三、介绍...
  • 8. xxl-job 原理-- 任务执行或触发

    千次阅读 2019-05-30 17:37:17
    2. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原...
  • xxl-job集群原理

    千次阅读 2021-11-19 15:13:41
    一、xxl-job集群需要满足的条件 调度中心支持集群部署,提升调度系统容灾和可用性; 调度中心集群部署时,几点要求和建议: 1)DB配置保持一致; 2)集群机器时钟保持一致(单机集群忽视); 官方建议:推荐通过...
  • XXL-Job原理分析

    千次阅读 2017-10-19 16:38:54
    最近调研调度工具时, 看到了 Github 上的XXL-JOB,已在美团内部、以及其他公司有生产应用。 特点 整体情况: XXL-JOB是基于开源 Quartz 调度内核的、为方便企业调度场景而开源的一款实用的调度工具。自带任务配置...
  • 9. xxl-job原理-- jobthread的作用

    千次阅读 2019-05-30 17:37:34
    2. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原...
  • 10. xxl-job原理---回调

    千次阅读 2019-05-30 17:38:55
    2. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job原理-- 调度中心注册 7. xxl-job原理-- 任务管理 8. xxl-job原理--...
  • xxl-job原理合集

    千次阅读 2022-02-15 10:36:14
    2.xxl-job(v2.1.0 Release)执行器端的执行器自动注册原理     3.xxl-job(v2.1.0 Release)调度器端的执行器自动注册原理     4.xxl-job(v2.1.0 Release)任务管理...
  • Xxl-Job执行器原理解析

    千次阅读 2021-05-31 22:49:20
    xxl-job版本:2.3.0 Xxl-Job分为执行器、调度器。... 执行器初始化过程步骤如下 ...1通过加了@Conguration注解的XxlJobConfig初始化,并生成beanName=...-扫描所有bean,加载加了@XxlJob注解类,并记录在job...
  • xxl-job的使用及简述原理

    千次阅读 2020-12-14 21:21:38
    文章目录前言1. 介绍2. 部署篇2.1. 初始化数据库2.2. 部署调度中心2.2.1 集群部署2.3. 部署执行器2.3.1 集群部署3. 使用篇3.1. 设置执行器3.2....本文章基于xxl-job 2.2.0, jdk8 , springboot 2.2.6.RELEASE
  • XXL-JOB原理--执行器注册(二)

    万次阅读 热门讨论 2018-10-17 14:46:01
    1、xxl-job添加执行器到任务调度中心有两种方式 (1)客户端执行器自动将名称和机器地址注册到任务调度中心 (2)可以在任务调度中心手动录入执行器名称和相关的机器地址(多个机器地址用逗号隔开) 2、自动...
  • 4. xxl-job原理-- 执行器注册问题

    千次阅读 2019-05-30 17:36:24
    2. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原...
  • 关于使用xxl-job的流程,这里不再赘述。在加入xxl-job依赖后,需要我们自己做的就是创建一个XxlJobSpringExecutor对象交给spring容器管理,因为XxlJobSpringExecutor实现了SmartInitializingSingleton接口,在spring...
  • 分布式任务调度平台XXL-JOB.pdf,pdf文档来源于官网xxl-job说明
  • 5 xxl-job原理-- 执行器注册问题

    千次阅读 2019-05-30 17:36:37
    2. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原...
  • XXL-JOB任务调度平台 官网 https://www.xuxueli.com/xxl-job/ 中文文档 https://www.xuxueli.com/xxl-job/ 源码仓库地址 https://github.com/xuxueli/xxl-job http://gitee.com/xuxueli0323/xxl-job 下载地址 ...
  • 2. xxl-job原理-- 调度中心 3. xxl-job原理-- 执行器注册 4. xxl-job原理-- 执行器注册问题 5 xxl-job原理-- 执行器注册问题 6. xxl-job 原理-- 调度中心注册 7. xxl-job 原理-- 任务管理 8. xxl-job 原...
  • Xxl-Job调度器原理解析

    2021-06-01 22:48:10
    xxl-job版本:2.3.0 Xxl-Job分为执行器、调度器。而我们平时的客户端就属于一个执行器,执行器启动的时候会自动注册到调度器上,然后调度器进行远程调度。 调度器初始化过程步骤如下 1 国际化相关 配置参数...
  • 在任务调度中心可以进行新建任务,新建任务之后可以在任务列表中查看相关任务,任务可以根据我们配置的cron表达式进行任务调度,或者... 我们了解到xxl-job是基于quartz来实现定时任务的(其实任务调度中心任务执行...
  • 在上一篇博客XXL-JOB学习--执行器注册(一)中我们介绍了xxl-job执行器注册到任务调度中心的流程及相关注册信息,接下来我们看看任务调度中心接受任务注册后做了哪些事情。...1、xxl-job admin通过JobApiCo...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,267
精华内容 506
关键字:

xxl-job 原理