-
Hera
2020-12-08 22:14:05<div><p>Hera a Css builder this replace the cssbuilder.js in phobos that is DEAD code ( noway to run) <p>john <p>Enyo-DCO-1.1-Signed-off-by: john mcconnell johnmcconnell.com</p> <p>ok i think i got ... -
hera-hera-master.zip
2020-06-05 10:34:12数据平台打造的任务调度系统(HERA) hera分布式任务调度系统是根据前阿里开源调度系统(zeus)进行的二次开发,其中zeus大概在2014年开源,开源后却并未进行维护。我公司(二维火)2015年引进了zeus任务调度系统,一直... -
当使用git把hera克隆到本地之后,首先在hera/hera-admin/resources目录下找到hera.sql文件,在自己的数据库中新建这些必要的表,并插入初始化的数据(如果你目前使用的是低版本的hera,那么你可以到 update 目录查看...
-
Fuzzing Hera
2020-12-09 01:58:00<div><p>It should be fuzzed with random inputs to be executed (with and without the sentinel). <p>Most of the fuzzing corpus should be built with the successful test cases of ewasm...ewasm/hera</p></div> -
Android-hera.zip
2019-09-17 17:00:26Android-hera.zip,运行微信applet的框架。(软件开发工具包、iOS),安卓系统是谷歌在2008年设计和制造的。操作系统主要写在爪哇,C和C 的核心组件。它是在linux内核之上构建的,具有安全性优势。 -
Move hera to submodule
2020-12-26 06:27:50<p>Previously <code>hera</code> submodule (<code>bottleneck</code> and <code>wasserstein</code> implementations) was not a submodule but sources were hard-coded. This PR removes old <code>hera</code> ... -
HERA数据的高能行为研究
2020-05-06 05:27:060.001)区域中的高精度HERA F2数据。 λ是F2∝(1 / x)λ定义的F2上升速率的量度。 我们显示,在这两个区域中,在各个Q2值处确定的λ与低x区域相比,在极低x区域中系统地较小。 我们讨论了对此影响的一些可能的物理... -
Atom-Hera,做事情的电报机器人。通过在github上创建一个帐户,为pnre/hera的开发做出贡献。.zip
2019-09-18 22:27:07Atom-Hera.zip,做事情的电报机器人要求,atom是一个用web技术构建的开源文本编辑器。 -
hera源码剖析:项目启动之分布式锁
2020-09-29 17:41:53本文章主要是为了让使用者能够更加了解 hera 的原理,并且能够在之基础上进行改进所进行。hera 是一款分布式任务调度与开发平台,具体不再描述,开源地址:https://github.com/scxwhite/hera 获取当前机器ip 在 ...前言
本文章主要是为了让使用者能够更加了解
hera
的原理,并且能够在之基础上进行改进所进行。hera
是一款分布式任务调度与开发平台,具体不再描述,开源地址:https://github.com/scxwhite/hera获取当前机器ip
在
hera
中,有一些静态代码块,这里只说一个很重要的部分,WorkContext
类中有这样一部分代码static { host = NetUtils.getLocalAddress(); HeraLog.info("-----------------------------当前机器的IP为:{}-----------------------------", host); //省略部分代码 }
该代码会获取当前机器的
ip
信息,由于多网卡的原因可能获取的ip不是很准确,此时你可以通过在hera-admin/src/main/resources/config/hera.properties
文件,修改server.ip=127.0.0.1
配置为当前机器ip
即可。具体的代码不再深入。分布式锁
hera
是使用spring boot
开发的,启动项目执行AdminBootstrap.main
方法,由于DistributeLock#init
方法使用了@PostConstruct
注册,首先会进入该方法@PostConstruct public void init() { workClient.workSchedule.scheduleAtFixedRate(this::checkLock, 10, 60, TimeUnit.SECONDS); }
在该方法中会向
ScheduledThreadPoolExecutor
线程池提交一个每隔60
秒执行的任务,具体任务内容在DistributeLock#checkLock
方法public void checkLock() { try { String ON_LINE = "online"; //1.从hera_lock表查询最新记录 HeraLock heraLock = heraLockService.findBySubgroup(ON_LINE); //2.如果当前没有master,直接以当前机器ip插入hera_lock新记录 if (heraLock == null) { Date date = new Date(); heraLock = HeraLock.builder() .id(1) .host(WorkContext.host) .serverUpdate(date) .subgroup(ON_LINE) .gmtCreate(date) .gmtModified(date) .build(); Integer lock = heraLockService.insert(heraLock); if (lock == null || lock <= 0) { return; } } //3.master判断 if (isMaster = WorkContext.host.equals(heraLock.getHost().trim())) { heraLock.setServerUpdate(new Date()); heraLockService.update(heraLock); HeraLog.info("hold lock and update time"); heraSchedule.startup(); } else { long currentTime = System.currentTimeMillis(); long lockTime = heraLock.getServerUpdate().getTime(); long interval = currentTime - lockTime; long timeout = 1000 * 60 * 5L; if (interval > timeout && isPreemptionHost()) { Date date = new Date(); Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost()); if (lock != null && lock > 0) { ErrorLog.error("master 发生切换,{} 抢占成功", WorkContext.host); heraSchedule.startup(); heraLock.setHost(WorkContext.host); //TODO 接入master切换通知 } else { HeraLog.info("master抢占失败,由其它worker抢占成功"); } } else { //非主节点,调度器不执行 主要是为了避免手动修改hera_lock表后出现多master问题 heraSchedule.shutdown(); } } //4.初始化work服务 workClient.init(); //5.连接master workClient.connect(heraLock.getHost().trim()); } catch (Exception e) { ErrorLog.error("检测锁异常", e); } }
在解释这些代码之前大家要知道
hera
系统有个hera_lock
表,该表最多只会有一条记录,并且该记录保存着当前的master ip
。也就是说大家如果想切换master
,可以直接通过修改该条记录的ip
来实现。DistributeLock#checkLock
每次被调用时第一步会从hera_lock
表查询出最新的master
信息,至于在这里使用了online
进行过滤没有实际意义。如果当前没有master
,即hera_lock
等于null
(一般在第一次部署启动hera
时会有该情况),为了方便调试此时会自动把当前机器设置为master
,当然如果插入当前hera_lock
记录失败(被其它work
插入了),会直接返回等待下次调用。- 在第
3
部分,首先会判断当前机器的ip
信息与hera_lock
表的ip
是否匹配,如果匹配则表示当前机器为master
,然后更新数据库的server_update
时间,所以一定要保证你的所有机器时钟一致哦,最后调用heraSchedule.startup()
方法来启动master
服务。 - 如果发现当前机器的
ip
信息与hera_lock
表的ip
不匹配,则表示当前机器是work
,此时会通过long interval = currentTime - lockTime;
来计算master
上次的心跳时间间隔,如果发现master
已经超出5分钟未发送新的心跳,则通过isPreemptionHost
方法判断当前机器是否允许抢占master
,如果允许则通过Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost());
方法来进行抢占。抢占sql
为
# 乐观锁方式进行抢占,保证同一时间只有一台机器能够抢占成功 update hera_lock set gmt_modified = #{gmtModified},host = #{host},server_update = #{serverUpdate} where host = #{lastHost}
如果发现
work
抢占master
成功,则调用heraSchedule.startup();
来启动master
服务。private boolean isPreemptionHost() { List<String> preemptionHostList = hostGroupService.findPreemptionGroup(HeraGlobalEnv.preemptionMasterGroup); if (preemptionHostList.contains(WorkContext.host)) { return true; } else { HeraLog.info(WorkContext.host + " is not in master group " + preemptionHostList.toString()); return false; } }
isPreemptionHost
方法主要是判断当前机器是否在允许抢占的机器组,该配置为:hera.preemptionMasterGroup
- 在第
4
部分会进行work
服务的初始化,此时需要注意的是:master
会启动master
服务和work
服务,work
只启动work
服务。也就是说,你可以在本地idea
启动hera
来进行调试,你也可以在生产环境只有一台机器进行任务调度。 - 在第
5
部分会进行work
服务连接master
服务的操作,即netty
通信打开
看到这里,想必你已经了解了
DistributeLock
类是一个定时进行分布式锁检测的类,它决定着当前机器是启动master
服务还是work
服务知识点总结
- 可以通过直接修改
hera_lock
表的数据来切换master
- 可以只启动一台机器来进行
hera
的调度与开发 - 可以通过配置
hera.preemptionMasterGroup
参数来让某些机器允许抢占master
master服务
在分布式锁中,如果当前机器抢到了
master
,那么此时该机器会调用HeraSchedule#startup
启动master
服务public void startup() { if (!running.compareAndSet(false, true)) { return; } HeraLog.info("begin to start master context"); masterContext.init(); }
为了保证
master
服务只被启动一次,使用了原子类AtomicBoolean
继续往下看public void init() { //主要处理work的请求信息 threadPool = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-wait-response"), new ThreadPoolExecutor.AbortPolicy()); //主要管理master的一些延迟任务处理 masterSchedule = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("master-schedule", false)); masterSchedule.setKeepAliveTime(5, TimeUnit.MINUTES); masterSchedule.allowCoreThreadTimeOut(true); //开启quartz服务 getQuartzSchedulerService().start(); dispatcher = new Dispatcher(); //初始化master端的netty消息handler handler = new MasterHandler(this); //初始化master server masterServer = new MasterServer(handler); masterServer.start(HeraGlobalEnv.getConnectPort()); master.init(this); //master的定时任务管理者 choreService = new ChoreService(5, "chore-service"); //重跑任务初始化 rerunJobInit = new RerunJobInit(master); choreService.scheduledChore(rerunJobInit); //重跑任务启动 rerunJobLaunch = new RerunJobLaunch(master); choreService.scheduledChore(rerunJobLaunch); //信号丢失检测 lostJobCheck = new LostJobCheck(master, new DateTime().getMinuteOfHour()); choreService.scheduledChore(lostJobCheck); //心跳检测 heartCheck = new WorkHeartCheck(master); choreService.scheduledChore(heartCheck); //版本生成 actionInit = new JobActionInit(master); choreService.scheduledChore(actionInit); //任务是否完成检测 finishCheck = new JobFinishCheck(master); choreService.scheduledChore(finishCheck); //队列扫描 queueScan = new JobQueueScan(master); choreService.scheduledChoreOnce(queueScan); HeraLog.info("end init master content success "); }
这部分代码主要功能是
- 初始化
master
端的消息处理线程池threadPool
- 初始化
master
端的延迟任务线程池masterSchedule
- 开启
quartz
服务 - 初始化
master
端的netty
消息handler
- 初始化
master
的定时任务管理者,任务有:任务重跑初始化、重跑任务启动、信号丢失检测、心跳检测、版本生成、任务是否完成检测、队列扫描等
work服务
//保证只启动一次 if (!clientSwitch.compareAndSet(false, true)) { return; } workContext.setWorkClient(this); //在这里目前是空的,公司内部初始化了一些公共数据源配置 workContext.init(); eventLoopGroup = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS)) .addLast("frameDecoder", new ProtobufVarint32FrameDecoder()) .addLast("decoder", new ProtobufDecoder(RpcSocketMessage.SocketMessage.getDefaultInstance())) .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()) .addLast("encoder", new ProtobufEncoder()) .addLast(new WorkHandler(workContext)); } }); HeraLog.info("init work client success "); workSchedule.schedule(new Runnable() { private WorkerHandlerHeartBeat workerHandlerHeartBeat = new WorkerHandlerHeartBeat(); private int failCount = 0; @Override public void run() { try { if (workContext.getServerChannel() != null) { boolean send = workerHandlerHeartBeat.send(workContext); if (!send) { failCount++; ErrorLog.error("send heart beat failed ,failCount :" + failCount); } else { failCount = 0; HeartLog.debug("send heart beat success:{}", workContext.getServerChannel().getRemoteAddress()); } } else { ErrorLog.error("server channel can not find on " + WorkContext.host); } } catch (Exception e) { ErrorLog.error("heart beat error:", e); } finally { workSchedule.schedule(this, (failCount + 1) * HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS); } } }, HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS); //省略定时更新日志的代码
在
work
服务这边同样使用了原子类保证只启动一次,然后就是初始化work
的netty
服务,再往下就是定时发送心跳信息给master
work连接master
DistributeLock#checkLock
方法做的最后一件事情就是work
连接master
,打开通信,调用的方法为:workClient.connect(heraLock.getHost().trim());
public synchronized void connect(String host) throws Exception { if (workContext.getServerChannel() != null) { if (workContext.getServerHost().equals(host)) { return; } else { workContext.getServerChannel().close(); workContext.setServerChannel(null); } } workContext.setServerHost(host); CountDownLatch latch = new CountDownLatch(1); ChannelFutureListener futureListener = (future) -> { try { if (future.isSuccess()) { workContext.setServerChannel(FailFastCluster.wrap(future.channel())); SocketLog.info(workContext.getServerChannel().toString()); } } catch (Exception e) { ErrorLog.error("连接master异常", e); } finally { latch.countDown(); } }; ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, HeraGlobalEnv.getConnectPort())); connectFuture.addListener(futureListener); if (!latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS)) { connectFuture.removeListener(futureListener); connectFuture.cancel(true); throw new ExecutionException(new TimeoutException("connect server consumption of 2 seconds")); } if (!connectFuture.isSuccess()) { throw new RuntimeException("connect server failed " + host, connectFuture.cause()); } SocketLog.info("connect server success"); }
连接
master
时会首先判断当前是否已经连接了master
,如果已经连接并且和当前master ip
一致则直接返回,否则关闭netty
通信,重置当前work
的master
信息。然后通过
CountDownLatch
的await(long timeout, TimeUnit unit)
方法,来进行work
与master
的超时连接判断,通过ChannelFutureListener
在master
与work
通信连接成功时设置ServerChannel
,并且容错方式为快速失败FailFastCluster.wrap(future.channel())
-
共线改善的BK进化符合HERA数据
2020-03-30 05:35:11在这里,我们通过面对HERA数据来研究该方程作为现象学工具的相关性。 为此,我们首先通过包括两类单对数校正来提高恢复的扰动精度:由DGLAP分裂函数中的第一个非奇异项生成的校正和表示QCD耦合的单循环运行的校正 ... -
数据平台打造的任务调度系统(HERA)
2019-08-08 05:49:49hera系统只是负责调度以及辅助的系统,具体的计算还是要落在hadoop、hive、yarn、spark等集群中去。所以此时又一个硬性要求,如果要执行hadoop,hive,spark等任务,我们的hera系统的worker一定要部署在这些集群某些... -
Qtile 0.14.2 Elementary Hera 5.1
2020-12-01 22:13:54<p>I am trying to install Qtile 0.14.2 on Elementary Hera 5.1, but pip3 gives me this error, is there a way i can force pip3 to install it? <p>Thanks for the help. <p>this is the log <p>gibranlp:~/... -
[WIP] Support WAVM in Hera
2020-12-09 01:43:41<div><p>merges into vm-abstr <p>Part of #158.</p><p>该提问来源于开源项目:ewasm/hera</p></div> -
HERA对接触相互作用和幼体夸克的限制
2020-05-04 04:47:00在等式接触相互作用(CI)的框架中使用了对应于1 fb-1左右的光度的高精度HERA数据,以设置对超出标准模型对电子夸克散射的可能高能贡献的限制。 考虑了中性和带电电流ep散射中包含的深部非弹性横截面的测量。 ep数据... -
HERA数据和共线改善的BK动力学
2020-05-06 04:51:12在偶极分解的框架内,我们使用了最近经共线性改进的Balitsky-Kovchegov方程来拟合HERA数据,以计算小Bjorken x处的包容性深层非弹性散射。 该方程式包括双横向对数和单横向对数的全阶求和以及运行耦合校正。 与以前... -
Hera fails to execute AssemblyScript WAST
2021-01-02 11:48:15--vm hera/build/src/libhera.dylib --filltests --singletest wrc20Challenge Running tests using path: "tests/" Running 1 test case... Test Case "stEWASMTests": 100% cpp-ethereum/test/... -
断裂功能框架下HERA ep碰撞中主要核子产生的现象学
2020-04-10 13:17:57近年来,在e-p对撞机HERA上进行的几项实验已收集了携带大量质子能量的主要核子光谱上的高精度深非弹性散射(DIS)数据。 在本文中,我们分析了在扰动QCD框架下在HERA DIS中产生正向质子和中子的最新实验数据。 我们... -
深入分析偶极子模型中合并和不饱和的HERA数据
2020-05-04 06:47:16我们提出了一种更新的与冲击参数有关的饱和度模型,该模型通过对组合的HERA I和I + II减小的横截面数据进行拟合确定。 使用相同的HERA数据来拟合所施加偶极子振幅的线性化版本,这使得可以在各种实验中估计饱和效应... -
Item Counter Issue- Tower of Hera
2020-12-08 20:20:12<p>Description of problem: I got to Tower of Hera and picked up all dungeon items/keys. The counter on screen said 5/6. I went back to double check all item locations but I'd picked everything up.... -
HERA带电电流深层非弹性散射中的魅力产生
2020-04-18 03:10:50首次在e±p碰撞中测量了带电电流深层非弹性散射中的魅力产生,这是使用ZEUS检测器在HERA处收集到的,对应于358 pb-1的综合光度。 在200 GeV2 <Q 2 <60000的运动相空间区域内,在质心能量为s $$ \ sqrt {s} $$ ... -
赫拉(hera)分布式任务调度系统
2019-05-22 09:44:52赫拉(hera)分布式任务调度系统之架构,基本功能(一) 赫拉(hera)分布式任务调度系统之项目启动(二) 赫拉(hera)分布式任务调度系统之开发中心(三) 赫拉(hera)分布式任务调度系统之版本(四) 赫拉(hera)分布式任务...相关介绍
- 赫拉(hera)分布式任务调度系统之架构,基本功能(一)
- 赫拉(hera)分布式任务调度系统之项目启动(二)
- 赫拉(hera)分布式任务调度系统之开发中心(三)
- 赫拉(hera)分布式任务调度系统之版本(四)
- 赫拉(hera)分布式任务调度系统之Q&A(五)
1.实现集群HA,机器宕机环境实现机器断线重连与心跳恢复与hera集群HA,节点单点故障环境下任务自动恢复,master断开,work抢占master
@Component public class DistributeLock { @Autowired private HeraHostRelationService hostGroupService; @Autowired private HeraLockService heraLockService; @Autowired private WorkClient workClient; @Autowired private HeraSchedule heraSchedule; private final long timeout = 1000 * 60 * 5L; private final String ON_LINE = "online"; @PostConstruct public void init() { workClient.workSchedule.scheduleAtFixedRate(() -> { try { checkLock(); } catch (Exception e) { e.printStackTrace(); } }, 10, 60, TimeUnit.SECONDS); } public void checkLock() { HeraLock heraLock = heraLockService.findBySubgroup(ON_LINE); if (heraLock == null) { Date date = new Date(); heraLock = HeraLock.builder() .id(1) .host(WorkContext.host) .serverUpdate(date) .subgroup(ON_LINE) .gmtCreate(date) .gmtModified(date) .build(); Integer lock = heraLockService.insert(heraLock); if (lock == null || lock <= 0) { return; } } if (WorkContext.host.equals(heraLock.getHost().trim())) { heraLock.setServerUpdate(new Date()); heraLockService.update(heraLock); HeraLog.info("hold lock and update time"); heraSchedule.startup(); } else { long currentTime = System.currentTimeMillis(); long lockTime = heraLock.getServerUpdate().getTime(); long interval = currentTime - lockTime; if (interval > timeout && isPreemptionHost()) { Date date = new Date(); Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost()); if (lock != null && lock > 0) { ErrorLog.error("master 发生切换,{} 抢占成功", WorkContext.host); heraSchedule.startup(); heraLock.setHost(WorkContext.host); //TODO 接入master切换通知 } else { HeraLog.info("master抢占失败,由其它worker抢占成功"); } } else { //非主节点,调度器不执行 heraSchedule.shutdown(); } } workClient.init(); try { workClient.connect(heraLock.getHost().trim()); } catch (Exception e) { e.printStackTrace(); } } /** * 检测该ip是否具有抢占master的权限 * @return 是/否 */ private boolean isPreemptionHost() { List<String> preemptionHostList = hostGroupService.findPreemptionGroup(HeraGlobalEnvironment.preemptionMasterGroup); if (preemptionHostList.contains(WorkContext.host)) { return true; } else { HeraLog.info(WorkContext.host + " is not in master group " + preemptionHostList.toString()); return false; } } }
1.通过PostConstruct在项目启动的时候触发定时任务。。
2.定时任务主要通过数库来进行强占master
3.如果是master,定时任务做的事情就是update heraLock ServerUpdate
4.如果是worker,定时任务做的事情就是查看master是不是长久没更新ServerUpdate,然后强占
5. heraSchedule.startup();workClient.init();workClient.connect(heraLock.getHost().trim()) 这三方法里面有状态,会置空操作。
2.masterServer与workClient启动及交互
masterServer的启动入口为,heraSchedule.startup()-->masterxContext.init()
public void init() { threadPool = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-wait-response"), new ThreadPoolExecutor.AbortPolicy()); masterSchedule = new ScheduledThreadPoolExecutor(5, new NamedThreadFactory("master-schedule", false)); masterSchedule.setKeepAliveTime(5, TimeUnit.MINUTES); masterSchedule.allowCoreThreadTimeOut(true); this.getQuartzSchedulerService().start(); dispatcher = new Dispatcher(); handler = new MasterHandler(this);//主handler masterServer = new MasterServer(handler);//主通讯类 masterServer.start(HeraGlobalEnvironment.getConnectPort());//启动 master.init(this); HeraLog.info("end init master content success "); }
而MasterSever主要关注MasterHandler
public MasterHandler(MasterContext masterContext) { this.masterContext = masterContext; //把处理结果统一管理 completionService = new ExecutorCompletionService<>( new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-execute", false), new ThreadPoolExecutor.AbortPolicy())); ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("master-deal", false), new ThreadPoolExecutor.AbortPolicy()); executor.execute(() -> { Future<ChannelResponse> future; ChannelResponse response; while (true) {//一量处理有结果,即时响应 try { future = completionService.take(); response = future.get(); TaskLog.info("3-1.MasterHandler:-->master prepare send status : {}", response.webResponse.getStatus()); response.channel.writeAndFlush(wrapper(response.webResponse)); TaskLog.info("3-2.MasterHandler:-->master send response success, requestId={}", response.webResponse.getRid()); } catch (Exception e) { ErrorLog.error("master handler future take error:{}", e); e.printStackTrace(); } catch (Throwable throwable) { ErrorLog.error("master handler future take throwable{}", throwable); throwable.printStackTrace(); } } } ); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { SocketMessage socketMessage = (SocketMessage) msg; Channel channel = ctx.channel(); switch (socketMessage.getKind()) { //心跳 case REQUEST://如果是内置任务无需处理结果 Request request = Request.newBuilder().mergeFrom(socketMessage.getBody()).build(); switch (request.getOperate()) { case HeartBeat: masterContext.getThreadPool().execute(() -> MasterHandleRequest.handleHeartBeat(masterContext, channel, request)); break; case SetWorkInfo: //此处省略代码。。 break; } break; case WEB_REQUEST://web任务统处理,结果由专门线程返回 final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build(); switch (webRequest.getOperate()) { case ExecuteJob: //此处省略代码。。。 break; case RESPONSE://对于客户端内置请求响应结果交给监听器 masterContext.getThreadPool().execute(() -> { Response response = null; try { response = Response.newBuilder().mergeFrom(socketMessage.getBody()).build(); SocketLog.info("6.MasterHandler:receiver socket info from work {}, response is {}", ctx.channel().remoteAddress(), response.getRid()); for (ResponseListener listener : listeners) { listener.onResponse(response); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } }); break; case WEB_RESPONSE://对于客户端WEb请求响应结果交给监听器 masterContext.getThreadPool().execute(() -> { WebResponse webResponse = null; try { webResponse = WebResponse.newBuilder().mergeFrom(socketMessage.getBody()).build(); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } SocketLog.info("6.MasterHandler:receiver socket info from work {}, webResponse is {}", ctx.channel().remoteAddress(), webResponse.getRid()); for (ResponseListener listener : listeners) { listener.onWebResponse(webResponse); } }); break; default: ErrorLog.error("unknown request type : {}", socketMessage.getKind()); break; } } //再看看监听器,任务里面有个ID号可以对应起结果 public class WorkResponseListener extends ResponseListenerAdapter { private RpcWebRequest.WebRequest request; private volatile Boolean receiveResult; private CountDownLatch latch; private RpcWebResponse.WebResponse webResponse; @Override public void onWebResponse(RpcWebResponse.WebResponse response) { if (request.getRid() == response.getRid()) { try { webResponse = response; receiveResult = true; } catch (Exception e) { ErrorLog.error("work release exception {}", e); } finally { latch.countDown(); } } } }
workclient的设计方式与masterServer设计方式如出一辙,具体查看WorkHandler类
- 把有需要结果的语法统一交给completionService,统一返回
- 针对响应结果采用监听器的方式,通知相关业务
3.支持master/work 负载,内存,进程,cpu信息的可视化查看
//workClient中init方法,定时上报workerHandlerHeartBeat workSchedule.schedule(new Runnable() { private WorkerHandlerHeartBeat workerHandlerHeartBeat = new WorkerHandlerHeartBeat(); private int failCount = 0; @Override public void run() { try { if (workContext.getServerChannel() != null) { //这是主方法 boolean send = workerHandlerHeartBeat.send(workContext); if (!send) { failCount++; ErrorLog.error("send heart beat failed ,failCount :" + failCount); } else { failCount = 0; HeartLog.info("send heart beat success:{}", workContext.getServerChannel().getRemoteAddress()); } } else { ErrorLog.error("server channel can not find on " + WorkContext.host); } } catch (Exception e) { ErrorLog.error("heart beat error:", e); } finally { workSchedule.schedule(this, (failCount + 1) * HeraGlobalEnvironment.getHeartBeat(), TimeUnit.SECONDS); } } }, HeraGlobalEnvironment.getHeartBeat(), TimeUnit.SECONDS); //主要send方法 public boolean send(WorkContext context) { try { MemUseRateJob memUseRateJob = new MemUseRateJob(1); //内存信息获取 memUseRateJob.readMemUsed(); CpuLoadPerCoreJob loadPerCoreJob = new CpuLoadPerCoreJob(); //负载信息获取 loadPerCoreJob.run(); //构建包体 RpcHeartBeatMessage.HeartBeatMessage hbm = RpcHeartBeatMessage.HeartBeatMessage.newBuilder() .setHost(WorkContext.host) .setMemTotal(memUseRateJob.getMemTotal()) .setMemRate(memUseRateJob.getRate()) .setCpuLoadPerCore(loadPerCoreJob.getLoadPerCore()) .setTimestamp(System.currentTimeMillis()) .addAllDebugRunnings(context.getDebugRunning().keySet()) .addAllManualRunnings(context.getManualRunning().keySet()) .addAllRunnings(context.getRunning().keySet()) .setCores(WorkContext.cpuCoreNum) .build(); context.getServerChannel().writeAndFlush(RpcSocketMessage.SocketMessage.newBuilder(). setKind(RpcSocketMessage.SocketMessage.Kind.REQUEST). setBody(RpcRequest.Request.newBuilder(). setRid(AtomicIncrease.getAndIncrement()). setOperate(RpcOperate.Operate.HeartBeat). setBody(hbm.toByteString()). build().toByteString()). build()); } catch (RemotingException e) { e.printStackTrace(); return false; } return true; } //master对于heartBeat的处理方式 public static void handleHeartBeat(MasterContext masterContext, Channel channel, Request request) { //放到master服务器的内存中,等待WEB应用的调用 MasterWorkHolder workHolder = masterContext.getWorkMap().get(channel); HeartBeatInfo heartBeatInfo = new HeartBeatInfo(); HeartBeatMessage heartBeatMessage; try { heartBeatMessage = HeartBeatMessage.newBuilder().mergeFrom(request.getBody()).build(); heartBeatInfo.setHost(heartBeatMessage.getHost()); heartBeatInfo.setMemRate(heartBeatMessage.getMemRate()); heartBeatInfo.setMemTotal(heartBeatMessage.getMemTotal()); heartBeatInfo.setCpuLoadPerCore(heartBeatMessage.getCpuLoadPerCore()); heartBeatInfo.setRunning(heartBeatMessage.getRunningsList()); heartBeatInfo.setDebugRunning(heartBeatMessage.getDebugRunningsList()); heartBeatInfo.setManualRunning(heartBeatMessage.getManualRunningsList()); heartBeatInfo.setTimestamp(heartBeatMessage.getTimestamp()); heartBeatInfo.setCores(heartBeatMessage.getCores()); workHolder.setHeartBeatInfo(heartBeatInfo); HeartLog.info("received heart beat from {} : {}", heartBeatMessage.getHost(), JSONObject.toJSONString(heartBeatInfo)); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } }
1.work启动时,启动定时任务,专们负责收集数据,并发送给master
2.master,收到信息存在自己内存中,等待web调用并返回
4.任务添加与执行过程
//研发中心立即执行 @RequestMapping(value = "/debugSelectCode", method = RequestMethod.POST) @ResponseBody public WebAsyncTask<JsonResponse> debugSelectCode(@RequestBody HeraFile heraFile) { String owner = getOwner(); return new WebAsyncTask<JsonResponse>(HeraGlobalEnvironment.getRequestTimeout(), () -> { Map<String, Object> res = new HashMap<>(2); HeraFile file = heraFileService.findById(heraFile.getId()); file.setContent(heraFile.getContent()); String name = file.getName(); String runType; HeraDebugHistory history = HeraDebugHistory.builder() .fileId(file.getId()) .script(heraFile.getContent()) .startTime(new Date()) .owner(owner) .hostGroupId(file.getHostGroupId() == 0 ? HeraGlobalEnvironment.defaultWorkerGroup : file.getHostGroupId()) .build(); int suffixIndex = name.lastIndexOf(Constants.POINT); if (suffixIndex == -1) { return new JsonResponse(false, "无后缀名,请设置支持的后缀名[.sh .hive .spark]"); } String suffix = name.substring(suffixIndex); //只支持shell,hive,spark if ((Constants.HIVE_SUFFIX).equalsIgnoreCase(suffix)) { runType = Constants.HIVE_FILE; } else if ((Constants.SHELL_SUFFIX).equalsIgnoreCase(suffix)) { runType = Constants.SHELL_FILE; } else if ((Constants.SPARK_SUFFIX).equalsIgnoreCase(suffix)) { runType = Constants.SPARK_FILE; } else { return new JsonResponse(false, "暂未支持的后缀名[" + suffix + "],请设置支持的后缀名[.sh .hive .spark]"); } history.setRunType(runType); String newId = debugHistoryService.insert(history); //向master提交任务 workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.DebugKind, newId); res.put("fileId", file.getId()); res.put("debugId", newId); return new JsonResponse(true, "执行成功", res); }); } //master端对此任务的相关处理,则是加入DebugQueue,进入等待队列 public void debug(HeraDebugHistoryVo debugHistory) { JobElement element = JobElement.builder() .jobId(debugHistory.getId()) .hostGroupId(debugHistory.getHostGroupId()) .build(); debugHistory.setStatus(StatusEnum.RUNNING); debugHistory.setStartTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); debugHistory.getLog().append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 进入任务队列"); masterContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(debugHistory)); try { masterContext.getDebugQueue().put(element); } catch (InterruptedException e) { ErrorLog.error("添加开发中心执行任务失败:" + element.getJobId(), e); } } /** * 在master.init的时候会,会启动定时任务 * 扫描任务等待队列,取出任务去执行 * 对于没有可运行机器的时,manual,debug任务重新offer到原队列 */ public boolean scan() throws InterruptedException { boolean hasTask = false; if (!masterContext.getScheduleQueue().isEmpty()) { 代码省略 } if (!masterContext.getManualQueue().isEmpty()) { //代码省略 } if (!masterContext.getDebugQueue().isEmpty()) { //拿出任务 JobElement jobElement = masterContext.getDebugQueue().take(); MasterWorkHolder selectWork = getRunnableWork(jobElement); if (selectWork == null) { masterContext.getDebugQueue().put(jobElement); ScheduleLog.warn("can not get work to execute DebugQueue job in master,job is:{}", jobElement.toString()); } else { //当有机器的时候,直接给机子分配任务(向work发送任务) runDebugJob(selectWork, jobElement.getJobId()); hasTask = true; } } return hasTask; } /** * worker接收到命令,JobUtils.createDebugJob创建job文件到服务器,拼接shell,并调用命令执行 */ private Future<RpcResponse.Response> debug(WorkContext workContext, RpcRequest.Request request) { RpcDebugMessage.DebugMessage debugMessage = null; try { debugMessage = RpcDebugMessage.DebugMessage.newBuilder().mergeFrom(request.getBody()).build(); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } String debugId = debugMessage.getDebugId(); HeraDebugHistoryVo history = workContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId)); return workContext.getWorkExecuteThreadPool().submit(() -> { int exitCode = -1; Exception exception = null; ResponseStatus.Status status; try { history.setExecuteHost(WorkContext.host); workContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(history)); String date = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File directory = new File(HeraGlobalEnvironment.getWorkDir() + File.separator + date + File.separator + "debug-" + debugId); if (!directory.exists()) { if (directory.mkdirs()) { HeraLog.error("创建文件失败:" + directory.getAbsolutePath()); } } Job job = JobUtils.createDebugJob(new JobContext(JobContext.DEBUG_RUN), BeanConvertUtils.convert(history), directory.getAbsolutePath(), workContext); workContext.getDebugRunning().putIfAbsent(debugId, job); exitCode = job.run(); } catch (Exception e) { exception = e; history.getLog().appendHeraException(e); } finally { HeraDebugHistoryVo heraDebugHistoryVo = workContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId)); heraDebugHistoryVo.setEndTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); StatusEnum statusEnum = getStatusFromCode(exitCode); if (exitCode == 0) { status = ResponseStatus.Status.OK; heraDebugHistoryVo.setStatus(statusEnum); } else { status = ResponseStatus.Status.ERROR; heraDebugHistoryVo.setStatus(statusEnum); } workContext.getHeraDebugHistoryService().updateStatus(BeanConvertUtils.convert(heraDebugHistoryVo)); HeraDebugHistoryVo debugHistory = workContext.getDebugRunning().get(debugId).getJobContext().getDebugHistory(); workContext.getHeraDebugHistoryService().updateLog(BeanConvertUtils.convert(debugHistory)); workContext.getDebugRunning().remove(debugId); } String errorText = ""; if (exception != null && exception.getMessage() != null) { errorText = exception.getMessage(); } return RpcResponse.Response.newBuilder() .setRid(request.getRid()) .setOperate(RpcOperate.Operate.Debug) .setStatusEnum(status) .setErrorText(errorText) .build(); }); }
- web把任务请求统一提交到master,master加到队列中
- master的后台进程去队列中拿任务
- 用scan选择work
- work接收到任务,执行对应脚本job.
5.服务降级,负载均衡
/** * 获取hostGroupId中可以分发任务的worker */ private MasterWorkHolder getRunnableWork(JobElement jobElement) { //根据算法选择对应的work MasterWorkHolder selectWork = loadBalance.select(jobElement, masterContext); if (selectWork == null) { return null; } Channel channel = selectWork.getChannel().getChannel(); HeartBeatInfo beatInfo = selectWork.getHeartBeatInfo(); // 如果最近两次选择的work一致 需要等待机器最新状态发来之后(睡眠)再进行任务分发 if (HeraGlobalEnvironment.getWarmUpCheck() > 0 && lastWork != null && channel == lastWork && (beatInfo.getCpuLoadPerCore() > 0.6F || beatInfo.getMemRate() > 0.7F)) { ScheduleLog.info("达到预热条件,睡眠" + HeraGlobalEnvironment.getWarmUpCheck() + "秒"); try { TimeUnit.SECONDS.sleep(HeraGlobalEnvironment.getWarmUpCheck()); } catch (InterruptedException e) { e.printStackTrace(); } lastWork = null; return null; } lastWork = channel; return selectWork; }
- 先去数据库(缓存)查询任务对应分组机子
- 与在线机子做差集
- 在master为任务选择work的时候,项目分别采用了轮询及随机算法来实现
6.支持日志的实时滚动
/** * 定时 刷新日志到数据库 */ workSchedule.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { for (Job job : workContext.getRunning().values()) { //处理日志 } for (Job job : workContext.getManualRunning().values()) { //处理日志 } for (Job job : workContext.getDebugRunning().values()) { try { //服务在运行中会向这类不断导入日志 HeraDebugHistoryVo history = job.getJobContext().getDebugHistory(); workContext.getHeraDebugHistoryService().updateLog(BeanConvertUtils.convert(history)); } catch (Exception e) { printDebugLog(job, e); } } } catch (Exception e) { ErrorLog.error("job log flush exception:{}", e.toString()); } } }, 0, 5, TimeUnit.SECONDS); }
1.脚本在运行中的时候,work都会为每个脚本维护一个HeraJobHistoryVo,其中 LogContent log属性会和服务输入输出流对应
2.work启动的时候,会启动一个定时刷新日志在数据库的进程
3.web端通个查询数据库很好的达到时实
7.hera事件模型dispatch
/** * 事件广播,每次任务状态变化,触发响应事件,全局广播,自动调度successEvent,触发依赖调度一些依赖更新 * * @param applicationEvent */ public void dispatch(ApplicationEvent applicationEvent) { try { //封装事件 MvcEvent mvcEvent = new MvcEvent(this, applicationEvent); mvcEvent.setApplicationEvent(applicationEvent); //前置监听者 if (fireEvent(beforeDispatch, mvcEvent)) { List<AbstractHandler> jobHandlersCopy = Lists.newArrayList(jobHandlers); for (AbstractHandler jobHandler : jobHandlersCopy) { //是否可执行 if (jobHandler.canHandle(applicationEvent)) { if (!jobHandler.isInitialized()) { jobHandler.setInitialized(true); } //job触发 jobHandler.handleEvent(applicationEvent); } } //后置监听者 fireEvent(afterDispatch, mvcEvent); } } catch (Exception e) { ErrorLog.error("global dispatch job event error", e); } }
1.dispatch对于事件的主要处理顺序 前置监听 ---- jobHandler ----- 后置监听
8.支持关注自己的任务,自动调度执行失败时会向负责人发送邮件
private void runDebugJob(MasterWorkHolder selectWork, String debugId) { final MasterWorkHolder workHolder = selectWork; this.executeJobPool.execute(() -> { HeraDebugHistoryVo history = masterContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId)); history.getLog().append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 开始运行"); masterContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(history)); Exception exception = null; RpcResponse.Response response = null; Future<RpcResponse.Response> future = null; try { future = new MasterExecuteJob().executeJob(masterContext, workHolder, JobExecuteKind.ExecuteKind.DebugKind, debugId); response = future.get(HeraGlobalEnvironment.getTaskTimeout(), TimeUnit.HOURS); } catch (Exception e) { exception = e; if (future != null) { future.cancel(true); } DebugLog.error(String.format("debugId:%s run failed", debugId), e); } boolean success = response != null && response.getStatusEnum() == ResponseStatus.Status.OK; if (!success) { exception = new HeraException(String.format("fileId:%s run failed ", history.getFileId()), exception); TaskLog.info("8.Master: debug job error"); history = masterContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId)); //创建失败事件 HeraDebugFailEvent failEvent = HeraDebugFailEvent.builder() .debugHistory(BeanConvertUtils.convert(history)) .throwable(exception) .fileId(history.getFileId()) .build(); //将失败事件通知关心的人 masterContext.getDispatcher().forwardEvent(failEvent); } else { TaskLog.info("7.Master: debug success"); //创建成功事件 HeraDebugSuccessEvent successEvent = HeraDebugSuccessEvent.builder() .fileId(history.getFileId()) .history(BeanConvertUtils.convert(history)) .build(); //将成功事件通知关心的人 masterContext.getDispatcher().forwardEvent(successEvent); } }); } public void alarm(HeraJobFailedEvent failedEvent) { String actionId = failedEvent.getActionId(); Integer jobId = ActionUtil.getJobId(actionId); if (jobId == null) { return; } HeraJob heraJob = heraJobService.findById(jobId); // 自己建立的任务运行失败必须收到告警 if (heraJob.getAuto() != 1 && !Constants.PUB_ENV.equals(HeraGlobalEnvironment.getEnv())) { return; } StringBuilder address = new StringBuilder(); HeraUser user = heraUserService.findByName(heraJob.getOwner()); address.append(user.getEmail().trim()).append(Constants.SEMICOLON); try { HeraJobMonitor monitor = heraJobMonitorService.findByJobIdWithOutBlank(heraJob.getId()); if (monitor == null && Constants.PUB_ENV.equals(HeraGlobalEnvironment.getEnv())) { ScheduleLog.info("任务无监控人,发送给owner:{}", heraJob.getId()); } else if (monitor != null) { String ids = monitor.getUserIds(); String[] id = ids.split(Constants.COMMA); for (String anId : id) { if (StringUtils.isBlank(anId)) { continue; } HeraUser monitor_user = heraUserService.findById(Integer.parseInt(anId)); if (monitor_user != null && monitor_user.getEmail() != null) { address.append(monitor_user.getEmail().trim()).append(Constants.SEMICOLON); } } } String title = "hera调度任务失败[任务=" + heraJob.getName() + "(" + heraJob.getId() + "),版本号=" + actionId + "]"; String content = "任务ID:" + heraJob.getId() + Constants.HTML_NEW_LINE + "任务名:" + heraJob.getName() + Constants.HTML_NEW_LINE + "任务版本号:" + actionId + Constants.HTML_NEW_LINE + "任务描述:" + heraJob.getDescription() + Constants.HTML_NEW_LINE + "任务OWNER:" + heraJob.getOwner() + Constants.HTML_NEW_LINE; String errorMsg = failedEvent.getHeraJobHistory().getLog().getMailContent(); if (errorMsg != null) { content += Constants.HTML_NEW_LINE + Constants.HTML_NEW_LINE + "--------------------------------------------" + Constants.HTML_NEW_LINE + errorMsg; } emailService.sendEmail(title, content, address.toString()); } catch (MessagingException e) { e.printStackTrace(); ErrorLog.error("发送邮件失败"); }
1.在master.init中 有 masterContext.getDispatcher().addDispatcherListener(new HeraJobFailListener());这一代码。
他关注job失败的事件,当master端收的失败响应,则会触发事件
2.任务失败事件的最终响应者是 EmailJobFailAlarm,发邮件通知责任人
9.生成版本
//isSingle是否全量生成版本 //jobId 如果是单个,其任务ID private boolean generateAction(boolean isSingle, Integer jobId) { try { //防止全量重复调用, if (isGenerateActioning) { return true; } DateTime dateTime = new DateTime(); Date now = dateTime.toDate(); int executeHour = dateTime.getHourOfDay(); //凌晨生成版本,早上七点以后开始再次生成版本 boolean execute = executeHour == 0 || (executeHour > ActionUtil.ACTION_CREATE_MIN_HOUR && executeHour <= ActionUtil.ACTION_CREATE_MAX_HOUR); if (execute || isSingle) { String currString = ActionUtil.getCurrHourVersion(); if (executeHour == ActionUtil.ACTION_CREATE_MAX_HOUR) { Tuple<String, Date> nextDayString = ActionUtil.getNextDayString(); //例如:今天 2018.07.17 23:50 currString = 201807180000000000 now = 2018.07.18 23:50 currString = nextDayString.getSource(); now = nextDayString.getTarget(); } Long nowAction = Long.parseLong(currString); ConcurrentHashMap<Long, HeraAction> actionMap = new ConcurrentHashMap<>(heraActionMap.size()); List<HeraJob> jobList = new ArrayList<>(); //批量生成 if (!isSingle) { isGenerateActioning = true; jobList = masterContext.getHeraJobService().getAll(); } else { //单个任务生成版本 HeraJob heraJob = masterContext.getHeraJobService().findById(jobId); jobList.add(heraJob); actionMap = heraActionMap; List<Long> shouldRemove = new ArrayList<>(); for (Long actionId : actionMap.keySet()) { if (StringUtil.actionIdToJobId(String.valueOf(actionId), String.valueOf(jobId))) { shouldRemove.add(actionId); } } //移除内存所有依赖这个Job的老版本 shouldRemove.forEach(actionMap::remove); List<AbstractHandler> handlers = new ArrayList<>(masterContext.getDispatcher().getJobHandlers()); 移除内存所有依赖这个Job的老版本任务 if (handlers != null && handlers.size() > 0) { for (AbstractHandler handler : handlers) { JobHandler jobHandler = (JobHandler) handler; if (StringUtil.actionIdToJobId(jobHandler.getActionId(), String.valueOf(jobId))) { masterContext.getQuartzSchedulerService().deleteJob(jobHandler.getActionId()); masterContext.getDispatcher().removeJobHandler(jobHandler); } } } } String cronDate = ActionUtil.getActionVersionPrefix(now); Map<Integer, List<HeraAction>> idMap = new HashMap<>(jobList.size()); Map<Integer, HeraJob> jobMap = new HashMap<>(jobList.size()); //生成新的版本,并且注入actionMap generateScheduleJobAction(jobList, cronDate, actionMap, nowAction, idMap, jobMap); //整理新的依赖关系 for (Map.Entry<Integer, HeraJob> entry : jobMap.entrySet()) { generateDependJobAction(jobMap, entry.getValue(), actionMap, nowAction, idMap); } if (executeHour < ActionUtil.ACTION_CREATE_MAX_HOUR) { heraActionMap = actionMap; } Dispatcher dispatcher = masterContext.getDispatcher(); if (dispatcher != null) { if (actionMap.size() > 0) { for (Long id : actionMap.keySet()) { dispatcher.addJobHandler(new JobHandler(id.toString(), masterContext.getMaster(), masterContext)); //新加入的版本发送事件通知 if (id >= Long.parseLong(currString)) { dispatcher.forwardEvent(new HeraJobMaintenanceEvent(Events.UpdateActions, id.toString())); } } } } ScheduleLog.info("[单个任务:{},任务id:{}]generate action success", isSingle, jobId); return true; } } catch (Exception e) { e.printStackTrace(); } finally { isGenerateActioning = false; } return false; }
- 把要生成版本号的JOB,从内存及调度中删除
- 生成新版本号JOB,整理依赖关系,加入内存到缓存中
- 把新版本的生成事件播放出去
9.支持任务的定时调度、依赖调度、手动调度、手动恢复
定时调度
主要是根据cron表达式来解析该任务的执行时间,在达到触发时间时将该任务加入任务队列
/** * 自动任务的版本生成 * * @param jobList 任务集合 * @param cronDate 日期 * @param actionMap actionMap集合 * @param nowAction 生成版本时间的action * @param idMap 已经遍历过的idMap * @param jobMap 依赖任务map映射 */ public void generateScheduleJobAction(List<HeraJob> jobList, String cronDate, Map<Long, HeraAction> actionMap, Long nowAction, Map<Integer, List<HeraAction>> idMap, Map<Integer, HeraJob> jobMap) { List<HeraAction> insertActionList = new ArrayList<>(); for (HeraJob heraJob : jobList) { if (heraJob.getScheduleType() != null) { if (heraJob.getScheduleType() == 1) { jobMap.put(heraJob.getId(), heraJob); } else if (heraJob.getScheduleType() == 0) { //如果是定时任务,会解析表达式,拆分出多个版本 String cron = heraJob.getCronExpression(); List<String> list = new ArrayList<>(); if (StringUtils.isNotBlank(cron)) { boolean isCronExp = CronParse.Parser(cron, cronDate, list); if (!isCronExp) { ErrorLog.error("cron parse error,jobId={},cron = {}", heraJob.getId(), cron); continue; } List<HeraAction> heraAction = createHeraAction(list, heraJob); idMap.put(heraJob.getId(), heraAction); insertActionList.addAll(heraAction); } } else { ErrorLog.error("任务{}未知的调度类型{}", heraJob.getId(), heraJob.getScheduleType()); } } } batchInsertList(insertActionList, actionMap, nowAction); } //jobHandler中的Events.UpdateActions事件处理 private void autoRecovery() { cache.refresh(); HeraActionVo heraActionVo = cache.getHeraActionVo(); //任务被删除 if (heraActionVo == null) { masterContext.getDispatcher().removeJobHandler(this); destroy(); ScheduleLog.info("heraAction 为空, 删除{}", actionId); return; } //自动调度关闭 if (!heraActionVo.getAuto()) { destroy(); return; } /** * 如果是依赖任务 原来可能是独立任务,需要尝试删除原来的定时调度 * 如果是独立任务,则重新创建quartz调度 * */ if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Dependent) { destroy(); } else if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Independent) { try { createScheduleJob(masterContext.getDispatcher(), heraActionVo); } catch (SchedulerException e) { e.printStackTrace(); } } } /** * 创建定时任务 * * @param dispatcher the scheduler * @param heraActionVo the job name */ private void createScheduleJob(Dispatcher dispatcher, HeraActionVo heraActionVo) throws SchedulerException { if (!ActionUtil.isCurrActionVersion(actionId)) { return; } JobKey jobKey = new JobKey(actionId, Constants.HERA_GROUP); if (masterContext.getQuartzSchedulerService().getScheduler().getJobDetail(jobKey) == null) { JobDetail jobDetail = JobBuilder.newJob(HeraQuartzJob.class).withIdentity(jobKey).build(); jobDetail.getJobDataMap().put("actionId", heraActionVo.getId()); jobDetail.getJobDataMap().put("dispatcher", dispatcher); //TODO 根据任务区域加时区 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(heraActionVo.getCronExpression().trim())/*.inTimeZone()*/; CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(actionId, Constants.HERA_GROUP).withSchedule(scheduleBuilder).build(); masterContext.getQuartzSchedulerService().getScheduler().scheduleJob(jobDetail, trigger); ScheduleLog.info("--------------------------- 添加自动调度成功:{}--------------------------", heraActionVo.getId()); } }
- 在生成版本号的时候根据cron表达式生成多个版本
- 发出多版本变更通知
- jobHandler处理给对应新版本创建对应定时任务
依赖调度
我们的任务大部分都有依赖关系,只有在上一个任务计算出结果后才能进行下一步的执行。我们的依赖任务会在所有的依赖任务都执行完成之后才会被触发加入任务队列
/** * 递归生成任务依赖action * * @param jobMap 任务映射map * @param heraJob 当前生成版本的任务 * @param actionMap 版本map * @param nowAction 生成版本时间的action * @param idMap job的id集合 只要已经检测过的id都放入idSet中 */ private void generateDependJobAction(Map<Integer, HeraJob> jobMap, HeraJob heraJob, Map<Long, HeraAction> actionMap, Long nowAction, Map<Integer, List<HeraAction>> idMap) { if (heraJob == null || idMap.containsKey(heraJob.getId())) { return; } String jobDependencies = heraJob.getDependencies(); if (StringUtils.isNotBlank(jobDependencies)) { Map<String, List<HeraAction>> dependenciesMap = new HashMap<>(1024); String[] dependencies = jobDependencies.split(Constants.COMMA); String actionMinDeps = ""; boolean noAction = false; for (String dependentId : dependencies) { Integer dpId = Integer.parseInt(dependentId); //如果idSet不包含依赖任务dpId 则递归查找 if (!idMap.containsKey(dpId)) { generateDependJobAction(jobMap, jobMap.get(dpId), actionMap, nowAction, idMap); } List<HeraAction> dpActions = idMap.get(dpId); dependenciesMap.put(dependentId, dpActions); if (dpActions == null || dpActions.size() == 0) { ErrorLog.warn("{}今天找不到版本,无法为任务{}生成版本", dependentId, heraJob.getId()); noAction = true; break; } if (StringUtils.isBlank(actionMinDeps)) { actionMinDeps = dependentId; } //找到所依赖的任务中版本最少的作为基准版本。 if (dependenciesMap.get(actionMinDeps).size() > dependenciesMap.get(dependentId).size()) { actionMinDeps = dependentId; } else if (dependenciesMap.get(dependentId).size() > 0 && dependenciesMap.get(actionMinDeps).size() == dependenciesMap.get(dependentId).size() && dependenciesMap.get(actionMinDeps).get(0).getId() < dependenciesMap.get(dependentId).get(0).getId()) { //如果两个版本的个数一样 那么应该找一个时间较大的 actionMinDeps = dependentId; } } if (noAction) { idMap.put(heraJob.getId(), null); } else { List<HeraAction> actionMinList = dependenciesMap.get(actionMinDeps); if (actionMinList != null && actionMinList.size() > 0) { List<HeraAction> insertList = new ArrayList<>(); for (HeraAction action : actionMinList) { StringBuilder actionDependencies = new StringBuilder(action.getId().toString()); Long longActionId = Long.parseLong(actionDependencies.toString()); for (String dependency : dependencies) { if (!dependency.equals(actionMinDeps)) { List<HeraAction> otherAction = dependenciesMap.get(dependency); if (otherAction == null || otherAction.size() == 0) { continue; } //找到一个离基准版本时间最近的action,添加为该任务的依赖 String otherActionId = otherAction.get(0).getId().toString(); for (HeraAction o : otherAction) { if (Math.abs(o.getId() - longActionId) < Math.abs(Long.parseLong(otherActionId) - longActionId)) { otherActionId = o.getId().toString(); } } actionDependencies.append(","); actionDependencies.append(Long.parseLong(otherActionId) / 1000000 * 1000000 + Long.parseLong(dependency)); } } HeraAction actionNew = new HeraAction(); BeanUtils.copyProperties(heraJob, actionNew); Long actionId = longActionId / 1000000 * 1000000 + Long.parseLong(String.valueOf(heraJob.getId())); actionNew.setId(actionId); actionNew.setGmtCreate(new Date()); actionNew.setDependencies(actionDependencies.toString()); actionNew.setJobDependencies(heraJob.getDependencies()); actionNew.setJobId(heraJob.getId()); actionNew.setAuto(heraJob.getAuto()); actionNew.setHostGroupId(heraJob.getHostGroupId()); masterContext.getHeraJobActionService().insert(actionNew, nowAction); actionMap.put(actionNew.getId(), actionNew); insertList.add(actionNew); } idMap.put(heraJob.getId(), insertList); } } } } /** * 收到广播的任务成功事件的处理流程,每次自动调度任务成功执行,会进行一次全局的SuccessEvent广播,使得依赖任务可以更新readyDependent * * @param event */ private void handleSuccessEvent(HeraJobSuccessEvent event) { if (event.getTriggerType() == TriggerTypeEnum.MANUAL) { return; } String jobId = event.getJobId(); HeraActionVo heraActionVo = cache.getHeraActionVo(); if (heraActionVo == null) { autoRecovery(); return; } if (!heraActionVo.getAuto()) { return; } if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Independent) { return; } if (heraActionVo.getDependencies() == null || !heraActionVo.getDependencies().contains(jobId)) { return; } JobStatus jobStatus; //必须同步 synchronized (this) { jobStatus = heraJobActionService.findJobStatus(actionId); ScheduleLog.info(actionId + "received a success dependency job with actionId = " + jobId); jobStatus.getReadyDependency().put(jobId, String.valueOf(System.currentTimeMillis())); heraJobActionService.updateStatus(jobStatus); } boolean allComplete = true; for (String key : heraActionVo.getDependencies()) { if (jobStatus.getReadyDependency().get(key) == null) { allComplete = false; break; } } if (allComplete) { ScheduleLog.info("JobId:" + actionId + " all dependency jobs is ready,run!"); startNewJob(event.getTriggerType(), heraActionVo); } else { ScheduleLog.info(actionId + "some of dependency is not ready, waiting" + JSONObject.toJSONString(jobStatus.getReadyDependency().keySet())); } }
-
generateDependJobAction算法比较复杂,其目的就是刷新下内存中 heraActionMap 中的依赖关系
- 当其依赖的独立任务完成时,jobHandler会收到消息,然后较验自己的所有依赖是否就绪然后发起任务命令
手动调度与手动恢复
手动调度即为手动执行的任务,手动执行后自动加入任务队列,请注意,手动任务执行成功后不会通知下游任务(即:依赖于该任务的任务)该任务已经执行完成。
手动恢复类似于手动调度,于手动调度的区别为此时如果该任务执行成功,会通知下游任务该任务已经执行完成
/** * 收到广播的任务成功事件的处理流程,每次自动调度任务成功执行,会进行一次全局的SuccessEvent广播,使得依赖任务可以更新readyDependent * * @param event */ private void handleSuccessEvent(HeraJobSuccessEvent event) { if (event.getTriggerType() == TriggerTypeEnum.MANUAL) { return; } //省略代码 }
1.手动执行和debug执行流程相似,不做多介绍
2.Jobhandler中handleSuccessEvent对手动的进行了过滤
总结:
关于抢占模式:
基本上所有的调度工作都集中在master,通过权限控制可以很好的做个主备master,但备master会处于空闲。。
优化点:
master 处于单机运行模式,是否能提供集群方式运行
如果集群关于work与master长链接维护的问题:
如果不是和dubbo一样每个master和work建立长链接,就为有调用不了的问题。。
如果work过多,对于master也是一种压力。。
master不建议设成work,是否可以改成 独占为master的work,不在接受工作调度
关于机器选用:
先去数据库(缓存)查询任务对应分组机子,与在线机子做差集。然后平均负载。因为缓存的关系,新加一台work要等待半小时。
优化点:
作为work是不是应该自己所在的分组。。为什么不在启动的时候或者web设置的时候告诉work是属于什么分组的
关于NIO通讯模型的使用
hera的设计于master与work的通讯基本上采用 master发出请求,然后等待,work处理返回, master做后置处理,调用事件模型dispatch,做通知。
优化点:
nio的特点是多路复用,netty框架的整个设计也是通知模式的。为什么采用同步等待+事件模型的方式。。
-
从HERA的包容性ep散射限制的有效夸克半径
2020-03-21 13:15:26高精度的HERA数据可以搜索到TeV尺度,从而超出标准模型对电子夸克散射的贡献。 在此分析中,结合了对中性和带电电流ep散射中包含的深部非弹性截面的组合测量,其对应的光度约为1 fbb-1。 提出了一种超越标准模型对... -
hera源码剖析:一次任务触发的执行流程
2020-09-30 17:30:02在 hera 中,任务被触发的方式有多种,比如分析师在前端手动执行触发、定时任务触发、依赖任务触发、重跑任务触发、信号丢失的触发等等。但是不管是哪种触发方式最后的入口都是在 Master#run 方法(开发中心任务触发...在
hera
中,任务被触发的方式有多种,比如分析师在前端手动执行触发、定时任务触发、依赖任务触发、重跑任务触发、信号丢失的触发等等。但是不管是哪种触发方式最后的入口都是在Master#run
方法(开发中心任务触发接口在Master#debug
)。这里就讲一下手动执行的任务触发流程
触发任务
在最新版本中,任务手动触发类型分为手动执行、手动恢复、超级恢复三种,具体区别就不再赘述,可以通过
hera
操作文档查看,这里以手动恢复为例
当我们点击执行之后,会发送一个请求到后端work端
首先看下
work
端的堆栈信息writeAndFlush:28, NettyChannel (com.dfire.core.netty) writeAndFlush:32, FailFastCluster (com.dfire.core.netty.cluster) buildMessage:100, WorkerHandleWebRequest (com.dfire.core.netty.worker.request) handleWebExecute:29, WorkerHandleWebRequest (com.dfire.core.netty.worker.request) executeJobFromWeb:312, WorkClient (com.dfire.core.netty.worker) execute:409, ScheduleOperatorController (com.dfire.controller) invoke:-1, ScheduleOperatorController$$FastClassBySpringCGLIB$$cddb34c8 (com.dfire.controller) invoke:204, MethodProxy (org.springframework.cglib.proxy) invokeJoinpoint:738, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework) proceed:157, ReflectiveMethodInvocation (org.springframework.aop.framework) proceed:85, MethodInvocationProceedingJoinPoint (org.springframework.aop.aspectj) around:72, HeraAspect (com.dfire.config) //省略部分
通过堆栈信息我们可以看到,在
controller
方法被调用之前会先调用一个通过AOP
实现的权限校验的方法HeraAspect#around
,当权限校验通过后会继续调用ScheduleOperatorController#execute
方法,该方法就是任务执行的入口。再往后就是调用WorkerHandleWebRequest#handleWebExecute
和WorkerHandleWebRequest#buildMessage
方法来创建netty
消息体,最后通过一个快速失败的容错方式(FailFastCluster#writeAndFlush
)来向master
发送一条netty
消息
下面仔细分析下,
controller
入口@RequestMapping(value = "/manual", method = RequestMethod.GET) @ResponseBody @ApiOperation("手动执行接口") public JsonResponse execute(@JsonSerialize(using = ToStringSerializer.class) @ApiParam(value = "版本id", required = true) Long actionId , @ApiParam(value = "触发类型,2手动执行,3手动恢复,6超级恢复", required = true) Integer triggerType, @RequestParam(required = false) @ApiParam(value = "任务执行组", required = false) String execUser) throws InterruptedException, ExecutionException, HeraException, TimeoutException { //省略部分校验代码 String configs = heraJob.getConfigs(); //新建hera_action_history 对象,并向mysql插入执行记录 HeraJobHistory actionHistory = HeraJobHistory.builder().build(); actionHistory.setJobId(heraAction.getJobId()); actionHistory.setActionId(heraAction.getId()); actionHistory.setTriggerType(triggerTypeEnum.getId()); actionHistory.setOperator(heraJob.getOwner()); actionHistory.setIllustrate(execUser); actionHistory.setStatus(StatusEnum.RUNNING.toString()); actionHistory.setStatisticEndTime(heraAction.getStatisticEndTime()); actionHistory.setHostGroupId(heraAction.getHostGroupId()); heraJobHistoryService.insert(actionHistory); heraAction.setScript(heraJob.getScript()); heraAction.setHistoryId(actionHistory.getId()); heraAction.setConfigs(configs); heraAction.setAuto(heraJob.getAuto()); heraAction.setHostGroupId(heraJob.getHostGroupId()); heraJobActionService.update(heraAction); //向master 发送任务执行的请求 workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.ManualKind, actionHistory.getId()); String ownerId = getOwnerId(); if (ownerId == null) { ownerId = "0"; } //添加操作记录 addJobRecord(heraJob.getId(), String.valueOf(actionId), RecordTypeEnum.Execute, execUser, ownerId); return new JsonResponse(true, String.valueOf(actionId)); }
这部分的代码很简单,主要分为三部分
1.创建hera_action_history
对象,向mysql
插入任务的执行记录
2.通过netty
向master
发送任务执行的消息
3.添加任务执行记录需要我们关注的主要是第二部分,通过上面的堆栈信息继续往下看
public void executeJobFromWeb(ExecuteKind kind, Long id) throws ExecutionException, InterruptedException, TimeoutException { RpcWebResponse.WebResponse response = WorkerHandleWebRequest.handleWebExecute(workContext, kind, id).get(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS); if (response.getStatus() == ResponseStatus.Status.ERROR) { ErrorLog.error(response.getErrorText()); } }
这部分代码调用了
WorkerHandleWebRequest.handleWebExecute
并返回一个future
,通过future.get
来阻塞我们的请求,继续往下看public static Future<WebResponse> handleWebExecute(final WorkContext workContext, ExecuteKind kind, Long id) { return buildMessage(WebRequest.newBuilder() .setRid(AtomicIncrease.getAndIncrement()) .setOperate(WebOperate.ExecuteJob) .setEk(kind) .setId(String.valueOf(id)) .build(), workContext, "[执行]-任务超出" + HeraGlobalEnv.getRequestTimeout() + "秒未得到master消息返回:" + id); } private static Future<WebResponse> buildMessage(WebRequest request, WorkContext workContext, String errorMsg) { CountDownLatch latch = new CountDownLatch(1); WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null); workContext.getHandler().addListener(responseListener); Future<WebResponse> future = workContext.getWorkWebThreadPool().submit(() -> { latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS); if (!responseListener.getReceiveResult()) { ErrorLog.error(errorMsg); } workContext.getHandler().removeListener(responseListener); return responseListener.getWebResponse(); }); try { workContext.getServerChannel().writeAndFlush(SocketMessage.newBuilder() .setKind(SocketMessage.Kind.WEB_REQUEST) .setBody(request.toByteString()) .build()); SocketLog.info("1.WorkerHandleWebRequest: send web request to master requestId ={}", request.getRid()); } catch (RemotingException e) { workContext.getHandler().removeListener(responseListener); ErrorLog.error("1.WorkerHandleWebRequest: send web request to master exception requestId =" + request.getRid(), e); } return future; }
在
handleWebExecute
方法中,新建了一个 WebRequest 对象,需要注意的是该对象的operator
参数为WebOperate.ExecuteJob
,id
为hera_action_history
记录的id
。
然后在buildMessage
方法中有三个比较关键的代码
1.CountDownLatch latch = new CountDownLatch(1);
该锁会在一个线程池的异步操作中等待,并且会在WorkResponseListener
中被释放。
2.WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null);
public class WorkResponseListener extends ResponseListenerAdapter { private RpcWebRequest.WebRequest request; private volatile Boolean receiveResult; private CountDownLatch latch; private RpcWebResponse.WebResponse webResponse; @Override public void onWebResponse(RpcWebResponse.WebResponse response) { if (request.getRid() == response.getRid()) { try { webResponse = response; receiveResult = true; } catch (Exception e) { ErrorLog.error("work release exception {}", e); } finally { latch.countDown(); } } } }
在
onWebResponse
方法中,当发现request.getRid() == response.getRid()
时会释放锁,并标志receiveResult
为true
3.调用workContext.getServerChannel().writeAndFlush
方法来向master发送任务执行的消息,在上篇hera源码剖析:项目启动之分布式锁 已经说过workContext
是什么时候设置的serverChannel
master端
master
接收所有netty
消息的处理类为MasterHandler
,也就是说上面work
发送的执行任务请求最终会在MasterHandler#channelRead
被调用@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { SocketMessage socketMessage = (SocketMessage) msg; Channel channel = ctx.channel(); switch (socketMessage.getKind()) { //省略部分代码 case WEB_REQUEST: final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build(); switch (webRequest.getOperate()) { case ExecuteJob: completionService.submit(() -> new ChannelResponse(FailBackCluster.wrap(channel), MasterHandlerWebResponse.handleWebExecute(masterContext, webRequest))); break; //省略部分代码 } //省略部分代码 } }
MasterHandler
直接把work
的任务执行请求异步分发给MasterHandlerWebResponse.handleWebExecute
来处理,并且返回了一个失败重试封装的channel
public static WebResponse handleWebExecute(MasterContext context, WebRequest request) { if (request.getEk() == ExecuteKind.ManualKind || request.getEk() == ExecuteKind.ScheduleKind) { Long historyId = Long.parseLong(request.getId()); HeraJobHistory heraJobHistory = context.getHeraJobHistoryService().findById(historyId); HeraJobHistoryVo history = BeanConvertUtils.convert(heraJobHistory); context.getMaster().run(history, context.getHeraJobService().findById(history.getJobId())); WebResponse webResponse = WebResponse.newBuilder() .setRid(request.getRid()) .setOperate(WebOperate.ExecuteJob) .setStatus(Status.OK) .build(); TaskLog.info("MasterHandlerWebResponse: send web execute response, actionId = {} ", history.getJobId()); return webResponse; } else if (request.getEk() == ExecuteKind.DebugKind) { Long debugId = Long.parseLong(request.getId()); HeraDebugHistoryVo debugHistory = context.getHeraDebugHistoryService().findById(debugId); TaskLog.info("2-1.MasterHandlerWebResponse: receive web debug response, debugId = " + debugId); context.getMaster().debug(debugHistory); WebResponse webResponse = WebResponse.newBuilder() .setRid(request.getRid()) .setOperate(WebOperate.ExecuteJob) .setStatus(Status.OK) .build(); TaskLog.info("2-2.MasterHandlerWebResponse : send web debug response, debugId = {}", debugId); return webResponse; } return WebResponse.newBuilder() .setRid(request.getRid()) .setErrorText("未识别的操作类型" + request.getEk()) .setStatus(Status.ERROR) .build(); }
在这里主要是根据
request.getEk()
来判断是开发中心的任务执行还是调度中心的任务执行。在我们手动恢复时,该值为:ExecuteKind.ManualKind
,直接看if
部分代码。-
首先根据
hera_action_history
的id
来查询在work
端插入的那条记录 -
调用
master#run
方法 -
创建
webResponse
对象,返回执行任务ok
的标志
run方法
public void run(HeraJobHistoryVo heraJobHistory, HeraJob heraJob) { Long actionId = heraJobHistory.getActionId(); //重复job检测 //1:检测任务是否已经在队列或者正在执行 if (checkJobExists(heraJobHistory, false)) { return; } HeraAction heraAction = masterContext.getHeraJobActionService().findById(actionId); Set<String> areaList = areaList(heraJob.getAreaId()); //2:非执行区域任务直接设置为成功 if (!areaList.contains(HeraGlobalEnv.getArea()) && !areaList.contains(Constants.ALL_AREA)) { ScheduleLog.info("非{}区域任务,直接设置为成功:{}", HeraGlobalEnv.getArea(), heraJob.getId()); heraAction.setLastResult(heraAction.getStatus()); heraAction.setStatus(StatusEnum.SUCCESS.toString()); heraAction.setHistoryId(heraJobHistory.getId()); heraAction.setReadyDependency("{}"); String host = "localhost"; heraAction.setHost(host); Date endTime = new Date(); heraAction.setStatisticStartTime(endTime); heraAction.setStatisticEndTime(endTime); masterContext.getHeraJobActionService().update(heraAction); heraJobHistory.getLog().append("非" + HeraGlobalEnv.getArea() + "区域任务,直接设置为成功"); heraJobHistory.setStatusEnum(StatusEnum.SUCCESS); heraJobHistory.setEndTime(endTime); heraJobHistory.setStartTime(endTime); heraJobHistory.setExecuteHost(host); masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory)); HeraJobSuccessEvent successEvent = new HeraJobSuccessEvent(actionId, heraJobHistory.getTriggerType(), heraJobHistory); masterContext.getDispatcher().forwardEvent(successEvent); return; } //3.先在数据库中set一些执行任务所需的必须值 然后再加入任务队列 heraAction.setLastResult(heraAction.getStatus()); heraAction.setStatus(StatusEnum.RUNNING.toString()); heraAction.setHistoryId(heraJobHistory.getId()); heraAction.setStatisticStartTime(new Date()); heraAction.setStatisticEndTime(null); masterContext.getHeraJobActionService().update(heraAction); heraJobHistory.getLog().append(ActionUtil.getTodayString() + " 进入任务队列"); masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory)); boolean isFixed; int priorityLevel = 3; Map<String, String> configs = StringUtil.convertStringToMap(heraAction.getConfigs()); String priorityLevelValue = configs.get("run.priority.level"); if (priorityLevelValue != null) { priorityLevel = Integer.parseInt(priorityLevelValue); } String areaFixed = HeraGlobalEnv.getArea() + Constants.POINT + Constants.HERA_EMR_FIXED; if (configs.containsKey(Constants.HERA_EMR_FIXED) || configs.containsKey(areaFixed)) { isFixed = Boolean.parseBoolean(configs.get(areaFixed)) || Boolean.parseBoolean(configs.get(Constants.HERA_EMR_FIXED)); } else { isFixed = Boolean.parseBoolean(getInheritVal(heraAction.getGroupId(), areaFixed, Constants.HERA_EMR_FIXED)); } Integer endMinute = masterContext.getHeraJobService().findMustEndMinute(heraAction.getJobId()); //4.组装JobElement JobElement element = JobElement.builder() .jobId(heraJobHistory.getActionId()) .hostGroupId(heraJobHistory.getHostGroupId()) .priorityLevel(priorityLevel) .historyId(heraJobHistory.getId()) .fixedEmr(isFixed) .owner(heraJob.getOwner()) .costMinute(endMinute) .build(); try { element.setTriggerType(heraJobHistory.getTriggerType()); HeraAction cacheAction = heraActionMap.get(element.getJobId()); if (cacheAction != null) { cacheAction.setStatus(StatusEnum.RUNNING.toString()); } //5.放入任务扫描队列 switch (heraJobHistory.getTriggerType()) { case MANUAL: masterContext.getManualQueue().put(element); break; case AUTO_RERUN: masterContext.getRerunQueue().put(element); break; case MANUAL_RECOVER: case SCHEDULE: masterContext.getScheduleQueue().put(element); break; case SUPER_RECOVER: masterContext.getSuperRecovery().put(element); break; default: ErrorLog.error("不支持的调度类型:{},id:{}", heraJobHistory.getTriggerType().toName(), heraJobHistory.getActionId()); break; } } catch (InterruptedException e) { ErrorLog.error("添加任务" + element.getJobId() + "失败", e); } }
run
方法的主要功能是将要执行的任务根据类型放到不同的队列。
由于代码较多分段分析checkJobExists
方法检测任务是否已经在队列或者正在执行,如果是允许重复执行任务或者任务重跑触发的任务不会进行检测- 对于非执行区域任务直接设置为成功并且广播通知下游任务,该参数由application.yml中的
hera.area
配置决定。另外,如果区域设置为all
,则所有区域都能执行。 - 在数据库中
set
一些执行任务所需的必须值 然后再加入任务队列 - 组装
JobElement
,该对象最终会被放到执行队列中。主要参数有:costMinute(任务的预计最大执行分钟数)、jobId(任务的执行实例id)、hostGroupId(任务的执行机器组)、priorityLevel(任务的有限级别)、historyId(该任务对应的执行记录 id)、fixedEmr 是否在固定集群执行、owner任务的创建人所在组
- 将任务根据不同的触发类型,放入不同的任务扫描队列,等待
master
的扫描线程扫描
-
-
高精度HERA数据离散BFKL分析中主要贡献的解耦
2020-03-30 11:21:29我们展示了一组具有正特征值ω的特征函数,以及来自具有负ω特征函数连续性的一小部分,从而很好地描述了该区域x <0> 6 GeV2的高精度HERA F2数据 。 本征函数的相位可以从波美隆谱的简单参数化获得,这在BFKL中具有... -
在HERA产生衍射深层非弹性散射的独家双射流
2020-04-23 13:48:02已使用ZEUS探测器在HERA使用372 pb -1的综合光度测量了在衍射深非弹性e±p散射中的独家双射流的生产。 该测量是针对90°<W> 25GeV2进行的。 显示了绕射流轴的能量流。 横截面表示为β和Ï的函数,其中β= x / xIP,x... -
最终的HERA合并数据对从全局拟合中获得的PDF的影响
2020-04-02 05:33:37我们调查了在MMHT2014 PDF上包含HERA运行I + II组合横截面数据的影响。 当仅包含HERA数据时,我们会在整体拟合的背景下显示拟合质量。 我们检查了PDF的中心值和不确定性的变化。 我们发现,对数据的预测是好的,并且... -
赫拉(hera)分布式任务调度系统之版本(四)
2019-01-04 15:21:26赫拉(hera)分布式任务调度系统之架构,基本功能(一) 赫拉(hera)分布式任务调度系统之项目启动(二) 赫拉(hera)分布式任务调度系统之开发中心(三) 版本介绍 在hera系统中支持历史版本的数据重跑。 每一个任务都会生成...赫拉
大数据平台,随着业务发展,每天承载着成千上万的ETL任务调度,这些任务集中在hive,shell脚本调度。怎么样让大量的ETL任务准确的完成调度而不出现问题,甚至在任务调度执行中出现错误的情况下,任务能够完成自我恢复甚至执行错误告警与完整的日志查询。hera任务调度系统就是在这种背景下衍生的一款分布式调度系统。随着hera集群动态扩展,可以承载成千上万的任务调度。它是一款原生的分布式任务调度,可以快速的添加部署wokrer节点,动态扩展集群规模。支持shell,hive,spark脚本调度,可以动态的扩展支持python等服务器端脚本调度。
项目地址:git@gitee.com:dfire/hera.git
版本介绍
在
hera
系统中支持历史版本的数据重跑。每一个任务都会生成版本,版本时间根据
cron
表达式来产生。其中版本在脚本中没有使用内置变量时无用。版本号的生成规则为:
yyyyMMddHHmm000000 + 任务ID号
,其实日期的替换就是根据版本的前12
位来识别的。版本的使用
在hera中我们内置了一些时间变量。其原理根据
velocity
模版进行变量替换。所有的时间变量都在HeraDateTool
类中,有需要自定义需求的可以自行增加。比如我写了一个脚本,内容如下
点击手动执行,执行时选择一个版本。
通过执行日志信息我们可以发现使用的版本为:
201812290300000001
然后执行的时间为2019年1月4日
,脚本输出的时间是以脚本时间版本时间为准的。通过选择不同时间的版本,我们可以在跑数据时使用
hera内置时间变量,来进行拉取指定的时间的数据。
例子同上。
加入群聊
个人微信(失效加我拉你进去)
-
赫拉(hera)分布式任务调度系统之操作文档
2019-10-15 17:52:25最近发现我总是站在我的角度来使用hera,每个功能都很清楚,但是对于使用者,他们是不清楚的,所以提供一篇hera操作文档。有问题可以在下面回复 操作文档 登录、注册 在hera上登录和注册其实分为两个部分,即用户和...
-
【数据分析-随到随学】量化交易策略模型
-
计算机网络基础
-
基于决策树的卫星故障诊断知识挖掘方法
-
springboot使用JWT实现单点登录
-
动态调制型光场相机波前传感器的数值仿真
-
自给窗体 根据PNG图片制作透明窗体,控件不透明
-
设备树查看配置
-
Java无损导出及转换word文档
-
Microsoft_ActiveSync_win7.zip
-
Android模拟器沙盒路径
-
visual c++ vc自绘窗体Windows XP风格的窗口.程序将非客户区中的标题栏、框架栏都贴上XP风格的位图
-
单脉冲雷达幅相不一致校正方法研究
-
单元测试UnitTest+Pytest【Selenium3】
-
基于风险评估方法的综合预警系统设计
-
JavaSE系列(七)、数组
-
Redis中的事务机制
-
LeetCode57题——插入区间
-
python数据分析基础
-
面试蚂蚁(P7)竟被MySQL难倒,奋发图强后二次面试入职蚂蚁金服
-
新型全数字频率传递方法数学模型及算法研究