精华内容
下载资源
问答
  • 主要介绍了Java Elastic Job动态添加任务实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • elastic-job 动态添加任务-修改任务

    千次阅读 2019-06-12 17:45:07
    一个发布时间即可搞定,但是由于是两张表混合展示的,所以一个发布时间并不能解决问题,因为lz认为自己的线程知识太烂,所以未敢涉足线程,就在定时任务方向寻找解决方法,以下是我使用的动态修改定时任务的方法,仅...

    最近在做定时发送问题,如果不是两张表混合展示的话,一个发布时间即可搞定,但是由于是两张表混合展示的,所以一个发布时间并不能解决问题,因为lz认为自己的线程知识太烂,所以未敢涉足线程,就在定时任务方向寻找解决方法,以下是我使用的动态修改定时任务的方法,仅供参考,欢迎各位大佬批评指正。

    因为要动态修改cron,所以不能采用xml配置

    在这里插入图片描述

    首先,书写配置类
    dubbo.registry.address.task=ip:2181
    task_namespace=service_task
    
    /**
     * 定时人配置
     *
     * @author johny
     * @date 2019/6/4
     */
    @Configuration
    public class ElasticJobConfig {
    	//配置文件中的zookeeper的ip和端口
        @Value(value = "${dubbo.registry.address.task}")
        private String serverlists;
        //指定一个命名空间
        @Value("${task_namespace}")
        private String namespace;
    
    
        @Bean
        public ZookeeperConfiguration zkConfig() {
            return new ZookeeperConfiguration(serverlists, namespace);
        }
    
        @Bean(initMethod = "init")
        public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
            return new ZookeeperRegistryCenter(config);
        }
    
    
        @Bean
        public ElasticJobListener elasticJobListener() {
        	//初始化要给定超时多少秒重连
            return new ElasticJobListener(100, 100);
        }
    }
    
    
    定时任务监听(只是为了可以动态执行下一条任务)
    /**
     * 定时任务监听
     *
     * @author johny
     * @date 2019/6/4
     */
    public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
    
        @Autowired
        ElasticJobHandler elasticJobHandler;
    
        public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
            super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
        }
    
        @Override
        public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
            System.out.println("========初始化任务前========");
            System.out.println(shardingContexts.getJobParameter());
            System.out.println(new Date());
        }
    
        @Override
        public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
            System.out.println("=======初始化任务后=============");
            elasticJobHandler.addPublishJob();
        }
    }
    
    这个handler 是重点,交给spring管理,其他地方直接调用此类即可
    /**
     * 定时任务添加
     *
     * @author johny
     * @date 2019/6/4
     */
    @Component
    public class ElasticJobHandler {
    
        public static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";
        public static final String DATE_FORMAT = "ss mm HH dd MM ? yyyy";
    
        @Resource
        private ZookeeperRegistryCenter registryCenter;
    
        @Resource
        private ElasticJobListener elasticJobListener;
    	/**
    	*这个是任务持久层
    	*/
        @Autowired
        TaskService TaskService;
    
        /**
         * @param jobName
         * @param jobClass
         * @param shardingTotalCount
         * @param cron
         * @param id                 数据ID
         * @return
         */
        private static LiteJobConfiguration.Builder simpleJobConfigBuilder(String jobName,
                                                                           Class<? extends SimpleJob> jobClass,
                                                                           int shardingTotalCount,
                                                                           String cron,
                                                                           String id) {
            LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                    JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).jobParameter(id).build(), jobClass.getCanonicalName()));
            builder.overwrite(true);
            return builder;
        }
    
        /**
         * 添加一个定时任务
         */
         //此注解为开机自启
        @PostConstruct
        public void addPublishJob() {
            Date date = null;
            String id = "";
            try {
                JSONObject object = TaskService.qryTop();
                if (object == null) {
                    return;
                }
                date = object.getDate("date");
                id = object.getString("feedId");
            } catch (Exception e) {
                e.printStackTrace();
                LogUtils.writeErrorLog(this.getClass().getName(), "查询定时任务异常", "qryTop", "", "", e.getMessage());
            }
            if (date == null) {
                return;
            }
            //得到执行时间
            String cron = getCron(date);
            //shardingTotalCount分片数,多机时,分片为1,任务不会被重复执行
            //overwrite为是否覆盖
            int shardingTotalCount=1;
            //"publishFeedTask"这是一个任务名称,相投的话overwrite(true)时会覆盖原有任务。
            String jobName ="publishTask";
            LiteJobConfiguration jobConfig = simpleJobConfigBuilder(jobName, PublishFeedTask.class, shardingTotalCount,
                    cron, id).overwrite(true).build();
            System.out.println(cron + "======" + id);
            //PublishTask为具体的任务执行逻辑类
            new SpringJobScheduler(new PublishTask(TaskService), registryCenter, jobConfig, elasticJobListener).init();
        }
    
        /**
         * 获得定时
         *
         * @param date
         * @return
         */
        public static String getCron(final Date date) {
            SimpleDateFormat sdf = new SimpleDateFormat(CRON_DATE_FORMAT);
            String formatTimeStr = "";
            if (date != null) {
                formatTimeStr = sdf.format(date);
            }
            return formatTimeStr;
        }
    }
    
    任务具体逻辑类PublishTask.java
    /**
     * 发布feed
     *
     * @author johny
     * @date 2019/6/4
     */
    public class PublishTask implements SimpleJob {
    
        TaskService taskService;
    
        public PublishTask(TaskService taskService) {
            this.taskService = taskService;
        }
    
        @Override
        public void execute(ShardingContext shardingContext) {
            String id = shardingContext.getJobParameter();
            long feedId = Long.parseLong(id);
            try {
                //具体任务逻辑
                
            } catch (Exception e) {
                e.printStackTrace();
                LogUtils.writeErrorLog(this.getClass().getName(), "定时发布", "PublishTask",
                        "system", e.getMessage());
            }
        }
    }
    
    
    其他地方直接引用即可
    
        @Autowired
        ElasticJobHandler elasticJobHandler;
    
    
    	elasticJobHandler.addPublishJob();
    
    

    经线上测试,多service,单portal调用,可以正常动态添加,但是多portal调用时,回调到不同的service,定时任务会停止执行。

    展开全文
  • Elastic-Job动态添加任务

    千次阅读 2018-05-02 09:31:50
    在使用Elastic-Job的过程中,有很多人遇到了这么一个问题,就是如何动态的去添加任务? 在官方的文档中也有对此作出回答,如下: 动态添加作业这个概念每个人理解不尽相同。 elastic-job-lite为jar包,由开发或...

    #背景
    在使用Elastic-Job的过程中,有很多人遇到了这么一个问题,就是如何动态的去添加任务?

    在官方的文档中也有对此作出回答,如下:

    动态添加作业这个概念每个人理解不尽相同。

    elastic-job-lite为jar包,由开发或运维人员负责启动。启动时自动向注册中心注册作业信息并进行分布式协调,因此并不需要手工在注册中心填写作业信息。 但注册中心与作业部署机无从属关系,注册中心并不能控制将单点的作业分发至其他作业机,也无法将远程服务器未启动的作业启动。elastic-job-lite并不会包含ssh免密管理等功能。

    elastic-job-cloud为mesos框架,由mesos负责作业启动和分发。 但需要将作业打包上传,并调用elastic-job-cloud提供的REST API写入注册中心。 打包上传属于部署系统的范畴elastic-job-cloud并未涉及。

    综上所述,elastic-job已做了基本动态添加功能,但无法做到真正意义的完全自动化添加。

    接下来谈谈我对动态任务的理解,我眼中的动态任务分为2种:

    接下来我给大家介绍下Elastic-Job实现上面讲的第二种动态任务的方式,也就是任务的实现逻辑已经是存在的,只是需要发布成多个不同时间去触发的任务。

    #实战

    实现任务的动态添加比较简单,只需要接收任务的信息,然后初始化一下就可以了,在实现的过程中笔者遇到了一个麻烦的问题?

    在多节点分片任务却只有一个节点能执行,问题原因在于当有任务A和任务B,2个节点的时候,我们调用A节点的接口进行任务的动态添加,在A节点中初始化了任务调度器,数据也存储到了注册中心,但是B节点是不知道有新的任务添加,默认的使用方法是每个节点在启动时去初始化任务调度器,而我们的B节点已经启动过了,任务是新添加的。

    解决这个问题最简单的方式就是将任务的节点都集中管理起来,无论动态任务在哪个节点上进行注册,都需要将这个请求转发到其他的节点上进行初始化操作,这样就可以保证多节点分片的任务正常执行。

    还有一种对使用者更友好的办法是对Zookeeper中的节点进行监听,当有新的节点创建时,就自动获取这个节点的配置信息,在本地进行任务初始化,通过这样的方式就可以不用去转发请求到其他节点了,只要在任何节点有添加操作,都能被监听到,并自己去初始化。

    监控代码如下:

    /**
     * 开启任务监听,当有任务添加时,监听zk中的数据增加,自动在其他节点也初始化该任务
     */
    public void monitorJobRegister() {
    	CuratorFramework client = zookeeperRegistryCenter.getClient();
    	@SuppressWarnings("resource")
    	PathChildrenCache childrenCache = new PathChildrenCache(client, "/", true);  
    	PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {  
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  
    	    ChildData data = event.getData();  
    	    switch (event.getType()) {  
    	            case CHILD_ADDED: 
    	                String config = new String(client.getData().forPath(data.getPath() + "/config"));
    	                Job job = JsonUtils.toBean(Job.class, config);
    	                addJob(job);
    	                break;  
    	            default:  
    	                break;  
    	    }  
    	 }  
      };  
    	       
        childrenCache.getListenable().addListener(childrenCacheListener);  
        try { 
    	    childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
    	} catch (Exception e) {
    		e.printStackTrace();
        } 
    }
    

    为了方便大家使用,我将动态添加任务的功能集成到了我之前的elastic-job-spring-boot-starter(https://github.com/yinjihuan/elastic-job-spring-boot-starter)中集成了动态添加的逻辑,大家引入依赖即可使用。

    使用方式比较简单,只需要在启动类上加一个ComponentScan注解,让Spring能够扫描到elastic-job-spring-boot-starter提供的代码即可:

    @SpringBootApplication
    @EnableElasticJob
    //开启动态任务添加API
    @ComponentScan(basePackages = {"com.cxytiandi"})
    public class JobApplication {
    	
    	public static void main(String[] args) {
    		new SpringApplicationBuilder().sources(JobApplication.class).web(true).run(args);
    		try {
    			new CountDownLatch(1).await();
    		} catch (InterruptedException e) {
    		}
    	}
    	
    }
    

    配置好之后,启动项目就可以通过REST API来动态的注册任务,API列表如下:

    • /job

    添加任务是POST请求,数据格式为JSON体提交,格式如下:

    {
    "jobName":"DynamicJob13",
    "cron":"0 33 16 * * ?",
    "jobType":"SIMPLE",
    "jobClass":"com.cxytiandi.job.demo.DynamicJob",
    "jobParameter":"2222222",
    "shardingTotalCount":1
    }
    

    完整字段请参考:https://github.com/yinjihuan/elastic-job-spring-boot-starter/blob/master/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/bean/Job.java

    注意:jobClass必须事先存在于服务中

    • /job/remove
      删除任务是GET请求,参数只要任务名称即可,比如:/job/remove?jobName=任务名。可以用于任务完成之后清空注册中心的任务信息。

    更多技术分享请关注微信公众号:猿天地

    欢迎加入我的知识星球,一起交流技术,免费学习猿天地的课程(http://cxytiandi.com/course)

    PS:目前星球中正在星主的带领下组队学习Spring Cloud,等你哦!

    微信扫码加入猿天地知识星球

    猿天地

    展开全文
  • elastic-job动态添加定时任务

    千次阅读 2017-04-26 15:34:09
    elastic-job的使用过程中,我们会遇到动态添加定时任务的时候,但是官网上面并没有对这块内容进行说明。按照我的理解以及官网上面elastic-job的框架图,ej的定时任务其实是存储在zookeeper的一个个节点上面,所以...

    在elastic-job的使用过程中,我们会遇到动态添加定时任务的时候,但是官网上面并没有对这块内容进行说明。按照我的理解以及官网上面elastic-job的框架图,ej的定时任务其实是存储在zookeeper的一个个节点上面,所以通过给zookeeper添加对应的节点即可完成定时任务的添加动作。

    下面上代码:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import com.dangdang.ddframe.job.api.ShardingContext;
    import com.dangdang.ddframe.job.api.simple.SimpleJob;
    import com.dangdang.ddframe.job.config.JobCoreConfiguration;
    import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
    import com.dangdang.ddframe.job.exception.JobSystemException;
    import com.dangdang.ddframe.job.lite.api.JobScheduler;
    import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class DynamicAddJob implements SimpleJob{
        private static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";
    
        /***
         * @param date 时间
         * @return cron类型的日期
         */
        public static String getCron(final Date date) {
            SimpleDateFormat sdf = new SimpleDateFormat(CRON_DATE_FORMAT);
            String formatTimeStr = "";
            if (date != null) {
                formatTimeStr = sdf.format(date);
            }
            return formatTimeStr;
        }
    
    
        public static void main(String[] args){
            ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-job.xml");
            ZookeeperRegistryCenter zookeeperRegistryCenter = context.getBean(ZookeeperRegistryCenter.class);
            long now = System.currentTimeMillis();
            for (int i = 0; i < 100; i++) {
                String cron = getCron(new Date(now + (i + 1) * 50000));
                JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("dynamicDemoJob-" + i, cron, 2).build();
                SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, DynamicAddJob.class.getCanonicalName());
                JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build());
                try {
                    jobScheduler.init();
                }catch (JobSystemException e){
                    e.printStackTrace();
                }
            }
        }
    
        @Override
        public void execute(ShardingContext shardingContext) {
            switch (shardingContext.getShardingItem()){
                case 0:
                    System.out.println("doing sharding 0...job name is "+shardingContext.getJobName());
                    // do something by sharding item 0
                    break;
                case 1:
                    System.out.println("doing sharding 1...job name is "+shardingContext.getJobName());
                    // do something by sharding item 1
                    break;
            }
        }
    }

    这里用到比较重要的一个类是JobScheduler,这是lite-core里面一个比较核心的类,这个类其实就是我们的job,他的构造方法包含以下参数:

    • CoordinatorRegistryCenter regCenter:注册中心,这里是zookeeper
    • LiteJobConfiguration liteJobConfig:定时任务的配置信息

    这里可以看一下LiteJobConfiguration这个类,采用了设计模式中的建造者模式进行构建。可能看着会比较摸不着头脑,里面的Builder跟平时的不太一样,这里我们需要知道的是ej的源码采用了lombok这个代码简化的工具,只需要通过注解的形式就能将我们平时所需要的get/set和构造器的内容在编译时创建出来,不需要在代码中体现,能够大大简化我们的代码。

    另外还遇到一个坑。这段代码不能重复使用,第一次跑的时候没问题,过段时间再次跑这个代码时,会在init()处报错,原因是我们新建的job根本不能被fire,我跟了进去。发现,job的cron表达式表示的时间还是以前的时间,这就奇怪了,明明我这边配置了一个新的时间。通过debug,进入init方法中,发现他会更新job信息,而更新时,会去zk上面load配置信息,而zk的znode节点是老的节点,上面存储的配置信息也是老的,所以这块的cron表达式也是旧的时间,根本不会被执行,下面贴出源码,供大家参考。

    init()源码:

        /**
         * 初始化作业.
         */
        public void init() {
            LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
            JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
            JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
            JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
            schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
            jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
        }

    updateJobConfiguration()的源码如下:

        /**
         * 更新作业配置.
         *
         * @param liteJobConfig 作业配置
         * @return 更新后的作业配置
         */
        public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
            configService.persist(liteJobConfig);
            return configService.load(false);
        }

    load()源码如下:

        /**
         * 读取作业配置.
         * 
         * @param fromCache 是否从缓存中读取
         * @return 作业配置
         */
        public LiteJobConfiguration load(final boolean fromCache) {
            String result;
            if (fromCache) {
                result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
                if (null == result) {
                    result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
                }
            } else {
                result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
            }
            return LiteJobConfigurationGsonFactory.fromJson(result);
        }

    可以发现这块load有两种,一种是从缓存(这里的缓存使用Map来实现的TreeCache)中获取getJobNodeData,一种是从注册中心也就是zookeeper中获取getJobNodeDataDirectly。load的时候,根据的是zk的路径,其实也就是任务的jobName,所以我们要尽量避免任务名称的重复。

    展开全文
  • 需求 - 定时任务需要通过页面来添加 环境 - SpringBoot、ElasticJob 开发 - 引入ElasticJob、zookeeper pom <dependency> <groupId>com.dangdang</groupId> ...

    需求 - 定时任务需要通过页面来添加 环境 - SpringBoot、ElasticJob 开发 - 引入ElasticJob、zookeeper pom

    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-core</artifactId>
        <version>2.1.5</version>
    </dependency>
    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-spring</artifactId>
        <version>2.1.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.8</version>
    </dependency>
    

    配置ElasticJobConfig(其中spring.elasticjob.serverList、spring.elasticjob.namespace为配置项)

    @Configuration
    @ConditionalOnExpression("'${spring.elasticjob.serverList}'.length() > 0")
    public class ElasticJobConfig {
    	
    	@Autowired
        private DataSource dataSource;
    	
    	@Bean(initMethod = "init")
        public ZookeeperRegistryCenter regCenter(@Value("${spring.elasticjob.serverList}") final String serverList, @Value("${spring.elasticjob.namespace}") final String namespace) {
            return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
        }
    	
        @Bean
        public JobEventConfiguration jobEventConfiguration() {
            return new JobEventRdbConfiguration(dataSource);
        }
    }
    

    添加job片段

    JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(taskNo, DateHelper.getCron(taskVO.getExecuteTime()), 1).build();
    SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, DynamicAddJob.class.getCanonicalName());
    
    JobScheduler jobScheduler = new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build());
    
    jobScheduler.init();
    

    DynamicAddJob代码片段

    public class DynamicAddJob implements SimpleJob {
        @Override
        public void execute(ShardingContext shardingContext) {
            String jobName = shardingContext.getJobName();//获取Job名称
            ......
        }
    }
    

    转载于:https://my.oschina.net/u/3694704/blog/1629924

    展开全文
  • elastic-job动态任务配置

    千次阅读 2021-09-12 16:11:30
    在之前的springboot整合elastic-job篇中,我们了解到,elastic-job是一个不错的任务调度框架,本篇将进一步说明,如何使用elastic-job实现常用的动态任务的执行场景 环境准备 zookeeper安装与启动(本地可以使用...
  • 概述 因项目中使用到定时任务...常见的开源方案,如elastic-job、xxl-job、quartz、saturn、opencron、antares等。最终决定使用elastic-jobelastic-job的亮点主要如下: 基于quartz 定时任务框架为基础的,因此...
  • 一、定时任务很多都是配置在配置文件中,但很多时候我们需要根据业务需要调整动态增删改定时任务,这里写出例子供大家参考。 (1)maven依赖 <dependency> <groupId>com.dangdang</groupId> ...
  • elastic-job动态进行任务添加

    千次阅读 2019-06-27 16:47:16
    elastic-job参考官方整合了一下springboot,整合这个是为了进行接下来的任务动态添加 ,有的时候会遇到这样需求,同样的定时任务逻辑,需要动态生成不同的定时任务,而且需要整合到自身的代码中不能用控制台去...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,376
精华内容 1,350
关键字:

elasticjob动态添加任务