精华内容
参与话题
问答
  • 我自己的理解,是我们自己写的逻辑代码,需要定期的去执行,然后在这个背景下,我们会使用到我们的任务调度,在此我使用的是xxl-job任务调度。 二、概述 xxl-job是一个轻量级分布式任务调度平台,其核心设计目标是...

    一、初识xxl-job

    我自己的理解,是我们自己写的逻辑代码,需要定期的去执行,然后在这个背景下,我们会使用到我们的任务调度,在此我使用的是xxl-job任务调度。

    二、概述

    xxl-job是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

    三、下载

    源码仓库地址(一个是githup,另外一个是码云,任何一个下载都行)

    源码仓库地址 Release Download
    https://github.com/xuxueli/xxl-job Download
    http://gitee.com/xuxueli0323/xxl-job Download

    四、开发环境

    1.JDK:1.7+

    2.Servlet/JSP Spec:3.1/2.3

    3.Tomcat:8.5.x/Jetty9.2.x

    4.Spring-boot:1.5.x/Spring4.x

    5.Mysql:5.6+

    6.Maven:3+

    五、操作步骤

    1.先在我们的本地数据库建立16张表,如果未建表,程序启动会报错,具体的建表语句是在我们项目这个位置:

    /xxl-job/doc/db/tables_xxl_job.sql,然后执行sql脚本就行了,会在库中生成16张表。

    表如下:

    2.修改xxl-job项目下的application.properties文件中的一些基本信息:

    2.1.换成自己的数据库

    2.2.换成自己的邮箱以及密码(根据他官网这么修改的)

    六、启动调度中心项目(xxl-job-admin)

    如果已经正确进行上述配置,可将项目编译打包部署。 调度中心访问地址:http://localhost:8080/xxl-job-admin (该地址执行器将会使用到,作为回调地址),登录后运行界面如下图所示

     至此“调度中心”项目已经部署成功。

    七、 配置执行器项目(xxl-job-executor-sample-spring这个项目)

    1.作用:负责接收"调度中心"的调度并执行,可直接部署执行器,也可以将执行器集成到现有业务项目中。

    2.maven依赖

    确认pom文件中引入了 "xxl-job-core" 的maven依赖;

    3.执行器配置

    执行器配置,配置文件地址:

    /xxl-job/xxl-job-executor-samples/xxl-job-executor-sample-spring/src/main/resources/xxl-job-executor.properties
    

    执行器配置,配置内容说明:

    ### xxl-job admin address list:调度中心部署跟地址:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调"。
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
    
    ### xxl-job executor address:执行器"AppName"和地址信息配置:AppName执行器心跳注册分组依据;地址信息用于"调度中心请求并触发任务"和"执行器注册"。执行器默认端口为9999,执行器IP默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用。单机部署多个执行器时,注意要配置不同执行器端口;
    xxl.job.executor.appname=xxl-job-executor-sample
    xxl.job.executor.ip=
    xxl.job.executor.port=9999
    
    ### xxl-job, access token:执行器通讯TOKEN,非空时启用
    xxl.job.accessToken=
        
    ### xxl-job log path:执行器运行日志文件存储的磁盘位置,需要对该路径拥有读写权限
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler/
    
    ### xxl-job log retention days:执行器Log文件定期清理功能,指定日志保存天数,日志文件过期自动删除。限制至少保持3天,否则功能不生效;
    xxl.job.executor.logretentiondays=-1

    八、部署执行器项目(xxl-job-executor-sample-spring这个项目)

    1.如果已经正确进行上述配置,可将执行器项目编译打部署,系统提供多种执行器Sample示例项目,选择其中一个即可,各自的部署方式如下。

    xxl-job-executor-sample-springboot:项目编译打包成springboot类型的可执行JAR包,命令启动即可;
    xxl-job-executor-sample-spring:项目编译打包成WAR包,并部署到tomcat中。
    xxl-job-executor-sample-jfinal:同上
    xxl-job-executor-sample-nutz:同上
    

    2.至此“执行器”项目已经部署结束。

    3.我部署在了同一个tomcat上面了

    九、测试

    1.执行器中的要测试的代码

    2.要达到每分钟执行一次的效果(页面配置):

     

    3.查看日志(达到了一分钟执行一次的效果)

    十、结束

    上面就是我对xxl-job的简单的认识,有不足之处,还请多指教

    参考博客:https://www.cnblogs.com/xuxueli/p/5021979.html

    Always keep the faith!!!

    展开全文
  • java中定时任务Job的使用总结

    千次阅读 2019-05-27 08:53:54
    Job的原理,参考: http://www.cnblogs.com/Dorae/p/9357180.html job的使用,参考: https://blog.csdn.net/u010996565/article/details/78591054 https://blog.csdn.net/hfut_wowo/article/details/64434664 实例...

    Job的原理,参考:
    http://www.cnblogs.com/Dorae/p/9357180.html

    job的使用,参考:
    https://blog.csdn.net/u010996565/article/details/78591054
    https://blog.csdn.net/hfut_wowo/article/details/64434664

    实例,创建一个job,每天凌晨3点执行:

    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    
    import javax.annotation.Resource;
    
    import org.quartz.CronScheduleBuilder;
    import org.quartz.CronTrigger;
    import org.quartz.Job;
    import org.quartz.JobBuilder;
    import org.quartz.JobDetail;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.quartz.Scheduler;
    import org.quartz.TriggerBuilder;
    import org.quartz.impl.StdSchedulerFactory;
    
    public class SumFinancialJob implements Job
    {
    
    	public SumFinancialJob()
    	{
    	}
    
    	public void execute(JobExecutionContext context) throws JobExecutionException
    	{
    		//得到一个Calendar实例
    		Calendar calendar = Calendar.getInstance();
    		//calendar的日期设为今天
    		calendar.setTime(new Date());
    		//设置calendar为昨天
    		calendar.add(Calendar.DATE, -1);
    		//calendar.add(Calendar.YEAR, -1);//前一年
    		//calendar.add(Calendar.MONTH, -1);//前一月
    
    		Date yesterday = calendar.getTime();
    
    		SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    		//前一天的字符串
    		String pre = simpleDateFormat.format(yesterday);
    		//前一天凌晨0点的字符串
    		String startTime = pre.substring(0, 10) + " 00:00:00";
    		//前一天午夜24点的字符串
    		String endTime = pre.substring(0, 10) + " 24:00:00";
    
    		SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    		//获取前一天的日期字符串
    		String preDate = dateFormat.format(yesterday);
    
    		//获取外部传来的参数
    		//JobDataMap map = context.getJobDetail().getJobDataMap();
    		//String ywbDate = map.getString("ywbDate");
    		进行操作……
    	}
    
    	public static void schedulerJob()
    	{
    		try
    		{
    			// 创建一个jobDetail的实例 quartz 2.xx版本
    			JobDetail jobDetail = JobBuilder.newJob(SumFinancialJob.class).withIdentity("train-financial-Job").build();
    			// 创建一个Trigger触发器的实例,每天凌晨3点运行
    			CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity("train-financial-Trigger").startNow().withSchedule(CronScheduleBuilder.cronSchedule("0 0 3 * * ?")).build();
    			// 创建schedule实例
    			Scheduler scheduler = new StdSchedulerFactory().getScheduler();
    			// 添加一个job
    			scheduler.scheduleJob(jobDetail, trigger);
    			// 开启一个job
    			scheduler.start();
    		}
    		catch (Exception e)
    		{
    			System.err.println("创建定时任务失败" + e);
    		}
    	}
    }
    
    

    job的使用实例参考:

    https://blog.csdn.net/u010996565/article/details/78591054

    package com.qua.test;
     
    import java.util.Date;
     
    import org.quartz.CronTrigger;
    import org.quartz.JobDataMap;
    import org.quartz.JobDetail;
    import org.quartz.Scheduler;
    import org.quartz.SchedulerFactory;
    import org.quartz.SimpleTrigger;
    import org.quartz.impl.StdSchedulerFactory;
     
    import com.qua.job.PlanJob;
     
    public class Test {
    	public static void main(String[] args) throws Exception {
    		//创建job
    		JobDetail detail=new JobDetail("myJob", "myGroup", PlanJob.class);
    		//向程序内部传入参数
    		JobDataMap map=detail.getJobDataMap();
    		map.put("username", "张三");
    		//这里的触犯器是规定的时间间隔内做的事,不是很常用
    //		//创建触发器
    //		SimpleTrigger trigger=new SimpleTrigger("myTrigger",SimpleTrigger.REPEAT_INDEFINITELY, 3000);
    //		//设置开始执行时间
    //		trigger.setStartTime(new Date(System.currentTimeMillis()+1000));
    		//下面这个是在指定时间做的事,在日常开发中是比较常用的
    		//指定时间
    		CronTrigger trigger=new CronTrigger("myCron", "myGroup", "40 10 11 * * ?");//这里有三个参数(1.触发器的名称2.触发器的组3.时间表达式)
    		
    		//创建调度工厂
    		SchedulerFactory factory=new StdSchedulerFactory();
    		//创建调度器
    		Scheduler scheduler=factory.getScheduler();
    		
    		//绑定job 和触发器
    		scheduler.scheduleJob(detail, trigger);
    		//启动
    		scheduler.start();
    //		//停止定时任务
    //		scheduler.shutdown();
    	}
    }
    
    
    

    Cron表示式示例

    https://blog.csdn.net/zzq900503/article/details/38020573/)

    "0 0 12 * * ? "

    每天12点运行

    “0 15 10 ? * *”

    每天10:15运行

    “0 15 10 * * ?”

    每天10:15运行

    “0 15 10 * * ? *”

    每天10:15运行

    “0 15 10 * * ? 2008”

    在2008年的每天10:15运行

    “0 * 14 * * ?”

    每天14点到15点之间每分钟运行一次,开始于14:00,结束于14:59。

    “0 0/5 14 * * ?”

    每天14点到15点每5分钟运行一次,开始于14:00,结束于14:55。

    “0 0/5 14,18 * * ?”

    每天14点到15点每5分钟运行一次,此外每天18点到19点每5钟也运行一次。

    “0 0-5 14 * * ?”

    每天14:00点到14:05,每分钟运行一次。

    “0 10,44 14 ? 3 WED”

    3月每周三的14:10分到14:44,每分钟运行一次。

    “0 15 10 ? * MON-FRI”

    每周一,二,三,四,五的10:15分运行。

    “0 15 10 15 * ?”

    每月15日10:15分运行。

    “0 15 10 L * ?”

    每月最后一天10:15分运行。

    “0 15 10 ? * 6L”

    每月最后一个星期五10:15分运行。

    “0 15 10 ? * 6L 2007-2009”

    在2007,2008,2009年每个月的最后一个星期五的10:15分运行。

    “0 15 10 ? * 6#3”

    每月第三个星期五的10:15分运行。

    展开全文
  • 一、job 定时任务的创建方式 1、使用线程创建 job 定时任务 /** * TODO 使用线程创建 job 定时任务 * @author 王松 * @date 2019/9/14 0014 22:12 */ public class JobThread { public static class Demo01 { ...

    Quartz表达式生成地址: http://cron.qqe2.com/
    ----支持生成定时任务表达式和反解析,使用Quartz表达式的定时任务如下
    1、xxl-job
    2、springboot 的 @Scheduled
    3、Quartz 框架

    一、job 定时任务的五种创建方式

    1、使用线程创建 job 定时任务

    /**
      * TODO  使用线程创建 job 定时任务
      * @author 王松
      * @date  2019/9/14 0014 22:12
      */
    public class JobThread {
    
        public static class Demo01 {
            static long count = 0;
    
            public static void main(String[] args) {
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        while (true) {
                            try {
                                Thread.sleep(1000);
                                count++;
                                System.out.println(count);
                            } catch (Exception e) {
                                // TODO: handle exception
                            }
                        }
                    }
                };
                Thread thread = new Thread(runnable);
                thread.start();
            }
        }
    }
    

    2、使用 TimerTask 创建job定时任务

    /**
     * TODO  使用 TimerTask 创建job定时任务
     *
     * @author 王松
     * @date 2019/9/14 0014 20:50
     */
    public class JobTimerTask {
    
        static long count = 0;
        public static void main(String[] args) {
            TimerTask timerTask = new TimerTask() {
                @Override
                public void run() {
                    count++;
                    System.out.println(count);
                }
            };
            //创建timer对象设置间隔时间
            Timer timer = new Timer();
            // 间隔天数
            long delay = 0;
            // 间隔毫秒数
            long period = 1000;
            timer.scheduleAtFixedRate(timerTask, delay, period);
        }
    }
    

    3、使用线程池创建 job定时任务

    **
      * TODO  使用线程池创建 job定时任务
      * @author 王松
      * @date  2019/9/14 0014 20:56 
      */
    public class JobScheduledExecutorService {
            public static void main(String[] args) {
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        // task to run goes here
                        System.out.println("Hello !!");
                    }
                };
                ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
                // 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
                service.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
            }
    }
    

    4.Quartz 框架

    1.引入maven依赖

    <dependencies>
    		<!-- quartz -->
    		<dependency>
    			<groupId>org.quartz-scheduler</groupId>
    			<artifactId>quartz</artifactId>
    			<version>2.2.1</version>
    		</dependency>
    		<dependency>
    			<groupId>org.quartz-scheduler</groupId>
    			<artifactId>quartz-jobs</artifactId>
    			<version>2.2.1</version>
    		</dependency>
    	</dependencies>
    

    2.任务调度类

    public class MyJob implements Job {
    
        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            System.out.println("quartz MyJob date:" + System.currentTimeMillis());
        }
    }
    
    1. 启动类
    public class JobQuartz {
    
        public static void main(String[] args) throws SchedulerException {
            //1.创建Scheduler的工厂
            SchedulerFactory sf = new StdSchedulerFactory();
            //2.从工厂中获取调度器实例
            Scheduler scheduler = sf.getScheduler();
            //3.创建JobDetail,
            JobDetail jb = JobBuilder.newJob(MyJob.class)
                    //job的描述
                    .withDescription("this is a ram job")
                    //job 的name和group
                    .withIdentity("ramJob", "ramGroup")
                    .build();
            //任务运行的时间,SimpleSchedle类型触发器有效,3秒后启动任务
            long time=  System.currentTimeMillis() + 3*1000L;
            Date statTime = new Date(time);
            //4.创建Trigger
            //使用SimpleScheduleBuilder或者CronScheduleBuilder
            Trigger t = TriggerBuilder.newTrigger()
                    .withDescription("")
                    .withIdentity("ramTrigger", "ramTriggerGroup")
                    //.withSchedule(SimpleScheduleBuilder.simpleSchedule())
                    //默认当前时间启动
                    .startAt(statTime)
                    //两秒执行一次,Quartz表达式,支持各种牛逼表达式
                    .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
                    .build();
            //5.注册任务和定时器
            scheduler.scheduleJob(jb, t);
            //6.启动 调度器
            scheduler.start();
        }
    
    

    5. springboot 的 @Scheduled 注解

    @Component
    @Configuration      //1.主要用于标记配置类,兼备Component的效果。
    @EnableScheduling   // 2.开启定时任务
    public class SaticScheduleTask {
         
        @Scheduled(cron = "0/5 * * * * ?")  //3.添加定时任务
        //@Scheduled(fixedRate=5000)        //或直接指定时间间隔,例如:5秒
        private void configureTasks() {
            System.err.println("执行静态定时任务时间: " + LocalDateTime.now());
        }
    }
    

    二、xxl-job 任务调度后台 Admin

    xxl-job 有什么用?
    --------分布式集群的情况下,保证定时任务不被重复执行。
    --------执行原理同Nginx 类型,所有定时任务通过任务调度平台分发,也可配置负载均衡等等
    --------首先让我们能够使用起来,搭建一个自己的任务

    第一步: github下载源码导入

    下载地址:https://github.com/xuxueli/xxl-job/
    当前版本目录结构 2.1.1
    在这里插入图片描述

    第二步: 执行sql

    文件地址:xxl-job/doc/db/tables_xxl_job.sql
    当前2.1.1版本sql
    在这里插入图片描述

    第三步: 修改xxl-job-admin项目配置

    配置文件:application.properties
    修改数据库连接
    在这里插入图片描述

    第四步: 启动admin项目

    springboot 方式启动项目,
    在这里插入图片描述
    访问 http://localhost:8080/xxl-job-admin/
    账号密码:admin / 123456
    在这里插入图片描述
    任务调度中心就搭建好了
    接下来需要创建一个服务器连接任务调度中心

    三、自创建boot项目的任务xxl-job 示例demo

    创建一个 boot 项目

    我的目录结构
    在这里插入图片描述

    pom.xml

    web核心及 xxl-job-core

          <!-- spring-boot-starter-web (spring-webmvc + tomcat) -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <!-- xxl-job-core  版本号根据自己下载的版本修改 -->
            <dependency>
                <groupId>com.xuxueli</groupId>
                <artifactId>xxl-job-core</artifactId>
                <version>2.1.1-SNAPSHOT</version>
            </dependency>
    

    logback.xml

    日志配置直接拷贝

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration debug="false" scan="true" scanPeriod="1 seconds">
    
        <contextName>logback</contextName>
        <property name="log.path" value="/data/applogs/xxl-job/xxl-job-executor-sample-springboot.log"/>
    
        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>
    
        <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>${log.path}</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern>
            </rollingPolicy>
            <encoder>
                <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n
                </pattern>
            </encoder>
        </appender>
    
        <root level="info">
            <appender-ref ref="console"/>
            <appender-ref ref="file"/>
        </root>
    
    </configuration>
    

    application.properties 加入配置

    -----需修改或自定义
    1、xxl-job admin 地址
    2、xxl.job.executor.appname 自定义名称,后台配置必须对应
    3、xxl.job.executor.ip 当前电脑Ip,或部署项目的电脑Ip
    4、xxl.job.executor.port 端口

    # 端口号
    server.port=8081
    # 日志
    logging.config=classpath:logback.xml
    
    ### xxl-job admin 地址,多个逗号分隔"
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
    
    ### xxl-job名称 || socket ip 当前项目部署的ip地址/本机ip || socket 端口号
    xxl.job.executor.appname=xxl-job-executor-sample
    xxl.job.executor.ip=192.168.43.153
    xxl.job.executor.port=9999
    
    ### xxl-job, access token
    xxl.job.accessToken=
    ### xxl-job log path
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
    ### xxl-job log retention days
    xxl.job.executor.logretentiondays=-1
    

    添加boot配置类 XxlJobConfig

    package xxljob.config;
    import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * xxl-job xxljob.config
     *
     * @author xuxueli 2017-04-28
     */
    @SuppressWarnings("ALL")
    @Configuration
    public class XxlJobConfig {
        private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    
        @Value("${xxl.job.admin.addresses}")
        private String adminAddresses;
    
        @Value("${xxl.job.executor.appname}")
        private String appName;
    
        @Value("${xxl.job.executor.ip}")
        private String ip;
    
        @Value("${xxl.job.executor.port}")
        private int port;
    
        @Value("${xxl.job.accessToken}")
        private String accessToken;
    
        @Value("${xxl.job.executor.logpath}")
        private String logPath;
    
        @Value("${xxl.job.executor.logretentiondays}")
        private int logRetentionDays;
    
    
        @Bean(initMethod = "start", destroyMethod = "destroy")
        public XxlJobSpringExecutor xxlJobExecutor() {
            logger.info(">>>>>>>>>>> xxl-job xxljob.config init.");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            xxlJobSpringExecutor.setAppName(appName);
            xxlJobSpringExecutor.setIp(ip);
            xxlJobSpringExecutor.setPort(port);
            xxlJobSpringExecutor.setAccessToken(accessToken);
            xxlJobSpringExecutor.setLogPath(logPath);
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
            System.err.println(ip+":"+port);
            return xxlJobSpringExecutor;
        }
    
        /**
         * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
         *
         *      1、引入依赖:
         *          <dependency>
         *             <groupId>org.springframework.cloud</groupId>
         *             <artifactId>spring-cloud-commons</artifactId>
         *             <version>${version}</version>
         *         </dependency>
         *
         *      2、配置文件,或者容器启动变量
         *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
         *
         *      3、获取IP
         *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
         */
    }
    

    任务job

    @JobHandler(value="demoJobHandler")
    @Component
    public class DemoJobHandler extends IJobHandler {
    
         static int count;
        @Override
        public ReturnT<String> execute(String param) throws Exception {
            System.out.println("执行job任务"+count++);
            return SUCCESS;
        }
    }
    

    admin 后台配置

    执行管理器下
    在这里插入图片描述
    任务管理下
    编辑任务

    定时规则生成:http://cron.qqe2.com/
    job任务名:@JobHandler注解值 >> 如:@JobHandler(value=“demoJobHandler”)
    在这里插入图片描述

    启动

    在这里插入图片描述
    这样就配置完成了
    在这里插入图片描述
    完成

    展开全文
  • XXL-JOB原理--任务执行(五)

    万次阅读 2018-09-15 21:31:52
    一、任务调度中心发送任务执行请求 任务发送执行的操作有两种: (1)根据配置的cron表达式周期性执行相关任务 (2)在任务调度中心主动执行任务 在注册quartz定时任务时已经注册执行类为RemoteHttpJobBean,...

    一、任务调度中心发送任务执行请求

    任务发送执行的操作有两种:

    (1)根据配置的cron表达式周期性执行相关任务

    (2)在任务调度中心主动执行任务

    在注册quartz定时任务时已经注册执行类为RemoteHttpJobBean,所以周期性执行定时任务会调用RemoteHttpJobBean的executeInternal方法,在executeInternal中会调用JobTriggerPoolHelper.trigger(jobId),通过任务调度中心主动执行任务时也是会调用JobTriggerPoolHelper.trigger(jobId)方法,所以接下来我们要看的是JobTriggerPoolHelper.trigger(jobId)中做的逻辑处理就好。

    public class RemoteHttpJobBean extends QuartzJobBean {
    	private static Logger logger = LoggerFactory.getLogger(RemoteHttpJobBean.class);
    
    	@Override
    	protected void executeInternal(JobExecutionContext context)
    			throws JobExecutionException {
    
    		// load jobId
    		JobKey jobKey = context.getTrigger().getJobKey();
    		Integer jobId = Integer.valueOf(jobKey.getName());
    
    		// trigger
    		//XxlJobTrigger.trigger(jobId);
    		JobTriggerPoolHelper.trigger(jobId);
    	}
    
    }

    JobTriggerPoolHelper.trigger所做的操作是将任务提交给一个线程池(任务调度中心默认开启50个线程),在线程池中调用XxlJobTrigger.trigger。 

    public void addTrigger(final int jobId){
            triggerPool.execute(new Runnable() {
                @Override
                public void run() {
                    XxlJobTrigger.trigger(jobId);
                }
            });
        }

    在XxlJobTrigger.trigger中会根据jobId获取任务的基本配置信息(阻塞策略、路由策略、失败重试测试、分组服务器列表等等),然后根据路由策略选择是广播还是单播等,接下来就是组装消息体调用runExecutor方法发送http请求到任务执行器。

    public static void trigger(int jobId) {
    
    		//获取任务信息
            // load data
            XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId);              // job info
            if (jobInfo == null) {
                logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
                return;
            }
    		//根据任务的分组信息找到分组,分组中存在服务器的IP和端口地址等
            XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());  // group info
    		
    		//阻塞策略
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
            //失败策略
    		ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(), ExecutorFailStrategyEnum.NULL);    // fail strategy
            //执行路由测试
    		ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    		//服务器地址
            ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
    
    		//广播模式
            // broadcast
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) {
    			//依次调用所有的服务器
                for (int i = 0; i < addressList.size(); i++) {
                    String address = addressList.get(i);
    
                    // 1、save log-id
                    XxlJobLog jobLog = new XxlJobLog();
                    jobLog.setJobGroup(jobInfo.getJobGroup());
                    jobLog.setJobId(jobInfo.getId());
                    XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
                    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
    
                    // 2、prepare trigger-info
                    //jobLog.setExecutorAddress(executorAddress);
                    jobLog.setGlueType(jobInfo.getGlueType());
                    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
                    jobLog.setExecutorParam(jobInfo.getExecutorParam());
                    jobLog.setTriggerTime(new Date());
    
                    ReturnT<String> triggerResult = new ReturnT<String>(null);
                    StringBuffer triggerMsgSb = new StringBuffer();
                    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
                    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
                    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
                    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()).append("("+i+"/"+addressList.size()+")"); // update01
                    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
                    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailStrategy")).append(":").append(failStrategy.getTitle());
                    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    
                    // 3、trigger-valid
                    if (triggerResult.getCode()==ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
                        triggerResult.setCode(ReturnT.FAIL_CODE);
                        triggerMsgSb.append("<br>----------------------<br>").append(I18nUtil.getString("jobconf_trigger_address_empty"));
                    }
    
                    if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
                        // 4.1、trigger-param
                        TriggerParam triggerParam = new TriggerParam();
                        triggerParam.setJobId(jobInfo.getId());
                        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
                        triggerParam.setExecutorParams(jobInfo.getExecutorParam());
                        triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
                        triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
                        triggerParam.setLogId(jobLog.getId());
                        triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
                        triggerParam.setGlueType(jobInfo.getGlueType());
                        triggerParam.setGlueSource(jobInfo.getGlueSource());
                        triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
                        triggerParam.setBroadcastIndex(i);
                        triggerParam.setBroadcastTotal(addressList.size()); // update02
    
                        // 4.2、trigger-run (route run / trigger remote executor)
    					//远程调用服务接口,执行任务
                        triggerResult = runExecutor(triggerParam, address);     // update03
                        triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
    
                        // 4.3、trigger (fail retry)
                        if (triggerResult.getCode()!=ReturnT.SUCCESS_CODE && failStrategy == ExecutorFailStrategyEnum.FAIL_TRIGGER_RETRY) {
                            triggerResult = runExecutor(triggerParam, address);  // update04
                            triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_fail_trigger_retry") +"<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
                        }
                    }
    
                    // 5、save trigger-info
                    jobLog.setExecutorAddress(triggerResult.getContent());
                    jobLog.setTriggerCode(triggerResult.getCode());
                    jobLog.setTriggerMsg(triggerMsgSb.toString());
                    XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
    
                    // 6、monitor trigger
                    JobFailMonitorHelper.monitor(jobLog.getId());
                    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
    
                }
            } else {
    			//单播模式
                // 1、save log-id
                XxlJobLog jobLog = new XxlJobLog();
                jobLog.setJobGroup(jobInfo.getJobGroup());
                jobLog.setJobId(jobInfo.getId());
                XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
                logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
    
                // 2、prepare trigger-info
                //jobLog.setExecutorAddress(executorAddress);
                jobLog.setGlueType(jobInfo.getGlueType());
                jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
                jobLog.setExecutorParam(jobInfo.getExecutorParam());
                jobLog.setTriggerTime(new Date());
    
                ReturnT<String> triggerResult = new ReturnT<String>(null);
                StringBuffer triggerMsgSb = new StringBuffer();
                triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
                triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                        .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
                triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
                triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
                triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
                triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailStrategy")).append(":").append(failStrategy.getTitle());
                triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    
                // 3、trigger-valid
                if (triggerResult.getCode()==ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
                    triggerResult.setCode(ReturnT.FAIL_CODE);
                    triggerMsgSb.append("<br>----------------------<br>").append(I18nUtil.getString("jobconf_trigger_address_empty"));
                }
    
                if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
                    // 4.1、trigger-param
                    TriggerParam triggerParam = new TriggerParam();
                    triggerParam.setJobId(jobInfo.getId());
                    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
                    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
                    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
                    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
                    triggerParam.setLogId(jobLog.getId());
                    triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
                    triggerParam.setGlueType(jobInfo.getGlueType());
                    triggerParam.setGlueSource(jobInfo.getGlueSource());
                    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
                    triggerParam.setBroadcastIndex(0);
                    triggerParam.setBroadcastTotal(1);
    
                    // 4.2、trigger-run (route run / trigger remote executor)
    				//路由后远程调用服务接口,执行任务
                    triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
                    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
    
                    // 4.3、trigger (fail retry)
                    if (triggerResult.getCode()!=ReturnT.SUCCESS_CODE && failStrategy == ExecutorFailStrategyEnum.FAIL_TRIGGER_RETRY) {
                        triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
                        triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_fail_trigger_retry") +"<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
                    }
                }
    
                // 5、save trigger-info
                jobLog.setExecutorAddress(triggerResult.getContent());
                jobLog.setTriggerCode(triggerResult.getCode());
                jobLog.setTriggerMsg(triggerMsgSb.toString());
                XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
    
                // 6、monitor trigger
                JobFailMonitorHelper.monitor(jobLog.getId());
                logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
            }
    
        }

    runExecutor方法中根据address服务器地址,XxlJobDynamicScheduler.getExecutorBiz中会获取代理类最终调用JettyClient的send方法。

    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
            ReturnT<String> runResult = null;
            try {
    			//获取代理对象
                ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
    			//最终调用执行
                runResult = executorBiz.run(triggerParam);
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
                runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
            }
    
            StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
            runResultSB.append("<br>address:").append(address);
            runResultSB.append("<br>code:").append(runResult.getCode());
            runResultSB.append("<br>msg:").append(runResult.getMsg());
    
            runResult.setMsg(runResultSB.toString());
            runResult.setContent(address);
            return runResult;
        }

    在XxlJobDynamicScheduler的getExecutorBiz中会通过NetComClientProxy生成代理对象,在执行时会调用其方法。

    public static ExecutorBiz getExecutorBiz(String address) throws Exception {
            // valid
            if (address==null || address.trim().length()==0) {
                return null;
            }
    
            // load-cache
            address = address.trim();
            ExecutorBiz executorBiz = executorBizRepository.get(address);
            if (executorBiz != null) {
                return executorBiz;
            }
    
            // set-cache
            executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, accessToken).getObject();
            executorBizRepository.put(address, executorBiz);
            return executorBiz;
        }

    getObject中生成代理对象,执行会执行JettyClient的send方法。

    @Override
    	public Object getObject() throws Exception {
    		return Proxy.newProxyInstance(Thread.currentThread()
    				.getContextClassLoader(), new Class[] { iface },
    				new InvocationHandler() {
    					@Override
    					public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    
    						// filter method like "Object.toString()"
    						if (Object.class.getName().equals(method.getDeclaringClass().getName())) {
    							logger.error(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", method.getDeclaringClass().getName(), method.getName());
    							throw new RuntimeException("xxl-rpc proxy class-method not support");
    						}
    						
    						// request
    						RpcRequest request = new RpcRequest();
    	                    request.setServerAddress(serverAddress);
    	                    request.setCreateMillisTime(System.currentTimeMillis());
    	                    request.setAccessToken(accessToken);
    	                    request.setClassName(method.getDeclaringClass().getName());
    	                    request.setMethodName(method.getName());
    	                    request.setParameterTypes(method.getParameterTypes());
    	                    request.setParameters(args);
    						
    						//发起http调用,执行任务
    	                    // send
    	                    RpcResponse response = client.send(request);
    	                    
    	                    // valid response
    						if (response == null) {
    							throw new Exception("Network request fail, response not found.");
    						}
    	                    if (response.isError()) {
    	                        throw new RuntimeException(response.getError());
    	                    } else {
    	                        return response.getResult();
    	                    }
    	                   
    					}
    				});
    	}

    任务调度中心向任务执行器发送的任务请求数据如下。

    二、任务执行器接收任务执行

    在任务执行器会根据内置的jetty提供web服务,提供请求处理器JettyServerHandler接收处理任务调度中心发送过来的任务

    public class JettyServerHandler extends AbstractHandler {
    	private static Logger logger = LoggerFactory.getLogger(JettyServerHandler.class);
    
    	//接收请求,处理任务
    	@Override
    	public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
    		
    		// invoke
    		//调用任务执行
            RpcResponse rpcResponse = doInvoke(request);
    
            // serialize response
            byte[] responseBytes = HessianSerializer.serialize(rpcResponse);
    		
    		response.setContentType("text/html;charset=utf-8");
    		response.setStatus(HttpServletResponse.SC_OK);
    		baseRequest.setHandled(true);
    		
    		OutputStream out = response.getOutputStream();
    		out.write(responseBytes);
    		out.flush();
    		
    	}
    
    	private RpcResponse doInvoke(HttpServletRequest request) {
    		try {
    			// deserialize request
    			byte[] requestBytes = HttpClientUtil.readBytes(request);
    			if (requestBytes == null || requestBytes.length==0) {
    				RpcResponse rpcResponse = new RpcResponse();
    				rpcResponse.setError("RpcRequest byte[] is null");
    				return rpcResponse;
    			}
    			//反序列化数据
    			RpcRequest rpcRequest = (RpcRequest) HessianSerializer.deserialize(requestBytes, RpcRequest.class);
    
    			// invoke
    			//通过反射调用任务
    			RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
    			return rpcResponse;
    		} catch (Exception e) {
    			logger.error(e.getMessage(), e);
    
    			RpcResponse rpcResponse = new RpcResponse();
    			rpcResponse.setError("Server-error:" + e.getMessage());
    			return rpcResponse;
    		}
    	}
    
    }

    在invokeService根据发送过来的类名com.xxl.job.core.biz.ExecutorBiz和方法run,通过反射机制调用

    public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
    		if (serviceBean==null) {
    			serviceBean = serviceMap.get(request.getClassName());
    		}
    		if (serviceBean == null) {
    			// TODO
    		}
    
    		RpcResponse response = new RpcResponse();
    
    		if (System.currentTimeMillis() - request.getCreateMillisTime() > 180000) {
    			response.setResult(new ReturnT<String>(ReturnT.FAIL_CODE, "The timestamp difference between admin and executor exceeds the limit."));
    			return response;
    		}
    		if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(request.getAccessToken())) {
    			response.setResult(new ReturnT<String>(ReturnT.FAIL_CODE, "The access token[" + request.getAccessToken() + "] is wrong."));
    			return response;
    		}
    
    		try {
    			Class<?> serviceClass = serviceBean.getClass();
    			String methodName = request.getMethodName();
    			Class<?>[] parameterTypes = request.getParameterTypes();
    			Object[] parameters = request.getParameters();
    
    			FastClass serviceFastClass = FastClass.create(serviceClass);
    			FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
    
    			Object result = serviceFastMethod.invoke(serviceBean, parameters);
    
    			response.setResult(result);
    		} catch (Throwable t) {
    			t.printStackTrace();
    			response.setError(t.getMessage());
    		}
    
    		return response;
    	}

    我们看看com.xxl.job.core.biz.ExecutorBiz的run方法中做了什么处理操作。

    在ExecutorBiz中根据发送过来的消息,根据demoJobHandler找到接口的实现类,接下来就可以新起线程去执行实现类DemoJobHandler了。

    @Override
        public ReturnT<String> run(TriggerParam triggerParam) {
            // load old:jobHandler + jobThread
            //创建执行线程
            JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
            //如果存在则直接使用老的线程
            IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
            String removeOldReason = null;
    
            // valid:jobHandler + jobThread
            GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
            if (GlueTypeEnum.BEAN == glueTypeEnum) {
    
                // new jobhandler
    			//根据类名找到任务执行类
                IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
    
                // valid old jobThread
                if (jobThread!=null && jobHandler != newJobHandler) {
                    // change handler, need kill old thread
                    removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    jobHandler = newJobHandler;
                    if (jobHandler == null) {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                    }
                }
    
            } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
    
                // valid old jobThread
                if (jobThread != null &&
                        !(jobThread.getHandler() instanceof GlueJobHandler
                            && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                    // change handler or gluesource updated, need kill old thread
                    removeOldReason = "change job source or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    try {
                        IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                        jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                        return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                    }
                }
            } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
    
                // valid old jobThread
                if (jobThread != null &&
                        !(jobThread.getHandler() instanceof ScriptJobHandler
                                && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                    // change script or gluesource updated, need kill old thread
                    removeOldReason = "change job source or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
                }
            } else {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
            }
    
            // executor block strategy
            if (jobThread != null) {
                ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
                if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                    // discard when running
                    if (jobThread.isRunningOrHasQueue()) {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                    }
                } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                    // kill running jobThread
                    if (jobThread.isRunningOrHasQueue()) {
                        removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
    
                        jobThread = null;
                    }
                } else {
                    // just queue trigger
                }
            }
    
            // replace thread (new or exists invalid)
    		//起线程执行任务
            if (jobThread == null) {
                jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
            }
    
            // push data to queue
            ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
            return pushResult;
        }

    总结:任务执行器提供web服务,任务调度中心根据任务分组及分组服务器发送http请求,任务执行器收到请求,根据请求中的数据调用对应的任务。

     

     

    展开全文
  • Job任务分解

    千次阅读 2011-12-08 17:33:58
    在前面的博文JobTracker任务调度器之JobQueueTaskScheduler中,我讲述Job任务调度的时候简单地讲述了一下Job任务的分解,也就是将一个作业Job切分为一个个相对独立的map任务,以及后面的reduce任务。那么,Hadoop的...
  • ORACLE定时执行Job任务

    千次阅读 2018-07-23 12:09:44
    因为是统计周月数据,便想到了数据库定时Job任务创建。 在百度上,找到了一篇好文章,跟大家分享:http://langgufu.iteye.com/blog/1179235 另一篇文章是关于ORACLE时间间隔书写:...
  • XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。 1.2特性 1、简单:支持通过Web页面对任务进行CRUD操作,操作...
  • 新建程序 加入pom依赖 <dependency> <groupId>com.xuxueli</groupId>...xxl-job-core</artifactId> <version>1.9.1</version> </dependency> 新建一个配...
  • ### xxl-job admin address list:调度中心部署跟地址:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调"。 addresses: http://这里是IP:...
  • 传统定时任务调度的缺陷: 1、没有补偿机制,如每天晚上2点定时执行一个任务,但期间抛出异常,则只能等到第二天晚上2点才能执行。 2、不支持集群 3、不支持路由策略 4、没有job管理平台【方便查看哪些任务执行...
  • 当项目中含有定时任务的时候,为方便管理定时任务,这时候可以考虑整合xxl-job任务调度中心,整合十分简单。 下载xxj-job源码,下载地址:码云地址 github地址 下载好解压,目录结构: 第一步 生成基础表 进入doc &...
  • 请点击以下链接进入本人博客园对应文章spring boot整合xxl-job任务调度平台实现分配定时任务集群
  • 任务调度中心可以进行新建任务,新建任务之后可以在任务列表中查看相关任务,任务可以根据我们配置的cron表达式进行任务调度,或者也可以在任务列表中执行、暂停、删除和查看相关运行日志等操作。 一、任务调度...
  • 为什么我们需要定时任务 很多业务场景需要我们某一特定的时刻去做某件任务,定时任务解决的就是这种业务场景。一般来说,系统可以使用消息传递代替部分定时任务,两者有很多相似之处,可以相互替换场景。如,上面...
  • 人工智能,零基础入门!... 现在的web项目中很多场景下都有要执行定时任务的需求,比如说每隔5秒将redis中的统计当天注册用户数持久化...后面我接触到一种比较方便快速实现job任务的方法,总结下来: 一、项目文件结...
  • Elastic-Job原理--任务调度处理(四)

    千次阅读 2018-10-10 20:11:58
    在上一篇博客Elastic-Job原理--任务分片策略(三)我们已经了解了Elastic-Job任务分片策略,这篇博客我们了解学习一下Elastic-Job是如何执行分片任务的。  首先,Elastic-Job的定时任务执行机制还是基于quartz...
  • xxl-job分布式任务调度的初步使用

    千次阅读 2018-05-10 14:33:10
    xxl-job是 许雪里 同学开发的一个轻量级分布式任务调度框架。在这里开个玩笑,许雪里同学是个man(哈哈哈哈)!xxl-job的介绍和学习地址:https://www.cnblogs.com/xuxueli/p/5021979.html ...
  • XXL-JOB分布式任务调度平台

    千次阅读 热门讨论 2018-06-24 21:14:32
    XXL-JOB是一个轻量级分布式任务调度框架,开箱即用。 特性 简单:通过Web页面操作简单易用。 任务实时监控,可查看任务执行日志。 动态:可以动态修改任务状态,暂停或恢复任务,也可以终止进行中的任务。 路由...
  • XXL-JOB任务调度平台

    2019-04-17 09:43:14
    参考...蛮详细的,先部署xxl-job-admin,端口为8080,为任务调度平台,初始化mysql记住了 如何部署执行器:xxl-job-execute-sam...
  • Elastic-job 与 xxl-job 任务调度框架

    千次阅读 2020-03-12 16:31:43
    #xxl-job文档地址 1.https://www.xuxueli.com/xxl-job/ #Elastic-job 文档地址 2.http://elasticjob.io/index_zh.html 3.建议使用xxl-job,原因如下: 偏重量级框架; 依赖Spring,mysql,maven手动编译; 提供...

空空如也

1 2 3 4 5 ... 20
收藏数 164,316
精华内容 65,726
关键字:

job任务