精华内容
下载资源
问答
  • Hera

    2020-12-08 22:14:05
    <div><p>Hera a Css builder this replace the cssbuilder.js in phobos that is DEAD code ( noway to run) <p>john <p>Enyo-DCO-1.1-Signed-off-by: john mcconnell johnmcconnell.com</p> <p>ok i think i got ...
  • hera-hera-master.zip

    2020-06-05 10:34:12
    数据平台打造的任务调度系统(HERA) hera分布式任务调度系统是根据前阿里开源调度系统(zeus)进行的二次开发,其中zeus大概在2014年开源,开源后却并未进行维护。我公司(二维火)2015年引进了zeus任务调度系统,一直...
  • 当使用git把hera克隆到本地之后,首先在hera/hera-admin/resources目录下找到hera.sql文件,在自己的数据库中新建这些必要的表,并插入初始化的数据(如果你目前使用的是低版本的hera,那么你可以到 update 目录查看...
  • Fuzzing Hera

    2020-12-09 01:58:00
    <div><p>It should be fuzzed with random inputs to be executed (with and without the sentinel). <p>Most of the fuzzing corpus should be built with the successful test cases of ewasm...ewasm/hera</p></div>
  • Android-hera.zip

    2019-09-17 17:00:26
    Android-hera.zip,运行微信applet的框架。(软件开发工具包、iOS),安卓系统是谷歌在2008年设计和制造的。操作系统主要写在爪哇,C和C 的核心组件。它是在linux内核之上构建的,具有安全性优势。
  • Move hera to submodule

    2020-12-26 06:27:50
    <p>Previously <code>hera</code> submodule (<code>bottleneck</code> and <code>wasserstein</code> implementations) was not a submodule but sources were hard-coded. This PR removes old <code>hera</code> ...
  • 0.001)区域中的高精度HERA F2数据。 λ是F2∝(1 / x)λ定义的F2上升速率的量度。 我们显示,在这两个区域中,在各个Q2值处确定的λ与低x区域相比,在极低x区域中系统地较小。 我们讨论了对此影响的一些可能的物理...
  • Atom-Hera.zip,做事情的电报机器人要求,atom是一个用web技术构建的开源文本编辑器。
  • 本文章主要是为了让使用者能够更加了解 hera 的原理,并且能够在之基础上进行改进所进行。hera 是一款分布式任务调度与开发平台,具体不再描述,开源地址:https://github.com/scxwhite/hera 获取当前机器ip 在 ...

    前言

    本文章主要是为了让使用者能够更加了解 hera 的原理,并且能够在之基础上进行改进所进行。hera 是一款分布式任务调度与开发平台,具体不再描述,开源地址:https://github.com/scxwhite/hera

    获取当前机器ip

    hera 中,有一些静态代码块,这里只说一个很重要的部分,WorkContext 类中有这样一部分代码

    
        static {
            host = NetUtils.getLocalAddress();
            HeraLog.info("-----------------------------当前机器的IP为:{}-----------------------------", host);
            //省略部分代码
          }
    

    该代码会获取当前机器的 ip 信息,由于多网卡的原因可能获取的ip不是很准确,此时你可以通过在 hera-admin/src/main/resources/config/hera.properties 文件,修改server.ip=127.0.0.1 配置为当前机器 ip 即可。具体的代码不再深入。

    分布式锁

    hera 是使用 spring boot 开发的,启动项目执行 AdminBootstrap.main 方法,由于 DistributeLock#init 方法使用了 @PostConstruct 注册,首先会进入该方法

      @PostConstruct
        public void init() {
            workClient.workSchedule.scheduleAtFixedRate(this::checkLock, 10, 60, TimeUnit.SECONDS);
        }
    

    在该方法中会向 ScheduledThreadPoolExecutor 线程池提交一个每隔 60 秒执行的任务,具体任务内容在 DistributeLock#checkLock方法

      public void checkLock() {
            try {
                String ON_LINE = "online";
                //1.从hera_lock表查询最新记录
                HeraLock heraLock = heraLockService.findBySubgroup(ON_LINE);
                //2.如果当前没有master,直接以当前机器ip插入hera_lock新记录
                if (heraLock == null) {
                    Date date = new Date();
                    heraLock = HeraLock.builder()
                            .id(1)
                            .host(WorkContext.host)
                            .serverUpdate(date)
                            .subgroup(ON_LINE)
                            .gmtCreate(date)
                            .gmtModified(date)
                            .build();
                    Integer lock = heraLockService.insert(heraLock);
                    if (lock == null || lock <= 0) {
                        return;
                    }
                }
    			//3.master判断
                if (isMaster = WorkContext.host.equals(heraLock.getHost().trim())) {
                    heraLock.setServerUpdate(new Date());
                    heraLockService.update(heraLock);
                    HeraLog.info("hold lock and update time");
                    heraSchedule.startup();
                } else {
                    long currentTime = System.currentTimeMillis();
                    long lockTime = heraLock.getServerUpdate().getTime();
                    long interval = currentTime - lockTime;
                    long timeout = 1000 * 60 * 5L;
                    if (interval > timeout && isPreemptionHost()) {
                        Date date = new Date();
                        Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost());
                        if (lock != null && lock > 0) {
                            ErrorLog.error("master 发生切换,{} 抢占成功", WorkContext.host);
                            heraSchedule.startup();
                            heraLock.setHost(WorkContext.host);
                            //TODO  接入master切换通知
                        } else {
                            HeraLog.info("master抢占失败,由其它worker抢占成功");
                        }
                    } else {
                        //非主节点,调度器不执行 主要是为了避免手动修改hera_lock表后出现多master问题
                        heraSchedule.shutdown();
                    }
                }
                //4.初始化work服务
                workClient.init();
                //5.连接master
                workClient.connect(heraLock.getHost().trim());
            } catch (Exception e) {
                ErrorLog.error("检测锁异常", e);
            }
        }
    

    在解释这些代码之前大家要知道 hera 系统有个 hera_lock 表,该表最多只会有一条记录,并且该记录保存着当前的 master ip。也就是说大家如果想切换 master,可以直接通过修改该条记录的 ip 来实现。

    在这里插入图片描述

    • DistributeLock#checkLock每次被调用时第一步会从 hera_lock 表查询出最新的 master 信息,至于在这里使用了 online 进行过滤没有实际意义。如果当前没有 master,即 hera_lock 等于 null (一般在第一次部署启动 hera 时会有该情况),为了方便调试此时会自动把当前机器设置为 master,当然如果插入当前 hera_lock 记录失败(被其它 work 插入了),会直接返回等待下次调用。
    • 在第 3 部分,首先会判断当前机器的 ip 信息与 hera_lock 表的 ip 是否匹配,如果匹配则表示当前机器为 master ,然后更新数据库的 server_update 时间,所以一定要保证你的所有机器时钟一致哦,最后调用heraSchedule.startup() 方法来启动 master 服务。
    • 如果发现当前机器的 ip 信息与 hera_lock 表的 ip 不匹配,则表示当前机器是 work,此时会通过 long interval = currentTime - lockTime; 来计算 master上次的心跳时间间隔,如果发现 master 已经超出5分钟未发送新的心跳,则通过 isPreemptionHost 方法判断当前机器是否允许抢占 master,如果允许则通过Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost()); 方法来进行抢占。抢占 sql
    # 乐观锁方式进行抢占,保证同一时间只有一台机器能够抢占成功
    update hera_lock set gmt_modified = #{gmtModified},host = #{host},server_update = #{serverUpdate} where host = #{lastHost}
    
    

    如果发现 work 抢占 master 成功,则调用 heraSchedule.startup();来启动master 服务。

      private boolean isPreemptionHost() {
            List<String> preemptionHostList = hostGroupService.findPreemptionGroup(HeraGlobalEnv.preemptionMasterGroup);
            if (preemptionHostList.contains(WorkContext.host)) {
                return true;
            } else {
                HeraLog.info(WorkContext.host + " is not in master group " + preemptionHostList.toString());
                return false;
            }
        }
    

    isPreemptionHost 方法主要是判断当前机器是否在允许抢占的机器组,该配置为:hera.preemptionMasterGroup

    • 在第 4 部分会进行 work 服务的初始化,此时需要注意的是:master 会启动 master 服务和 work 服务,work 只启动 work 服务。也就是说,你可以在本地 idea 启动 hera 来进行调试,你也可以在生产环境只有一台机器进行任务调度。
    • 在第 5 部分会进行 work 服务连接 master 服务的操作,即 netty 通信打开

    看到这里,想必你已经了解了 DistributeLock 类是一个定时进行分布式锁检测的类,它决定着当前机器是启动 master 服务还是 work 服务

    知识点总结

    • 可以通过直接修改 hera_lock 表的数据来切换 master
    • 可以只启动一台机器来进行 hera 的调度与开发
    • 可以通过配置hera.preemptionMasterGroup参数来让某些机器允许抢占master

    master服务

    在分布式锁中,如果当前机器抢到了 master,那么此时该机器会调用HeraSchedule#startup 启动 master 服务

        public void startup() {
            if (!running.compareAndSet(false, true)) {
                return;
            }
            HeraLog.info("begin to start master context");
            masterContext.init();
        }
    

    为了保证 master 服务只被启动一次,使用了原子类 AtomicBoolean
    继续往下看

     public void init() {
            //主要处理work的请求信息
            threadPool = new ThreadPoolExecutor(
                    0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-wait-response"), new ThreadPoolExecutor.AbortPolicy());
            //主要管理master的一些延迟任务处理
            masterSchedule = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("master-schedule", false));
            masterSchedule.setKeepAliveTime(5, TimeUnit.MINUTES);
            masterSchedule.allowCoreThreadTimeOut(true);
            //开启quartz服务
            getQuartzSchedulerService().start();
            dispatcher = new Dispatcher();
            //初始化master端的netty消息handler
            handler = new MasterHandler(this);
            //初始化master server
            masterServer = new MasterServer(handler);
            masterServer.start(HeraGlobalEnv.getConnectPort());
            master.init(this);
    
    		//master的定时任务管理者
            choreService = new ChoreService(5, "chore-service");
            //重跑任务初始化
            rerunJobInit = new RerunJobInit(master);
            choreService.scheduledChore(rerunJobInit);
            //重跑任务启动
            rerunJobLaunch = new RerunJobLaunch(master);
            choreService.scheduledChore(rerunJobLaunch);
            //信号丢失检测
            lostJobCheck = new LostJobCheck(master, new DateTime().getMinuteOfHour());
            choreService.scheduledChore(lostJobCheck);
            //心跳检测
            heartCheck = new WorkHeartCheck(master);
            choreService.scheduledChore(heartCheck);
            //版本生成
            actionInit = new JobActionInit(master);
            choreService.scheduledChore(actionInit);
            //任务是否完成检测
            finishCheck = new JobFinishCheck(master);
            choreService.scheduledChore(finishCheck);
            //队列扫描
            queueScan = new JobQueueScan(master);
            choreService.scheduledChoreOnce(queueScan);
            HeraLog.info("end init master content success ");
        }
    

    这部分代码主要功能是

    • 初始化master 端的消息处理线程池threadPool
    • 初始化 master 端的延迟任务线程池 masterSchedule
    • 开启 quartz 服务
    • 初始化 master 端的 netty 消息 handler
    • 初始化 master 的定时任务管理者,任务有:任务重跑初始化、重跑任务启动、信号丢失检测、心跳检测、版本生成、任务是否完成检测、队列扫描等

    work服务

    		//保证只启动一次
            if (!clientSwitch.compareAndSet(false, true)) {
                return;
            }
            workContext.setWorkClient(this);
            //在这里目前是空的,公司内部初始化了一些公共数据源配置
            workContext.init();
            eventLoopGroup = new NioEventLoopGroup();
            bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS))
                                    .addLast("frameDecoder", new ProtobufVarint32FrameDecoder())
                                    .addLast("decoder", new ProtobufDecoder(RpcSocketMessage.SocketMessage.getDefaultInstance()))
                                    .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
                                    .addLast("encoder", new ProtobufEncoder())
                                    .addLast(new WorkHandler(workContext));
                        }
                    });
            HeraLog.info("init work client success ");
    		workSchedule.schedule(new Runnable() {
    
                private WorkerHandlerHeartBeat workerHandlerHeartBeat = new WorkerHandlerHeartBeat();
                private int failCount = 0;
    
                @Override
                public void run() {
                    try {
                        if (workContext.getServerChannel() != null) {
                            boolean send = workerHandlerHeartBeat.send(workContext);
                            if (!send) {
                                failCount++;
                                ErrorLog.error("send heart beat failed ,failCount :" + failCount);
                            } else {
                                failCount = 0;
                                HeartLog.debug("send heart beat success:{}", workContext.getServerChannel().getRemoteAddress());
                            }
                        } else {
                            ErrorLog.error("server channel can not find on " + WorkContext.host);
                        }
                    } catch (Exception e) {
                        ErrorLog.error("heart beat error:", e);
                    } finally {
                        workSchedule.schedule(this, (failCount + 1) * HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS);
                    }
                }
            }, HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS);
    		//省略定时更新日志的代码
    

    work 服务这边同样使用了原子类保证只启动一次,然后就是初始化 worknetty 服务,再往下就是定时发送心跳信息给 master

    work连接master

    DistributeLock#checkLock 方法做的最后一件事情就是 work 连接 master,打开通信,调用的方法为:workClient.connect(heraLock.getHost().trim());

     public synchronized void connect(String host) throws Exception {
    
            if (workContext.getServerChannel() != null) {
                if (workContext.getServerHost().equals(host)) {
                    return;
                } else {
                    workContext.getServerChannel().close();
                    workContext.setServerChannel(null);
                }
            }
            workContext.setServerHost(host);
            CountDownLatch latch = new CountDownLatch(1);
            ChannelFutureListener futureListener = (future) -> {
                try {
                    if (future.isSuccess()) {
                        workContext.setServerChannel(FailFastCluster.wrap(future.channel()));
                        SocketLog.info(workContext.getServerChannel().toString());
                    }
                } catch (Exception e) {
                    ErrorLog.error("连接master异常", e);
                } finally {
                    latch.countDown();
                }
            };
            ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, HeraGlobalEnv.getConnectPort()));
            connectFuture.addListener(futureListener);
            if (!latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS)) {
                connectFuture.removeListener(futureListener);
                connectFuture.cancel(true);
                throw new ExecutionException(new TimeoutException("connect server consumption of 2 seconds"));
            }
            if (!connectFuture.isSuccess()) {
                throw new RuntimeException("connect server failed " + host,
                        connectFuture.cause());
            }
            SocketLog.info("connect server success");
        }
    

    连接 master 时会首先判断当前是否已经连接了 master,如果已经连接并且和当前 master ip 一致则直接返回,否则关闭 netty 通信,重置当前 workmaster 信息。

    然后通过 CountDownLatchawait(long timeout, TimeUnit unit)方法,来进行workmaster 的超时连接判断,通过ChannelFutureListenermasterwork 通信连接成功时设置 ServerChannel,并且容错方式为快速失败FailFastCluster.wrap(future.channel())

    展开全文
  • 在这里,我们通过面对HERA数据来研究该方程作为现象学工具的相关性。 为此,我们首先通过包括两类单对数校正来提高恢复的扰动精度:由DGLAP分裂函数中的第一个非奇异项生成的校正和表示QCD耦合的单循环运行的校正 ...
  • hera系统只是负责调度以及辅助的系统,具体的计算还是要落在hadoop、hive、yarn、spark等集群中去。所以此时又一个硬性要求,如果要执行hadoop,hive,spark等任务,我们的hera系统的worker一定要部署在这些集群某些...
  • <p>I am trying to install Qtile 0.14.2 on Elementary Hera 5.1, but pip3 gives me this error, is there a way i can force pip3 to install it? <p>Thanks for the help. <p>this is the log <p>gibranlp:~/...
  • [WIP] Support WAVM in Hera

    2020-12-09 01:43:41
    <div><p>merges into vm-abstr <p>Part of #158.</p><p>该提问来源于开源项目:ewasm/hera</p></div>
  • 在等式接触相互作用(CI)的框架中使用了对应于1 fb-1左右的光度的高精度HERA数据,以设置对超出标准模型对电子夸克散射的可能高能贡献的限制。 考虑了中性和带电电流ep散射中包含的深部非弹性横截面的测量。 ep数据...
  • 在偶极分解的框架内,我们使用了最近经共线性改进的Balitsky-Kovchegov方程来拟合HERA数据,以计算小Bjorken x处的包容性深层非弹性散射。 该方程式包括双横向对数和单横向对数的全阶求和以及运行耦合校正。 与以前...
  • --vm hera/build/src/libhera.dylib --filltests --singletest wrc20Challenge Running tests using path: "tests/" Running 1 test case... Test Case "stEWASMTests": 100% cpp-ethereum/test/...
  • 近年来,在e-p对撞机HERA上进行的几项实验已收集了携带大量质子能量的主要核子光谱上的高精度深非弹性散射(DIS)数据。 在本文中,我们分析了在扰动QCD框架下在HERA DIS中产生正向质子和中子的最新实验数据。 我们...
  • 我们提出了一种更新的与冲击参数有关的饱和度模型,该模型通过对组合的HERA I和I + II减小的横截面数据进行拟合确定。 使用相同的HERA数据来拟合所施加偶极子振幅的线性化版本,这使得可以在各种实验中估计饱和效应...
  • <p>Description of problem: I got to Tower of Hera and picked up all dungeon items/keys. The counter on screen said 5/6. I went back to double check all item locations but I'd picked everything up....
  • 首次在e±p碰撞中测量了带电电流深层非弹性散射中的魅力产生,这是使用ZEUS检测器在HERA处收集到的,对应于358 pb-1的综合光度。 在200 GeV2 <Q 2 <60000的运动相空间区域内,在质心能量为s $$ \ sqrt {s} $$ ...
  • 赫拉(hera)分布式任务调度系统之架构,基本功能(一) 赫拉(hera)分布式任务调度系统之项目启动(二) 赫拉(hera)分布式任务调度系统之开发中心(三) 赫拉(hera)分布式任务调度系统之版本(四) 赫拉(hera)分布式任务...

    相关介绍

     

    1.实现集群HA,机器宕机环境实现机器断线重连与心跳恢复与hera集群HA,节点单点故障环境下任务自动恢复,master断开,work抢占master

    @Component
    public class DistributeLock {
    
    
        @Autowired
        private HeraHostRelationService hostGroupService;
        @Autowired
        private HeraLockService heraLockService;
    
        @Autowired
        private WorkClient workClient;
    
        @Autowired
        private HeraSchedule heraSchedule;
    
        private final long timeout = 1000 * 60 * 5L;
    
        private final String ON_LINE = "online";
    
        @PostConstruct
        public void init() {
    
            workClient.workSchedule.scheduleAtFixedRate(() -> {
                try {
                    checkLock();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 10, 60, TimeUnit.SECONDS);
        }
    
        public void checkLock() {
            HeraLock heraLock = heraLockService.findBySubgroup(ON_LINE);
            if (heraLock == null) {
                Date date = new Date();
                heraLock = HeraLock.builder()
                        .id(1)
                        .host(WorkContext.host)
                        .serverUpdate(date)
                        .subgroup(ON_LINE)
                        .gmtCreate(date)
                        .gmtModified(date)
                        .build();
                Integer lock = heraLockService.insert(heraLock);
                if (lock == null || lock <= 0) {
                    return;
                }
            }
    
            if (WorkContext.host.equals(heraLock.getHost().trim())) {
                heraLock.setServerUpdate(new Date());
                heraLockService.update(heraLock);
                HeraLog.info("hold lock and update time");
                heraSchedule.startup();
            } else {
                long currentTime = System.currentTimeMillis();
                long lockTime = heraLock.getServerUpdate().getTime();
                long interval = currentTime - lockTime;
                if (interval > timeout && isPreemptionHost()) {
                    Date date = new Date();
                    Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost());
                    if (lock != null && lock > 0) {
                        ErrorLog.error("master 发生切换,{} 抢占成功", WorkContext.host);
                        heraSchedule.startup();
                        heraLock.setHost(WorkContext.host);
                        //TODO  接入master切换通知
                    } else {
                        HeraLog.info("master抢占失败,由其它worker抢占成功");
                    }
                } else {
                    //非主节点,调度器不执行
                    heraSchedule.shutdown();
                }
            }
            workClient.init();
            try {
                workClient.connect(heraLock.getHost().trim());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 检测该ip是否具有抢占master的权限
         * @return 是/否
         */
        private boolean isPreemptionHost() {
            List<String> preemptionHostList = hostGroupService.findPreemptionGroup(HeraGlobalEnvironment.preemptionMasterGroup);
            if (preemptionHostList.contains(WorkContext.host)) {
                return true;
            } else {
                HeraLog.info(WorkContext.host + " is not in master group " + preemptionHostList.toString());
                return false;
            }
        }
    }

    1.通过PostConstruct在项目启动的时候触发定时任务。。

    2.定时任务主要通过数库来进行强占master

    3.如果是master,定时任务做的事情就是update  heraLock  ServerUpdate

    4.如果是worker,定时任务做的事情就是查看master是不是长久没更新ServerUpdate,然后强占

    5. heraSchedule.startup();workClient.init();workClient.connect(heraLock.getHost().trim()) 这三方法里面有状态,会置空操作。

    2.masterServer与workClient启动及交互

    masterServer的启动入口为,heraSchedule.startup()-->masterxContext.init()

     public void init() {
            threadPool = new ThreadPoolExecutor(
                    0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-wait-response"), new ThreadPoolExecutor.AbortPolicy());
            masterSchedule = new ScheduledThreadPoolExecutor(5, new NamedThreadFactory("master-schedule", false));
            masterSchedule.setKeepAliveTime(5, TimeUnit.MINUTES);
            masterSchedule.allowCoreThreadTimeOut(true);
            this.getQuartzSchedulerService().start();
            dispatcher = new Dispatcher();
            handler = new MasterHandler(this);//主handler
            masterServer = new MasterServer(handler);//主通讯类
            masterServer.start(HeraGlobalEnvironment.getConnectPort());//启动
            master.init(this);
            HeraLog.info("end init master content success ");
        }

    而MasterSever主要关注MasterHandler

      public MasterHandler(MasterContext masterContext) {
            this.masterContext = masterContext;
            //把处理结果统一管理
            completionService = new ExecutorCompletionService<>(
                    new ThreadPoolExecutor(
                            0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-execute", false), new ThreadPoolExecutor.AbortPolicy()));
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("master-deal", false), new ThreadPoolExecutor.AbortPolicy());
            executor.execute(() -> {
                        Future<ChannelResponse> future;
                        ChannelResponse response;
                        while (true) {//一量处理有结果,即时响应
                            try {
                                future = completionService.take();
                                response = future.get();
                                TaskLog.info("3-1.MasterHandler:-->master prepare send status : {}", response.webResponse.getStatus());
                                response.channel.writeAndFlush(wrapper(response.webResponse));
                                TaskLog.info("3-2.MasterHandler:-->master send response success, requestId={}", response.webResponse.getRid());
                            } catch (Exception e) {
                                ErrorLog.error("master handler future take error:{}", e);
                                e.printStackTrace();
                            } catch (Throwable throwable) {
                                ErrorLog.error("master handler future take throwable{}", throwable);
                                throwable.printStackTrace();
                            }
                        }
                    }
            );
        }
    
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            SocketMessage socketMessage = (SocketMessage) msg;
            Channel channel = ctx.channel();
            switch (socketMessage.getKind()) {
                //心跳
                case REQUEST://如果是内置任务无需处理结果
                    Request request = Request.newBuilder().mergeFrom(socketMessage.getBody()).build();
                    switch (request.getOperate()) {
                        case HeartBeat:
                            masterContext.getThreadPool().execute(() -> MasterHandleRequest.handleHeartBeat(masterContext, channel, request));
                            break;
                        case SetWorkInfo:
                           //此处省略代码。。
                            break;
                    }
                    break;
                case WEB_REQUEST://web任务统处理,结果由专门线程返回
                    final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build();
                    switch (webRequest.getOperate()) {
                        case ExecuteJob:
                          //此处省略代码。。。
                    break;
                case RESPONSE://对于客户端内置请求响应结果交给监听器
                    masterContext.getThreadPool().execute(() -> {
                        Response response = null;
                        try {
                            response = Response.newBuilder().mergeFrom(socketMessage.getBody()).build();
    
                            SocketLog.info("6.MasterHandler:receiver socket info from work {}, response is {}", ctx.channel().remoteAddress(), response.getRid());
                            for (ResponseListener listener : listeners) {
                                listener.onResponse(response);
                            }
                        } catch (InvalidProtocolBufferException e) {
                            e.printStackTrace();
                        }
                    });
    
                    break;
                case WEB_RESPONSE://对于客户端WEb请求响应结果交给监听器
                    masterContext.getThreadPool().execute(() -> {
                        WebResponse webResponse = null;
                        try {
                            webResponse = WebResponse.newBuilder().mergeFrom(socketMessage.getBody()).build();
                        } catch (InvalidProtocolBufferException e) {
                            e.printStackTrace();
                        }
                        SocketLog.info("6.MasterHandler:receiver socket info from work {}, webResponse is {}", ctx.channel().remoteAddress(), webResponse.getRid());
                        for (ResponseListener listener : listeners) {
                            listener.onWebResponse(webResponse);
                        }
                    });
                    break;
                default:
                    ErrorLog.error("unknown request type : {}", socketMessage.getKind());
                    break;
            }
        }
    
    //再看看监听器,任务里面有个ID号可以对应起结果
    public class WorkResponseListener extends ResponseListenerAdapter {
    
    
        private RpcWebRequest.WebRequest request;
        private volatile Boolean receiveResult;
        private CountDownLatch latch;
        private RpcWebResponse.WebResponse webResponse;
    
        @Override
        public void onWebResponse(RpcWebResponse.WebResponse response) {
            if (request.getRid() == response.getRid()) {
                try {
                    webResponse = response;
                    receiveResult = true;
                } catch (Exception e) {
                    ErrorLog.error("work release exception {}", e);
                } finally {
                    latch.countDown();
                }
            }
        }
    }
    

    workclient的设计方式与masterServer设计方式如出一辙,具体查看WorkHandler类

    1. 把有需要结果的语法统一交给completionService,统一返回
    2. 针对响应结果采用监听器的方式,通知相关业务

    3.支持master/work 负载,内存,进程,cpu信息的可视化查看

    //workClient中init方法,定时上报workerHandlerHeartBeat 
    workSchedule.schedule(new Runnable() {
    
                private WorkerHandlerHeartBeat workerHandlerHeartBeat = new WorkerHandlerHeartBeat();
                private int failCount = 0;
    
                @Override
                public void run() {
                    try {
                        if (workContext.getServerChannel() != null) {
                            //这是主方法
                            boolean send = workerHandlerHeartBeat.send(workContext);
                            if (!send) {
                                failCount++;
                                ErrorLog.error("send heart beat failed ,failCount :" + failCount);
                            } else {
                                failCount = 0;
                                HeartLog.info("send heart beat success:{}", workContext.getServerChannel().getRemoteAddress());
                            }
                        } else {
                            ErrorLog.error("server channel can not find on " + WorkContext.host);
                        }
                    } catch (Exception e) {
                        ErrorLog.error("heart beat error:", e);
                    } finally {
                        workSchedule.schedule(this, (failCount + 1) * HeraGlobalEnvironment.getHeartBeat(), TimeUnit.SECONDS);
                    }
                }
            }, HeraGlobalEnvironment.getHeartBeat(), TimeUnit.SECONDS);
    
    
    //主要send方法
    public boolean send(WorkContext context) {
            try {
                MemUseRateJob memUseRateJob = new MemUseRateJob(1);
                //内存信息获取
                memUseRateJob.readMemUsed();
                CpuLoadPerCoreJob loadPerCoreJob = new CpuLoadPerCoreJob();
                //负载信息获取
                loadPerCoreJob.run();
                //构建包体
                RpcHeartBeatMessage.HeartBeatMessage hbm = RpcHeartBeatMessage.HeartBeatMessage.newBuilder()
                        .setHost(WorkContext.host)
                        .setMemTotal(memUseRateJob.getMemTotal())
                        .setMemRate(memUseRateJob.getRate())
                        .setCpuLoadPerCore(loadPerCoreJob.getLoadPerCore())
                        .setTimestamp(System.currentTimeMillis())
                        .addAllDebugRunnings(context.getDebugRunning().keySet())
                        .addAllManualRunnings(context.getManualRunning().keySet())
                        .addAllRunnings(context.getRunning().keySet())
                        .setCores(WorkContext.cpuCoreNum)
                        .build();     context.getServerChannel().writeAndFlush(RpcSocketMessage.SocketMessage.newBuilder().
                        setKind(RpcSocketMessage.SocketMessage.Kind.REQUEST).
                        setBody(RpcRequest.Request.newBuilder().
                                setRid(AtomicIncrease.getAndIncrement()).
                                setOperate(RpcOperate.Operate.HeartBeat).
                                setBody(hbm.toByteString()).
                                build().toByteString()).
                        build());
            } catch (RemotingException e) {
                e.printStackTrace();
                return false;
            }
            return true;
        }
    
    //master对于heartBeat的处理方式
     public static void handleHeartBeat(MasterContext masterContext, Channel channel, Request request) {
            //放到master服务器的内存中,等待WEB应用的调用 
            MasterWorkHolder workHolder = masterContext.getWorkMap().get(channel);
            HeartBeatInfo heartBeatInfo = new HeartBeatInfo();
            HeartBeatMessage heartBeatMessage;
            try {
                heartBeatMessage = HeartBeatMessage.newBuilder().mergeFrom(request.getBody()).build();
                heartBeatInfo.setHost(heartBeatMessage.getHost());
                heartBeatInfo.setMemRate(heartBeatMessage.getMemRate());
                heartBeatInfo.setMemTotal(heartBeatMessage.getMemTotal());
                heartBeatInfo.setCpuLoadPerCore(heartBeatMessage.getCpuLoadPerCore());
                heartBeatInfo.setRunning(heartBeatMessage.getRunningsList());
                heartBeatInfo.setDebugRunning(heartBeatMessage.getDebugRunningsList());
                heartBeatInfo.setManualRunning(heartBeatMessage.getManualRunningsList());
                heartBeatInfo.setTimestamp(heartBeatMessage.getTimestamp());
                heartBeatInfo.setCores(heartBeatMessage.getCores());
                workHolder.setHeartBeatInfo(heartBeatInfo);
                HeartLog.info("received heart beat from {} : {}", heartBeatMessage.getHost(), JSONObject.toJSONString(heartBeatInfo));
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }

    1.work启动时,启动定时任务,专们负责收集数据,并发送给master

    2.master,收到信息存在自己内存中,等待web调用并返回

    4.任务添加与执行过程

    //研发中心立即执行
    @RequestMapping(value = "/debugSelectCode", method = RequestMethod.POST)
        @ResponseBody
        public WebAsyncTask<JsonResponse> debugSelectCode(@RequestBody HeraFile heraFile) {
    
            String owner = getOwner();
            return new WebAsyncTask<JsonResponse>(HeraGlobalEnvironment.getRequestTimeout(), () -> {
                Map<String, Object> res = new HashMap<>(2);
                HeraFile file = heraFileService.findById(heraFile.getId());
                file.setContent(heraFile.getContent());
                String name = file.getName();
                String runType;
                HeraDebugHistory history = HeraDebugHistory.builder()
                        .fileId(file.getId())
                        .script(heraFile.getContent())
                        .startTime(new Date())
                        .owner(owner)
                        .hostGroupId(file.getHostGroupId() == 0 ? HeraGlobalEnvironment.defaultWorkerGroup : file.getHostGroupId())
                        .build();
    
                int suffixIndex = name.lastIndexOf(Constants.POINT);
                if (suffixIndex == -1) {
                    return new JsonResponse(false, "无后缀名,请设置支持的后缀名[.sh .hive .spark]");
                }
                String suffix = name.substring(suffixIndex);
                //只支持shell,hive,spark  
                if ((Constants.HIVE_SUFFIX).equalsIgnoreCase(suffix)) {
                    runType = Constants.HIVE_FILE;
                } else if ((Constants.SHELL_SUFFIX).equalsIgnoreCase(suffix)) {
                    runType = Constants.SHELL_FILE;
                } else if ((Constants.SPARK_SUFFIX).equalsIgnoreCase(suffix)) {
                    runType = Constants.SPARK_FILE;
                } else {
                    return new JsonResponse(false, "暂未支持的后缀名[" + suffix + "],请设置支持的后缀名[.sh .hive .spark]");
                }
                history.setRunType(runType);
                String newId = debugHistoryService.insert(history);
                //向master提交任务 
                workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.DebugKind, newId);
                res.put("fileId", file.getId());
                res.put("debugId", newId);
                return new JsonResponse(true, "执行成功", res);
            });
        }
    
    
    //master端对此任务的相关处理,则是加入DebugQueue,进入等待队列
     public void debug(HeraDebugHistoryVo debugHistory) {
            JobElement element = JobElement.builder()
                    .jobId(debugHistory.getId())
                    .hostGroupId(debugHistory.getHostGroupId())
                    .build();
            debugHistory.setStatus(StatusEnum.RUNNING);
            debugHistory.setStartTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            debugHistory.getLog().append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 进入任务队列");
            masterContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(debugHistory));
            try {
                masterContext.getDebugQueue().put(element);
            } catch (InterruptedException e) {
                ErrorLog.error("添加开发中心执行任务失败:" + element.getJobId(), e);
            }
        }
    
    
        /**
         * 在master.init的时候会,会启动定时任务
         * 扫描任务等待队列,取出任务去执行
         * 对于没有可运行机器的时,manual,debug任务重新offer到原队列
         */
        public boolean scan() throws InterruptedException {
            boolean hasTask = false;
            if (!masterContext.getScheduleQueue().isEmpty()) {
                代码省略
            }
    
            if (!masterContext.getManualQueue().isEmpty()) {
                //代码省略
            }
    
            if (!masterContext.getDebugQueue().isEmpty()) {
                //拿出任务
                JobElement jobElement = masterContext.getDebugQueue().take();
                MasterWorkHolder selectWork = getRunnableWork(jobElement);
                if (selectWork == null) {
                    masterContext.getDebugQueue().put(jobElement);
                    ScheduleLog.warn("can not get work to execute DebugQueue job in master,job is:{}", jobElement.toString());
                } else {
                    //当有机器的时候,直接给机子分配任务(向work发送任务)
                    runDebugJob(selectWork, jobElement.getJobId());
                    hasTask = true;
                }
    
            }
            return hasTask;
    
        }
    
    
     /**
         * worker接收到命令,JobUtils.createDebugJob创建job文件到服务器,拼接shell,并调用命令执行
         */
        private Future<RpcResponse.Response> debug(WorkContext workContext, RpcRequest.Request request) {
            RpcDebugMessage.DebugMessage debugMessage = null;
            try {
                debugMessage = RpcDebugMessage.DebugMessage.newBuilder().mergeFrom(request.getBody()).build();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
            String debugId = debugMessage.getDebugId();
            HeraDebugHistoryVo history = workContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
            return workContext.getWorkExecuteThreadPool().submit(() -> {
                int exitCode = -1;
                Exception exception = null;
                ResponseStatus.Status status;
                try {
    
                    history.setExecuteHost(WorkContext.host);
                    workContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(history));
    
                    String date = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
                    File directory = new File(HeraGlobalEnvironment.getWorkDir() + File.separator + date + File.separator + "debug-" + debugId);
                    if (!directory.exists()) {
                        if (directory.mkdirs()) {
                            HeraLog.error("创建文件失败:" + directory.getAbsolutePath());
                        }
                    }
                    Job job = JobUtils.createDebugJob(new JobContext(JobContext.DEBUG_RUN), BeanConvertUtils.convert(history),
                            directory.getAbsolutePath(), workContext);
                    workContext.getDebugRunning().putIfAbsent(debugId, job);
                    exitCode = job.run();
                } catch (Exception e) {
                    exception = e;
                    history.getLog().appendHeraException(e);
                } finally {
                    HeraDebugHistoryVo heraDebugHistoryVo = workContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
                    heraDebugHistoryVo.setEndTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    
                    StatusEnum statusEnum = getStatusFromCode(exitCode);
                    if (exitCode == 0) {
                        status = ResponseStatus.Status.OK;
                        heraDebugHistoryVo.setStatus(statusEnum);
                    } else {
                        status = ResponseStatus.Status.ERROR;
                        heraDebugHistoryVo.setStatus(statusEnum);
                    }
                    workContext.getHeraDebugHistoryService().updateStatus(BeanConvertUtils.convert(heraDebugHistoryVo));
                    HeraDebugHistoryVo debugHistory = workContext.getDebugRunning().get(debugId).getJobContext().getDebugHistory();
                    workContext.getHeraDebugHistoryService().updateLog(BeanConvertUtils.convert(debugHistory));
                    workContext.getDebugRunning().remove(debugId);
                }
                String errorText = "";
                if (exception != null && exception.getMessage() != null) {
                    errorText = exception.getMessage();
                }
                return RpcResponse.Response.newBuilder()
                        .setRid(request.getRid())
                        .setOperate(RpcOperate.Operate.Debug)
                        .setStatusEnum(status)
                        .setErrorText(errorText)
                        .build();
            });
        }
    1. web把任务请求统一提交到master,master加到队列中
    2. master的后台进程去队列中拿任务
    3. 用scan选择work
    4. work接收到任务,执行对应脚本job.

    5.服务降级,负载均衡

     /**
         * 获取hostGroupId中可以分发任务的worker
         */
        private MasterWorkHolder getRunnableWork(JobElement jobElement) {
            //根据算法选择对应的work
            MasterWorkHolder selectWork = loadBalance.select(jobElement, masterContext);
            if (selectWork == null) {
                return null;
            }
            Channel channel = selectWork.getChannel().getChannel();
            HeartBeatInfo beatInfo = selectWork.getHeartBeatInfo();
            // 如果最近两次选择的work一致  需要等待机器最新状态发来之后(睡眠)再进行任务分发
            if (HeraGlobalEnvironment.getWarmUpCheck() > 0 && lastWork != null && channel == lastWork && (beatInfo.getCpuLoadPerCore() > 0.6F || beatInfo.getMemRate() > 0.7F)) {
                ScheduleLog.info("达到预热条件,睡眠" + HeraGlobalEnvironment.getWarmUpCheck() + "秒");
                try {
                    TimeUnit.SECONDS.sleep(HeraGlobalEnvironment.getWarmUpCheck());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lastWork = null;
                return null;
            }
            lastWork = channel;
            return selectWork;
        }
    1. 先去数据库(缓存)查询任务对应分组机子
    2. 与在线机子做差集
    3. 在master为任务选择work的时候,项目分别采用了轮询及随机算法来实现

    6.支持日志的实时滚动

     /**
             * 定时 刷新日志到数据库
             */
            workSchedule.scheduleWithFixedDelay(new Runnable() {
              
                @Override
                public void run() {
                    try {
                        for (Job job : workContext.getRunning().values()) {
                            //处理日志
                        }
    
                        for (Job job : workContext.getManualRunning().values()) {
                         //处理日志
                        }
    
                        for (Job job : workContext.getDebugRunning().values()) {
                            try {
                                 //服务在运行中会向这类不断导入日志
                                HeraDebugHistoryVo history = job.getJobContext().getDebugHistory();
                                workContext.getHeraDebugHistoryService().updateLog(BeanConvertUtils.convert(history));
                            } catch (Exception e) {
                                printDebugLog(job, e);
                            }
                        }
                    } catch (Exception e) {
                        ErrorLog.error("job log flush exception:{}", e.toString());
                    }
    
                }
            }, 0, 5, TimeUnit.SECONDS);
        }

    1.脚本在运行中的时候,work都会为每个脚本维护一个HeraJobHistoryVo,其中  LogContent log属性会和服务输入输出流对应

    2.work启动的时候,会启动一个定时刷新日志在数据库的进程

    3.web端通个查询数据库很好的达到时实

    7.hera事件模型dispatch

      /**
         * 事件广播,每次任务状态变化,触发响应事件,全局广播,自动调度successEvent,触发依赖调度一些依赖更新
         *
         * @param applicationEvent
         */
        public void dispatch(ApplicationEvent applicationEvent) {
            try {
                //封装事件
                MvcEvent mvcEvent = new MvcEvent(this, applicationEvent);
                mvcEvent.setApplicationEvent(applicationEvent);
                //前置监听者
                if (fireEvent(beforeDispatch, mvcEvent)) {
                    List<AbstractHandler> jobHandlersCopy = Lists.newArrayList(jobHandlers);
                    for (AbstractHandler jobHandler : jobHandlersCopy) {
                        //是否可执行
                        if (jobHandler.canHandle(applicationEvent)) {
                            if (!jobHandler.isInitialized()) {
                                jobHandler.setInitialized(true);
                            }
                            //job触发
                            jobHandler.handleEvent(applicationEvent);
                        }
                    }
                    //后置监听者
                    fireEvent(afterDispatch, mvcEvent);
                }
            } catch (Exception e) {
                ErrorLog.error("global dispatch job event error", e);
            }
    
        }

    1.dispatch对于事件的主要处理顺序  前置监听 ---- jobHandler  ----- 后置监听

    8.支持关注自己的任务,自动调度执行失败时会向负责人发送邮件

     private void runDebugJob(MasterWorkHolder selectWork, String debugId) {
            final MasterWorkHolder workHolder = selectWork;
            this.executeJobPool.execute(() -> {
                HeraDebugHistoryVo history = masterContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
                history.getLog().append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 开始运行");
                masterContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(history));
                Exception exception = null;
                RpcResponse.Response response = null;
                Future<RpcResponse.Response> future = null;
                try {
                    future = new MasterExecuteJob().executeJob(masterContext, workHolder, JobExecuteKind.ExecuteKind.DebugKind, debugId);
                    response = future.get(HeraGlobalEnvironment.getTaskTimeout(), TimeUnit.HOURS);
                } catch (Exception e) {
                    exception = e;
                    if (future != null) {
                        future.cancel(true);
                    }
                    DebugLog.error(String.format("debugId:%s run failed", debugId), e);
                }
                boolean success = response != null && response.getStatusEnum() == ResponseStatus.Status.OK;
                if (!success) {
                    exception = new HeraException(String.format("fileId:%s run failed ", history.getFileId()), exception);
                    TaskLog.info("8.Master: debug job error");
                    history = masterContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
                    //创建失败事件
                    HeraDebugFailEvent failEvent = HeraDebugFailEvent.builder()
                            .debugHistory(BeanConvertUtils.convert(history))
                            .throwable(exception)
                            .fileId(history.getFileId())
                            .build();
                    //将失败事件通知关心的人
                    masterContext.getDispatcher().forwardEvent(failEvent);
                } else {
                    TaskLog.info("7.Master: debug success");
                     //创建成功事件
                    HeraDebugSuccessEvent successEvent = HeraDebugSuccessEvent.builder()
                            .fileId(history.getFileId())
                            .history(BeanConvertUtils.convert(history))
                            .build();
                    //将成功事件通知关心的人
                    masterContext.getDispatcher().forwardEvent(successEvent);
                }
            });
        }
    
    
    public void alarm(HeraJobFailedEvent failedEvent) {
            String actionId = failedEvent.getActionId();
            Integer jobId = ActionUtil.getJobId(actionId);
            if (jobId == null) {
                return;
            }
            HeraJob heraJob = heraJobService.findById(jobId);
            // 自己建立的任务运行失败必须收到告警
            if (heraJob.getAuto() != 1 && !Constants.PUB_ENV.equals(HeraGlobalEnvironment.getEnv())) {
                return;
            }
            StringBuilder address = new StringBuilder();
            HeraUser user = heraUserService.findByName(heraJob.getOwner());
            address.append(user.getEmail().trim()).append(Constants.SEMICOLON);
            try {
                HeraJobMonitor monitor = heraJobMonitorService.findByJobIdWithOutBlank(heraJob.getId());
                if (monitor == null && Constants.PUB_ENV.equals(HeraGlobalEnvironment.getEnv())) {
                    ScheduleLog.info("任务无监控人,发送给owner:{}", heraJob.getId());
    
                } else if (monitor != null) {
                    String ids = monitor.getUserIds();
                    String[] id = ids.split(Constants.COMMA);
                    for (String anId : id) {
                        if (StringUtils.isBlank(anId)) {
                            continue;
                        }
                        HeraUser monitor_user = heraUserService.findById(Integer.parseInt(anId));
                        if (monitor_user != null && monitor_user.getEmail() != null) {
                            address.append(monitor_user.getEmail().trim()).append(Constants.SEMICOLON);
                        }
                    }
                }
    
                String title = "hera调度任务失败[任务=" + heraJob.getName() + "(" + heraJob.getId() + "),版本号=" + actionId + "]";
                String content = "任务ID:" + heraJob.getId() + Constants.HTML_NEW_LINE
                        + "任务名:" + heraJob.getName() + Constants.HTML_NEW_LINE
                        + "任务版本号:" + actionId + Constants.HTML_NEW_LINE
                        + "任务描述:" + heraJob.getDescription() + Constants.HTML_NEW_LINE
                        + "任务OWNER:" + heraJob.getOwner() + Constants.HTML_NEW_LINE;
    
                String errorMsg = failedEvent.getHeraJobHistory().getLog().getMailContent();
                if (errorMsg != null) {
                    content += Constants.HTML_NEW_LINE + Constants.HTML_NEW_LINE + "--------------------------------------------" + Constants.HTML_NEW_LINE + errorMsg;
                }
                emailService.sendEmail(title, content, address.toString());
            } catch (MessagingException e) {
                e.printStackTrace();
                ErrorLog.error("发送邮件失败");
            }

    1.在master.init中  有 masterContext.getDispatcher().addDispatcherListener(new HeraJobFailListener());这一代码。

         他关注job失败的事件,当master端收的失败响应,则会触发事件

    2.任务失败事件的最终响应者是 EmailJobFailAlarm,发邮件通知责任人

     

    9.生成版本

    //isSingle是否全量生成版本
    //jobId 如果是单个,其任务ID
    private boolean generateAction(boolean isSingle, Integer jobId) {
            try {
                //防止全量重复调用, 
                if (isGenerateActioning) {
                    return true;
                }
                DateTime dateTime = new DateTime();
                Date now = dateTime.toDate();
                int executeHour = dateTime.getHourOfDay();
                //凌晨生成版本,早上七点以后开始再次生成版本
                boolean execute = executeHour == 0 || (executeHour > ActionUtil.ACTION_CREATE_MIN_HOUR && executeHour <= ActionUtil.ACTION_CREATE_MAX_HOUR);
                if (execute || isSingle) {
                    String currString = ActionUtil.getCurrHourVersion();
                    if (executeHour == ActionUtil.ACTION_CREATE_MAX_HOUR) {
                        Tuple<String, Date> nextDayString = ActionUtil.getNextDayString();
                        //例如:今天 2018.07.17 23:50  currString = 201807180000000000 now = 2018.07.18 23:50
                        currString = nextDayString.getSource();
                        now = nextDayString.getTarget();
                    }
                    Long nowAction = Long.parseLong(currString);
                    ConcurrentHashMap<Long, HeraAction> actionMap = new ConcurrentHashMap<>(heraActionMap.size());
                    List<HeraJob> jobList = new ArrayList<>();
                    //批量生成
                    if (!isSingle) {
                        isGenerateActioning = true;
                        jobList = masterContext.getHeraJobService().getAll();
                    } else { //单个任务生成版本
                        HeraJob heraJob = masterContext.getHeraJobService().findById(jobId);
                        jobList.add(heraJob);
                        actionMap = heraActionMap;
                        List<Long> shouldRemove = new ArrayList<>();
                        for (Long actionId : actionMap.keySet()) {
                            if (StringUtil.actionIdToJobId(String.valueOf(actionId), String.valueOf(jobId))) {
                                shouldRemove.add(actionId);
                            }
                        }
                        //移除内存所有依赖这个Job的老版本
                        shouldRemove.forEach(actionMap::remove);
                        List<AbstractHandler> handlers = new ArrayList<>(masterContext.getDispatcher().getJobHandlers());
                        移除内存所有依赖这个Job的老版本任务
                        if (handlers != null && handlers.size() > 0) {
                            for (AbstractHandler handler : handlers) {
                                JobHandler jobHandler = (JobHandler) handler;
                                if (StringUtil.actionIdToJobId(jobHandler.getActionId(), String.valueOf(jobId))) {
                                    masterContext.getQuartzSchedulerService().deleteJob(jobHandler.getActionId());
                                    masterContext.getDispatcher().removeJobHandler(jobHandler);
                                }
                            }
                        }
                    }
                    String cronDate = ActionUtil.getActionVersionPrefix(now);
                    Map<Integer, List<HeraAction>> idMap = new HashMap<>(jobList.size());
                    Map<Integer, HeraJob> jobMap = new HashMap<>(jobList.size());
                    //生成新的版本,并且注入actionMap
                    generateScheduleJobAction(jobList, cronDate, actionMap, nowAction, idMap, jobMap);
                    //整理新的依赖关系  
                    for (Map.Entry<Integer, HeraJob> entry : jobMap.entrySet()) {
                        generateDependJobAction(jobMap, entry.getValue(), actionMap, nowAction, idMap);
                    }
                    if (executeHour < ActionUtil.ACTION_CREATE_MAX_HOUR) {
                        heraActionMap = actionMap;
                    }
                    Dispatcher dispatcher = masterContext.getDispatcher();
                    if (dispatcher != null) {
                        if (actionMap.size() > 0) {
                            for (Long id : actionMap.keySet()) {
                                dispatcher.addJobHandler(new JobHandler(id.toString(), masterContext.getMaster(), masterContext));
                                //新加入的版本发送事件通知
                                if (id >= Long.parseLong(currString)) {
                                    dispatcher.forwardEvent(new HeraJobMaintenanceEvent(Events.UpdateActions, id.toString()));
                                }
                            }
                        }
                    }
                    ScheduleLog.info("[单个任务:{},任务id:{}]generate action success", isSingle, jobId);
                    return true;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                isGenerateActioning = false;
            }
            return false;
        }
    
    
    1. 把要生成版本号的JOB,从内存及调度中删除
    2. 生成新版本号JOB,整理依赖关系,加入内存到缓存中
    3. 把新版本的生成事件播放出去

    9.支持任务的定时调度、依赖调度、手动调度、手动恢复

    定时调度

           主要是根据cron表达式来解析该任务的执行时间,在达到触发时间时将该任务加入任务队列

    /**
         * 自动任务的版本生成
         *
         * @param jobList   任务集合
         * @param cronDate  日期
         * @param actionMap actionMap集合
         * @param nowAction 生成版本时间的action
         * @param idMap     已经遍历过的idMap
         * @param jobMap    依赖任务map映射
         */
        public void generateScheduleJobAction(List<HeraJob> jobList, String cronDate, Map<Long, HeraAction> actionMap, Long nowAction, Map<Integer, List<HeraAction>> idMap, Map<Integer, HeraJob> jobMap) {
            List<HeraAction> insertActionList = new ArrayList<>();
            for (HeraJob heraJob : jobList) {
                if (heraJob.getScheduleType() != null) {
                    if (heraJob.getScheduleType() == 1) {
                        jobMap.put(heraJob.getId(), heraJob);
                    } else if (heraJob.getScheduleType() == 0) {
                        //如果是定时任务,会解析表达式,拆分出多个版本
                        String cron = heraJob.getCronExpression();
                        List<String> list = new ArrayList<>();
                        if (StringUtils.isNotBlank(cron)) {
                            boolean isCronExp = CronParse.Parser(cron, cronDate, list);
                            if (!isCronExp) {
                                ErrorLog.error("cron parse error,jobId={},cron = {}", heraJob.getId(), cron);
                                continue;
                            }
                            List<HeraAction> heraAction = createHeraAction(list, heraJob);
                            idMap.put(heraJob.getId(), heraAction);
                            insertActionList.addAll(heraAction);
                        }
                    } else {
                        ErrorLog.error("任务{}未知的调度类型{}", heraJob.getId(), heraJob.getScheduleType());
                    }
    
                }
            }
            batchInsertList(insertActionList, actionMap, nowAction);
    
        }
    
    
    
    
    //jobHandler中的Events.UpdateActions事件处理
        private void autoRecovery() {
            cache.refresh();
            HeraActionVo heraActionVo = cache.getHeraActionVo();
            //任务被删除
            if (heraActionVo == null) {
                masterContext.getDispatcher().removeJobHandler(this);
                destroy();
                ScheduleLog.info("heraAction 为空, 删除{}", actionId);
                return;
            }
            //自动调度关闭
            if (!heraActionVo.getAuto()) {
                destroy();
                return;
            }
    
            /**
             * 如果是依赖任务 原来可能是独立任务,需要尝试删除原来的定时调度
             * 如果是独立任务,则重新创建quartz调度
             *
             */
            if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Dependent) {
                destroy();
            } else if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Independent) {
                try {
                    createScheduleJob(masterContext.getDispatcher(), heraActionVo);
                } catch (SchedulerException e) {
                    e.printStackTrace();
                }
            }
        }
    
    
     /**
         * 创建定时任务
         *
         * @param dispatcher   the scheduler
         * @param heraActionVo the job name
         */
    
        private void createScheduleJob(Dispatcher dispatcher, HeraActionVo heraActionVo) throws SchedulerException {
            if (!ActionUtil.isCurrActionVersion(actionId)) {
                return;
            }
            JobKey jobKey = new JobKey(actionId, Constants.HERA_GROUP);
            if (masterContext.getQuartzSchedulerService().getScheduler().getJobDetail(jobKey) == null) {
                JobDetail jobDetail = JobBuilder.newJob(HeraQuartzJob.class).withIdentity(jobKey).build();
                jobDetail.getJobDataMap().put("actionId", heraActionVo.getId());
                jobDetail.getJobDataMap().put("dispatcher", dispatcher);
                //TODO  根据任务区域加时区
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(heraActionVo.getCronExpression().trim())/*.inTimeZone()*/;
                CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(actionId, Constants.HERA_GROUP).withSchedule(scheduleBuilder).build();
                masterContext.getQuartzSchedulerService().getScheduler().scheduleJob(jobDetail, trigger);
                ScheduleLog.info("--------------------------- 添加自动调度成功:{}--------------------------", heraActionVo.getId());
            }
        }
    1. 在生成版本号的时候根据cron表达式生成多个版本
    2. 发出多版本变更通知
    3. jobHandler处理给对应新版本创建对应定时任务

    依赖调度

           我们的任务大部分都有依赖关系,只有在上一个任务计算出结果后才能进行下一步的执行。我们的依赖任务会在所有的依赖任务都执行完成之后才会被触发加入任务队列

    /**
         * 递归生成任务依赖action
         *
         * @param jobMap    任务映射map
         * @param heraJob   当前生成版本的任务
         * @param actionMap 版本map
         * @param nowAction 生成版本时间的action
         * @param idMap     job的id集合  只要已经检测过的id都放入idSet中
         */
        private void generateDependJobAction(Map<Integer, HeraJob> jobMap, HeraJob heraJob, Map<Long, HeraAction> actionMap, Long nowAction, Map<Integer, List<HeraAction>> idMap) {
            if (heraJob == null || idMap.containsKey(heraJob.getId())) {
                return;
            }
            String jobDependencies = heraJob.getDependencies();
            if (StringUtils.isNotBlank(jobDependencies)) {
                Map<String, List<HeraAction>> dependenciesMap = new HashMap<>(1024);
                String[] dependencies = jobDependencies.split(Constants.COMMA);
                String actionMinDeps = "";
                boolean noAction = false;
                for (String dependentId : dependencies) {
                    Integer dpId = Integer.parseInt(dependentId);
                    //如果idSet不包含依赖任务dpId  则递归查找
                    if (!idMap.containsKey(dpId)) {
                        generateDependJobAction(jobMap, jobMap.get(dpId), actionMap, nowAction, idMap);
                    }
                    List<HeraAction> dpActions = idMap.get(dpId);
                    dependenciesMap.put(dependentId, dpActions);
                    if (dpActions == null || dpActions.size() == 0) {
                        ErrorLog.warn("{}今天找不到版本,无法为任务{}生成版本", dependentId, heraJob.getId());
                        noAction = true;
                        break;
                    }
                    if (StringUtils.isBlank(actionMinDeps)) {
                        actionMinDeps = dependentId;
                    }
                    //找到所依赖的任务中版本最少的作为基准版本。
                    if (dependenciesMap.get(actionMinDeps).size() > dependenciesMap.get(dependentId).size()) {
                        actionMinDeps = dependentId;
                    } else if (dependenciesMap.get(dependentId).size() > 0 && dependenciesMap.get(actionMinDeps).size() == dependenciesMap.get(dependentId).size() &&
                            dependenciesMap.get(actionMinDeps).get(0).getId() < dependenciesMap.get(dependentId).get(0).getId()) {
                        //如果两个版本的个数一样  那么应该找一个时间较大的
                        actionMinDeps = dependentId;
                    }
                }
                if (noAction) {
                    idMap.put(heraJob.getId(), null);
                } else {
                    List<HeraAction> actionMinList = dependenciesMap.get(actionMinDeps);
                    if (actionMinList != null && actionMinList.size() > 0) {
                        List<HeraAction> insertList = new ArrayList<>();
                        for (HeraAction action : actionMinList) {
                            StringBuilder actionDependencies = new StringBuilder(action.getId().toString());
                            Long longActionId = Long.parseLong(actionDependencies.toString());
                            for (String dependency : dependencies) {
                                if (!dependency.equals(actionMinDeps)) {
                                    List<HeraAction> otherAction = dependenciesMap.get(dependency);
                                    if (otherAction == null || otherAction.size() == 0) {
                                        continue;
                                    }
                                    //找到一个离基准版本时间最近的action,添加为该任务的依赖
                                    String otherActionId = otherAction.get(0).getId().toString();
                                    for (HeraAction o : otherAction) {
                                        if (Math.abs(o.getId() - longActionId) < Math.abs(Long.parseLong(otherActionId) - longActionId)) {
                                            otherActionId = o.getId().toString();
                                        }
                                    }
                                    actionDependencies.append(",");
                                    actionDependencies.append(Long.parseLong(otherActionId) / 1000000 * 1000000 + Long.parseLong(dependency));
                                }
                            }
                            HeraAction actionNew = new HeraAction();
                            BeanUtils.copyProperties(heraJob, actionNew);
                            Long actionId = longActionId / 1000000 * 1000000 + Long.parseLong(String.valueOf(heraJob.getId()));
                            actionNew.setId(actionId);
                            actionNew.setGmtCreate(new Date());
                            actionNew.setDependencies(actionDependencies.toString());
                            actionNew.setJobDependencies(heraJob.getDependencies());
                            actionNew.setJobId(heraJob.getId());
                            actionNew.setAuto(heraJob.getAuto());
                            actionNew.setHostGroupId(heraJob.getHostGroupId());
                            masterContext.getHeraJobActionService().insert(actionNew, nowAction);
                            actionMap.put(actionNew.getId(), actionNew);
                            insertList.add(actionNew);
                        }
                        idMap.put(heraJob.getId(), insertList);
                    }
                }
            }
    
        }
    
    
      /**
         * 收到广播的任务成功事件的处理流程,每次自动调度任务成功执行,会进行一次全局的SuccessEvent广播,使得依赖任务可以更新readyDependent
         *
         * @param event
         */
        private void handleSuccessEvent(HeraJobSuccessEvent event) {
            if (event.getTriggerType() == TriggerTypeEnum.MANUAL) {
                return;
            }
            String jobId = event.getJobId();
            HeraActionVo heraActionVo = cache.getHeraActionVo();
            if (heraActionVo == null) {
                autoRecovery();
                return;
            }
            if (!heraActionVo.getAuto()) {
                return;
            }
            if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Independent) {
                return;
            }
            if (heraActionVo.getDependencies() == null || !heraActionVo.getDependencies().contains(jobId)) {
                return;
            }
            JobStatus jobStatus;
            //必须同步
            synchronized (this) {
                jobStatus = heraJobActionService.findJobStatus(actionId);
                ScheduleLog.info(actionId + "received a success dependency job with actionId = " + jobId);
                jobStatus.getReadyDependency().put(jobId, String.valueOf(System.currentTimeMillis()));
                heraJobActionService.updateStatus(jobStatus);
            }
            boolean allComplete = true;
            for (String key : heraActionVo.getDependencies()) {
                if (jobStatus.getReadyDependency().get(key) == null) {
                    allComplete = false;
                    break;
                }
            }
            if (allComplete) {
                ScheduleLog.info("JobId:" + actionId + " all dependency jobs is ready,run!");
                startNewJob(event.getTriggerType(), heraActionVo);
            } else {
                ScheduleLog.info(actionId + "some of dependency is not ready, waiting" + JSONObject.toJSONString(jobStatus.getReadyDependency().keySet()));
            }
        }
    1. generateDependJobAction算法比较复杂,其目的就是刷新下内存中 heraActionMap 中的依赖关系
    2.   当其依赖的独立任务完成时,jobHandler会收到消息,然后较验自己的所有依赖是否就绪然后发起任务命令

    手动调度与手动恢复

            手动调度即为手动执行的任务,手动执行后自动加入任务队列,请注意,手动任务执行成功后不会通知下游任务(即:依赖于该任务的任务)该任务已经执行完成。

         手动恢复类似于手动调度,于手动调度的区别为此时如果该任务执行成功,会通知下游任务该任务已经执行完成

      /**
         * 收到广播的任务成功事件的处理流程,每次自动调度任务成功执行,会进行一次全局的SuccessEvent广播,使得依赖任务可以更新readyDependent
         *
         * @param event
         */
        private void handleSuccessEvent(HeraJobSuccessEvent event) {
            if (event.getTriggerType() == TriggerTypeEnum.MANUAL) {
                return;
            }
            //省略代码
        }

    1.手动执行和debug执行流程相似,不做多介绍

    2.Jobhandler中handleSuccessEvent对手动的进行了过滤

     

    总结:

         关于抢占模式:

            基本上所有的调度工作都集中在master,通过权限控制可以很好的做个主备master,但备master会处于空闲。。

                     优化点:

                              master 处于单机运行模式,是否能提供集群方式运行

                                         如果集群关于work与master长链接维护的问题:

                                                 如果不是和dubbo一样每个master和work建立长链接,就为有调用不了的问题。。

                                                 如果work过多,对于master也是一种压力。。

                              master不建议设成work,是否可以改成 独占为master的work,不在接受工作调度

         关于机器选用:

                先去数据库(缓存)查询任务对应分组机子,与在线机子做差集。然后平均负载。因为缓存的关系,新加一台work要等待半小时。

                优化点:

                          作为work是不是应该自己所在的分组。。为什么不在启动的时候或者web设置的时候告诉work是属于什么分组的

        关于NIO通讯模型的使用

                   hera的设计于master与work的通讯基本上采用 master发出请求,然后等待,work处理返回, master做后置处理,调用事件模型dispatch,做通知。

                 优化点:

                       nio的特点是多路复用,netty框架的整个设计也是通知模式的。为什么采用同步等待+事件模型的方式。。 

    展开全文
  • 高精度的HERA数据可以搜索到TeV尺度,从而超出标准模型对电子夸克散射的贡献。 在此分析中,结合了对中性和带电电流ep散射中包含的深部非弹性截面的组合测量,其对应的光度约为1 fbb-1。 提出了一种超越标准模型对...
  • hera 中,任务被触发的方式有多种,比如分析师在前端手动执行触发、定时任务触发、依赖任务触发、重跑任务触发、信号丢失的触发等等。但是不管是哪种触发方式最后的入口都是在 Master#run 方法(开发中心任务触发...

    hera 中,任务被触发的方式有多种,比如分析师在前端手动执行触发、定时任务触发、依赖任务触发、重跑任务触发、信号丢失的触发等等。但是不管是哪种触发方式最后的入口都是在 Master#run 方法(开发中心任务触发接口在 Master#debug )。

    这里就讲一下手动执行的任务触发流程

    触发任务

    在最新版本中,任务手动触发类型分为手动执行、手动恢复、超级恢复三种,具体区别就不再赘述,可以通过 hera 操作文档查看,这里以手动恢复为例
    在这里插入图片描述
    当我们点击执行之后,会发送一个请求到后端

    work端

    首先看下 work 端的堆栈信息

    writeAndFlush:28, NettyChannel (com.dfire.core.netty)
    writeAndFlush:32, FailFastCluster (com.dfire.core.netty.cluster)
    buildMessage:100, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)
    handleWebExecute:29, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)
    executeJobFromWeb:312, WorkClient (com.dfire.core.netty.worker)
    execute:409, ScheduleOperatorController (com.dfire.controller)
    invoke:-1, ScheduleOperatorController$$FastClassBySpringCGLIB$$cddb34c8 (com.dfire.controller)
    invoke:204, MethodProxy (org.springframework.cglib.proxy)
    invokeJoinpoint:738, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)
    proceed:157, ReflectiveMethodInvocation (org.springframework.aop.framework)
    proceed:85, MethodInvocationProceedingJoinPoint (org.springframework.aop.aspectj)
    around:72, HeraAspect (com.dfire.config)
    //省略部分
    
    

    通过堆栈信息我们可以看到,在 controller 方法被调用之前会先调用一个通过 AOP 实现的权限校验的方法HeraAspect#around,当权限校验通过后会继续调用ScheduleOperatorController#execute 方法,该方法就是任务执行的入口。再往后就是调用 WorkerHandleWebRequest#handleWebExecuteWorkerHandleWebRequest#buildMessage 方法来创建 netty 消息体,最后通过一个快速失败的容错方式(FailFastCluster#writeAndFlush)来向 master 发送一条 netty 消息


    下面仔细分析下,controller 入口

        @RequestMapping(value = "/manual", method = RequestMethod.GET)
        @ResponseBody
        @ApiOperation("手动执行接口")
        public JsonResponse execute(@JsonSerialize(using = ToStringSerializer.class) @ApiParam(value = "版本id", required = true) Long actionId
                , @ApiParam(value = "触发类型,2手动执行,3手动恢复,6超级恢复", required = true) Integer triggerType,
                                    @RequestParam(required = false) @ApiParam(value = "任务执行组", required = false) String execUser) throws InterruptedException, ExecutionException, HeraException, TimeoutException {
      		//省略部分校验代码
            String configs = heraJob.getConfigs();
            //新建hera_action_history 对象,并向mysql插入执行记录
            HeraJobHistory actionHistory = HeraJobHistory.builder().build();
            actionHistory.setJobId(heraAction.getJobId());
            actionHistory.setActionId(heraAction.getId());
            actionHistory.setTriggerType(triggerTypeEnum.getId());
            actionHistory.setOperator(heraJob.getOwner());
            actionHistory.setIllustrate(execUser);
            actionHistory.setStatus(StatusEnum.RUNNING.toString());
            actionHistory.setStatisticEndTime(heraAction.getStatisticEndTime());
            actionHistory.setHostGroupId(heraAction.getHostGroupId());
            heraJobHistoryService.insert(actionHistory);
            heraAction.setScript(heraJob.getScript());
            heraAction.setHistoryId(actionHistory.getId());
            heraAction.setConfigs(configs);
            heraAction.setAuto(heraJob.getAuto());
            heraAction.setHostGroupId(heraJob.getHostGroupId());
            heraJobActionService.update(heraAction);
            //向master 发送任务执行的请求
            workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.ManualKind, actionHistory.getId());
    
            String ownerId = getOwnerId();
            if (ownerId == null) {
                ownerId = "0";
            }
            //添加操作记录
            addJobRecord(heraJob.getId(), String.valueOf(actionId), RecordTypeEnum.Execute, execUser, ownerId);
            return new JsonResponse(true, String.valueOf(actionId));
        }
    

    这部分的代码很简单,主要分为三部分
    1.创建 hera_action_history 对象,向 mysql 插入任务的执行记录
    2.通过 nettymaster 发送任务执行的消息
    3.添加任务执行记录

    需要我们关注的主要是第二部分,通过上面的堆栈信息继续往下看

        public void executeJobFromWeb(ExecuteKind kind, Long id) throws ExecutionException, InterruptedException, TimeoutException {
            RpcWebResponse.WebResponse response = WorkerHandleWebRequest.handleWebExecute(workContext, kind, id).get(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS);
            if (response.getStatus() == ResponseStatus.Status.ERROR) {
                ErrorLog.error(response.getErrorText());
            }
        }
    

    这部分代码调用了 WorkerHandleWebRequest.handleWebExecute 并返回一个future,通过 future.get 来阻塞我们的请求,继续往下看

        public static Future<WebResponse> handleWebExecute(final WorkContext workContext, ExecuteKind kind, Long id) {
            return buildMessage(WebRequest.newBuilder()
                    .setRid(AtomicIncrease.getAndIncrement())
                    .setOperate(WebOperate.ExecuteJob)
                    .setEk(kind)
                    .setId(String.valueOf(id))
                    .build(), workContext, "[执行]-任务超出" + HeraGlobalEnv.getRequestTimeout() + "秒未得到master消息返回:" + id);
        }
         private static Future<WebResponse> buildMessage(WebRequest request, WorkContext workContext, String errorMsg) {
            CountDownLatch latch = new CountDownLatch(1);
            WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null);
            workContext.getHandler().addListener(responseListener);
            Future<WebResponse> future = workContext.getWorkWebThreadPool().submit(() -> {
                latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS);
                if (!responseListener.getReceiveResult()) {
                    ErrorLog.error(errorMsg);
                }
                workContext.getHandler().removeListener(responseListener);
                return responseListener.getWebResponse();
            });
            try {
                workContext.getServerChannel().writeAndFlush(SocketMessage.newBuilder()
                        .setKind(SocketMessage.Kind.WEB_REQUEST)
                        .setBody(request.toByteString())
                        .build());
                SocketLog.info("1.WorkerHandleWebRequest: send web request to master requestId ={}", request.getRid());
            } catch (RemotingException e) {
                workContext.getHandler().removeListener(responseListener);
                ErrorLog.error("1.WorkerHandleWebRequest: send web request to master exception requestId =" + request.getRid(), e);
            }
            return future;
        }
    

    handleWebExecute 方法中,新建了一个 WebRequest 对象,需要注意的是该对象的 operator 参数为 WebOperate.ExecuteJobidhera_action_history 记录的 id
    然后在 buildMessage 方法中有三个比较关键的代码
    1.CountDownLatch latch = new CountDownLatch(1); 该锁会在一个线程池的异步操作中等待,并且会在WorkResponseListener 中被释放。
    2.WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null);

    public class WorkResponseListener extends ResponseListenerAdapter {
    
    
        private RpcWebRequest.WebRequest request;
        private volatile Boolean receiveResult;
        private CountDownLatch latch;
        private RpcWebResponse.WebResponse webResponse;
    
        @Override
        public void onWebResponse(RpcWebResponse.WebResponse response) {
            if (request.getRid() == response.getRid()) {
                try {
                    webResponse = response;
                    receiveResult = true;
                } catch (Exception e) {
                    ErrorLog.error("work release exception {}", e);
                } finally {
                    latch.countDown();
                }
            }
        }
    }
    

    onWebResponse 方法中,当发现request.getRid() == response.getRid()时会释放锁,并标志 receiveResulttrue
    3.调用 workContext.getServerChannel().writeAndFlush 方法来向master发送任务执行的消息,在上篇hera源码剖析:项目启动之分布式锁 已经说过 workContext 是什么时候设置的 serverChannel

    master端

    master 接收所有 netty 消息的处理类为 MasterHandler,也就是说上面work 发送的执行任务请求最终会在MasterHandler#channelRead被调用

    @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            SocketMessage socketMessage = (SocketMessage) msg;
            Channel channel = ctx.channel();
            switch (socketMessage.getKind()) {
    			//省略部分代码
                case WEB_REQUEST:
                    final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build();
                    switch (webRequest.getOperate()) {
                        case ExecuteJob:
                            completionService.submit(() ->
                                    new ChannelResponse(FailBackCluster.wrap(channel), MasterHandlerWebResponse.handleWebExecute(masterContext, webRequest)));
                            break;
                            //省略部分代码
                    }
                //省略部分代码
            }
    
        }
    

    MasterHandler 直接把 work 的任务执行请求异步分发给 MasterHandlerWebResponse.handleWebExecute 来处理,并且返回了一个失败重试封装的 channel

      public static WebResponse handleWebExecute(MasterContext context, WebRequest request) {
            if (request.getEk() == ExecuteKind.ManualKind || request.getEk() == ExecuteKind.ScheduleKind) {
                Long historyId = Long.parseLong(request.getId());
                HeraJobHistory heraJobHistory = context.getHeraJobHistoryService().findById(historyId);
                HeraJobHistoryVo history = BeanConvertUtils.convert(heraJobHistory);
                context.getMaster().run(history, context.getHeraJobService().findById(history.getJobId()));
                WebResponse webResponse = WebResponse.newBuilder()
                        .setRid(request.getRid())
                        .setOperate(WebOperate.ExecuteJob)
                        .setStatus(Status.OK)
                        .build();
                TaskLog.info("MasterHandlerWebResponse: send web execute response, actionId = {} ", history.getJobId());
                return webResponse;
            } else if (request.getEk() == ExecuteKind.DebugKind) {
                Long debugId = Long.parseLong(request.getId());
                HeraDebugHistoryVo debugHistory = context.getHeraDebugHistoryService().findById(debugId);
                TaskLog.info("2-1.MasterHandlerWebResponse: receive web debug response, debugId = " + debugId);
                context.getMaster().debug(debugHistory);
    
                WebResponse webResponse = WebResponse.newBuilder()
                        .setRid(request.getRid())
                        .setOperate(WebOperate.ExecuteJob)
                        .setStatus(Status.OK)
                        .build();
                TaskLog.info("2-2.MasterHandlerWebResponse : send web debug response, debugId = {}", debugId);
                return webResponse;
            }
            return WebResponse.newBuilder()
                    .setRid(request.getRid())
                    .setErrorText("未识别的操作类型" + request.getEk())
                    .setStatus(Status.ERROR)
                    .build();
        }
    
    

    在这里主要是根据request.getEk() 来判断是开发中心的任务执行还是调度中心的任务执行。在我们手动恢复时,该值为:ExecuteKind.ManualKind,直接看 if 部分代码。

    • 首先根据 hera_action_historyid 来查询在 work 端插入的那条记录

    • 调用 master#run 方法

    • 创建 webResponse 对象,返回执行任务 ok 的标志

    run方法

     public void run(HeraJobHistoryVo heraJobHistory, HeraJob heraJob) {
            Long actionId = heraJobHistory.getActionId();
            //重复job检测
            //1:检测任务是否已经在队列或者正在执行
            if (checkJobExists(heraJobHistory, false)) {
                return;
            }
            HeraAction heraAction = masterContext.getHeraJobActionService().findById(actionId);
            Set<String> areaList = areaList(heraJob.getAreaId());
            //2:非执行区域任务直接设置为成功
            if (!areaList.contains(HeraGlobalEnv.getArea()) && !areaList.contains(Constants.ALL_AREA)) {
                ScheduleLog.info("非{}区域任务,直接设置为成功:{}", HeraGlobalEnv.getArea(), heraJob.getId());
                heraAction.setLastResult(heraAction.getStatus());
                heraAction.setStatus(StatusEnum.SUCCESS.toString());
                heraAction.setHistoryId(heraJobHistory.getId());
                heraAction.setReadyDependency("{}");
                String host = "localhost";
                heraAction.setHost(host);
                Date endTime = new Date();
                heraAction.setStatisticStartTime(endTime);
                heraAction.setStatisticEndTime(endTime);
                masterContext.getHeraJobActionService().update(heraAction);
                heraJobHistory.getLog().append("非" + HeraGlobalEnv.getArea() + "区域任务,直接设置为成功");
                heraJobHistory.setStatusEnum(StatusEnum.SUCCESS);
                heraJobHistory.setEndTime(endTime);
                heraJobHistory.setStartTime(endTime);
                heraJobHistory.setExecuteHost(host);
                masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory));
                HeraJobSuccessEvent successEvent = new HeraJobSuccessEvent(actionId, heraJobHistory.getTriggerType(), heraJobHistory);
                masterContext.getDispatcher().forwardEvent(successEvent);
                return;
            }
    
            //3.先在数据库中set一些执行任务所需的必须值 然后再加入任务队列
            heraAction.setLastResult(heraAction.getStatus());
            heraAction.setStatus(StatusEnum.RUNNING.toString());
            heraAction.setHistoryId(heraJobHistory.getId());
            heraAction.setStatisticStartTime(new Date());
            heraAction.setStatisticEndTime(null);
            masterContext.getHeraJobActionService().update(heraAction);
            heraJobHistory.getLog().append(ActionUtil.getTodayString() + " 进入任务队列");
            masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory));
    
    
            boolean isFixed;
            int priorityLevel = 3;
            Map<String, String> configs = StringUtil.convertStringToMap(heraAction.getConfigs());
            String priorityLevelValue = configs.get("run.priority.level");
            if (priorityLevelValue != null) {
                priorityLevel = Integer.parseInt(priorityLevelValue);
            }
            String areaFixed = HeraGlobalEnv.getArea() + Constants.POINT + Constants.HERA_EMR_FIXED;
            if (configs.containsKey(Constants.HERA_EMR_FIXED) || configs.containsKey(areaFixed)) {
                isFixed = Boolean.parseBoolean(configs.get(areaFixed)) || Boolean.parseBoolean(configs.get(Constants.HERA_EMR_FIXED));
            } else {
                isFixed = Boolean.parseBoolean(getInheritVal(heraAction.getGroupId(), areaFixed, Constants.HERA_EMR_FIXED));
            }
            Integer endMinute = masterContext.getHeraJobService().findMustEndMinute(heraAction.getJobId());
            //4.组装JobElement
            JobElement element = JobElement.builder()
                    .jobId(heraJobHistory.getActionId())
                    .hostGroupId(heraJobHistory.getHostGroupId())
                    .priorityLevel(priorityLevel)
                    .historyId(heraJobHistory.getId())
                    .fixedEmr(isFixed)
                    .owner(heraJob.getOwner())
                    .costMinute(endMinute)
                    .build();
            try {
                element.setTriggerType(heraJobHistory.getTriggerType());
                HeraAction cacheAction = heraActionMap.get(element.getJobId());
                if (cacheAction != null) {
                    cacheAction.setStatus(StatusEnum.RUNNING.toString());
                }
                //5.放入任务扫描队列
                switch (heraJobHistory.getTriggerType()) {
                    case MANUAL:
                        masterContext.getManualQueue().put(element);
                        break;
                    case AUTO_RERUN:
                        masterContext.getRerunQueue().put(element);
                        break;
                    case MANUAL_RECOVER:
                    case SCHEDULE:
                        masterContext.getScheduleQueue().put(element);
                        break;
                    case SUPER_RECOVER:
                        masterContext.getSuperRecovery().put(element);
                        break;
                    default:
                        ErrorLog.error("不支持的调度类型:{},id:{}", heraJobHistory.getTriggerType().toName(), heraJobHistory.getActionId());
                        break;
                }
            } catch (InterruptedException e) {
                ErrorLog.error("添加任务" + element.getJobId() + "失败", e);
            }
        }
    

    run 方法的主要功能是将要执行的任务根据类型放到不同的队列。
    由于代码较多分段分析

    1. checkJobExists 方法检测任务是否已经在队列或者正在执行,如果是允许重复执行任务或者任务重跑触发的任务不会进行检测
    2. 对于非执行区域任务直接设置为成功并且广播通知下游任务,该参数由application.yml中的hera.area 配置决定。另外,如果区域设置为 all,则所有区域都能执行。
    3. 在数据库中 set 一些执行任务所需的必须值 然后再加入任务队列
    4. 组装 JobElement,该对象最终会被放到执行队列中。主要参数有:costMinute(任务的预计最大执行分钟数)、jobId(任务的执行实例id)、hostGroupId(任务的执行机器组)、priorityLevel(任务的有限级别)、historyId(该任务对应的执行记录 id)、fixedEmr 是否在固定集群执行、owner任务的创建人所在组
    5. 将任务根据不同的触发类型,放入不同的任务扫描队列,等待 master 的扫描线程扫描
    展开全文
  • 我们展示了一组具有正特征值ω的特征函数,以及来自具有负ω特征函数连续性的一小部分,从而很好地描述了该区域x <0> 6 GeV2的高精度HERA F2数据 。 本征函数的相位可以从波美隆谱的简单参数化获得,这在BFKL中具有...
  • 已使用ZEUS探测器在HERA使用372 pb -1的综合光度测量了在衍射深非弹性e±p散射中的独家双射流的生产。 该测量是针对90°<W> 25GeV2进行的。 显示了绕射流轴的能量流。 横截面表示为β和Ï的函数,其中β= x / xIP,x...
  • 我们调查了在MMHT2014 PDF上包含HERA运行I + II组合横截面数据的影响。 当仅包含HERA数据时,我们会在整体拟合的背景下显示拟合质量。 我们检查了PDF的中心值和不确定性的变化。 我们发现,对数据的预测是好的,并且...
  • 赫拉(hera)分布式任务调度系统之架构,基本功能(一) 赫拉(hera)分布式任务调度系统之项目启动(二) 赫拉(hera)分布式任务调度系统之开发中心(三) 版本介绍 在hera系统中支持历史版本的数据重跑。 每一个任务都会生成...

    赫拉

    大数据平台,随着业务发展,每天承载着成千上万的ETL任务调度,这些任务集中在hive,shell脚本调度。怎么样让大量的ETL任务准确的完成调度而不出现问题,甚至在任务调度执行中出现错误的情况下,任务能够完成自我恢复甚至执行错误告警与完整的日志查询。hera任务调度系统就是在这种背景下衍生的一款分布式调度系统。随着hera集群动态扩展,可以承载成千上万的任务调度。它是一款原生的分布式任务调度,可以快速的添加部署wokrer节点,动态扩展集群规模。支持shell,hive,spark脚本调度,可以动态的扩展支持python等服务器端脚本调度。

    项目地址:git@gitee.com:dfire/hera.git

    赫拉(hera)分布式任务调度系统之架构,基本功能(一)

    赫拉(hera)分布式任务调度系统之项目启动(二)

    赫拉(hera)分布式任务调度系统之开发中心(三)

    赫拉(hera)分布式任务调度系统之版本(四)

    赫拉(hera)分布式任务调度系统之Q&A(五)

    版本介绍

    hera系统中支持历史版本的数据重跑。

    每一个任务都会生成版本,版本时间根据cron表达式来产生。其中版本在脚本中没有使用内置变量时无用。

    版本号的生成规则为:yyyyMMddHHmm000000 + 任务ID号 ,其实日期的替换就是根据版本的前12位来识别的。

    版本的使用

    在hera中我们内置了一些时间变量。其原理根据velocity模版进行变量替换。所有的时间变量都在HeraDateTool类中,有需要自定义需求的可以自行增加。

    比如我写了一个脚本,内容如下

    在这里插入图片描述

    点击手动执行,执行时选择一个版本。
    在这里插入图片描述

    通过执行日志信息我们可以发现使用的版本为:201812290300000001 然后执行的时间为2019年1月4日,脚本输出的时间是以脚本时间版本时间为准的。通过选择不同时间的版本,我们可以在跑数据时使用hera内置时间变量,来进行拉取指定的时间的数据。
    在这里插入图片描述

    例子同上。

    加入群聊

    在这里插入图片描述

    个人微信(失效加我拉你进去)
    在这里插入图片描述

    展开全文
  • 最近发现我总是站在我的角度来使用hera,每个功能都很清楚,但是对于使用者,他们是不清楚的,所以提供一篇hera操作文档。有问题可以在下面回复 操作文档 登录、注册 在hera上登录和注册其实分为两个部分,即用户和...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 406
精华内容 162
关键字:

hera