精华内容
下载资源
问答
  • DevOps构建定义与任务编排
  • * 异步编排测试,多任务编排时使用 * @author 86156 */ public class AsynTest { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1、异步执行,没有返回结果 ...

    贴下异步编排随手写的测试代码,仅用于记录。 

    /**
     * 异步编排测试,多任务编排时使用
     * @author 86156
     */
    public class AsynTest {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 1、异步执行,没有返回结果
            CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(new Runnable01());
            // 2、异步执行,指定线程池执行,没有返回结果
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(new Runnable01(), executorService);
    
            // 3、异步执行,获取返回结果
            FutureTask<String> futureTask = new FutureTask<>(new Callable01());
            CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> {
                System.out.println("异步执行任务,带返回值");
                return "success";
            });
            System.out.println(completableFuture3.get()); // success
    
            // 4、异步执行,指定线程池执行,获取返回结果
            CompletableFuture<String> completableFuture4 = CompletableFuture.supplyAsync(() -> {
                System.out.println("异步执行任务,指定线程池,带返回值");
                return "success";
            });
            System.out.println(completableFuture4.get()); // success
    
            // 关闭线程池
            executorService.shutdown();
    
            // 任务完成时执行逻辑:whenCompleteAsync,异常处理exceptionally
            CompletableFuture.supplyAsync(() -> {
                System.out.println("异步执行任务,执行完成继续执行其他逻辑");
                return "success";
            }).whenCompleteAsync((result, exception) -> {
                if (exception == null) {
                    int i = 10 / 0;
                    System.out.println("上一步执行任务结果是:" + result);
                }
            }).exceptionally(e -> {
                System.out.println("打印异常" + e.getMessage());
                return e.getMessage();
            });
    
            // 任务完成时执行逻辑:handle
            CompletableFuture.supplyAsync(() -> {
                System.out.println("异步执行任务,,,");
                return "hello";
            }).handle((res, err) -> {
                if (err == null) {
                    System.out.println(res + " Tom!");
                }
                return 0;
            });
    
            // 串行化执行任务:thenApply,thenRun,thenAccept
    //        CompletableFuture.supplyAsync(() -> {
    //            System.out.println("异步执行任务,,,");
    //            return "hello";
    //        }).thenAccept(res -> System.out.println("执行完会没有返回结果,打印执行结果:" + res));
    //        CompletableFuture.supplyAsync(() -> {
    //            System.out.println("异步执行任务,,,");
    //            return "hello";
    //        }).thenApply(res -> {
    //            System.out.println("执行完会有返回结果,打印执行结果:" + res);
    //            return  0;
    //        });
    //        CompletableFuture.supplyAsync(() -> {
    //            System.out.println("异步执行任务,,,");
    //            return "hello";
    //        }).thenRun(() -> {
    //            System.out.println("thenRun无法获取上个任务的结果");
    //        });
    
            // 两个任务都要完成
            completableFuture1.thenCombineAsync(completableFuture2, (f1, f2) -> {
                System.out.println(f1 + ":" + f2 + "ret");
                return 0;
            });
    
            // 两个任务任何一个完成
            completableFuture1.runAfterEitherAsync(completableFuture2, () -> {
                System.out.println("任何一个任务执行成功都执行这里。。。。。。。");
            });
    
            // 所有任何都要执行
            CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).handle((res, err) ->
                {
                    if (err == null) {
                        System.out.println("成功执行所有任务");
                    }
                    return 0;
                }
            );
    
            // 任何一个任何执行
            CompletableFuture.anyOf(completableFuture1, completableFuture2, completableFuture3).handle((res, err) ->
                {
                    if (err == null) {
                        System.out.println("任何一个任何执行");
                    }
                    return 0;
                }
            );
    
        }
    
        public static class Runnable01 implements Runnable {
            @Override
            public void run() {
                System.out.println("异步执行线程,线程id:" + Thread.currentThread().getId());
            }
        }
    
        public static class Callable01 implements Callable<String> {
            @Override
            public String call() throws Exception {
                System.out.println("callable方式执行线程");
                return "success";
            }
        }
    
    }

     

    展开全文
  • 最近在做的工作比较需要一个支持任务编排工作流的框架或者平台,这里记录下实现上的一些思路。任务编排工作流任务编排是什么意思呢,顾名思义就是可以把"任务"这个原子单位按照自己的...

    最近在做的工作比较需要一个支持任务编排工作流的框架或者平台,这里记录下实现上的一些思路。

    任务编排工作流

    任务编排是什么意思呢,顾名思义就是可以把"任务"这个原子单位按照自己的方式进行编排,任务之间可能互相依赖。复杂一点的编排之后就能形成一个 workflow 工作流了。我们希望这个工作流按照我们编排的方式去执行每个原子 task 任务。如下图所示,我们希望先并发运行 Task A 和 Task C,Task A 执行完后串行运行 Task B,在并发等待 Task B 和 C 都结束后运行 Task D,这样就完成了一个典型的任务编排工作流。

    DAG 有向无环图

    首先我们了解图这个数据结构,每个元素称为顶点 vertex,顶点之间的连线称为边 edge。像我们画的这种带箭头关系的称为有向图,箭头关系之间能形成一个环的成为有环图,反之称为无环图。显然运用在我们任务编排工作流上,最合适的是 DAG 有向无环图。

    我们在代码里怎么存储图呢,有两种数据结构:邻接矩阵和邻接表。

    下图表示一个有向图的邻接矩阵,例如 x->y 的边,只需将 Array[x][y]标识为 1 即可。

    此外我们也可以使用邻接表来存储,这种存储方式较好地弥补了邻接矩阵浪费空间的缺点,但相对来说邻接矩阵能更快地判断连通性。

    一般在代码实现上,我们会选择邻接矩阵,这样我们在判断两点之间是否有边更方便点。

    一个任务编排框架

    了解了 DAG 的基本知识后我们可以来简单实现一下。

    了解JUC包的可能快速想到CompletableFuture,这个类对于多个并发线程有复杂关系耦合的场景是很适用的,如果是一次性任务,那么使用CompletableFuture完全没有问题。但是作为框架或者平台来说,我们还需要考虑存储节点状态、重试执行等逻辑,对于这些CompletableFuture是不能满足的。

    我们需要更完整地考虑与设计这个框架。首先是存储结构,我们的 Dag 表示一整个图,Node 表示各个顶点,每个顶点有其 parents 和 children:

    //Dag
    public final class DefaultDag<T, R> implements Dag<T, R> {
    
    
      private Map<T, Node<T, R>> nodes = new HashMap<T, Node<T, R>>();
        ...
    }
    
    //Node
    public final class Node<T, R> {
      /**
       * incoming dependencies for this node
       */
        private Set<Node<T, R>> parents = new LinkedHashSet<Node<T, R>>();
        /**
         * outgoing dependencies for this node
         */
        private Set<Node<T, R>> children = new LinkedHashSet<Node<T, R>>();
        ...
    }
    

    画两个顶点,以及为这两个顶点连边操作如下:

    public void addDependency(final T evalFirstNode, final T evalLaterNode) {
      Node<T, R> firstNode = createNode(evalFirstNode);
      Node<T, R> afterNode = createNode(evalLaterNode);
    
    
      addEdges(firstNode, afterNode);
    }
    
    
       
    private Node<T, R> createNode(final T value) {
      Node<T, R> node = new Node<T, R>(value);
      return node;
    }
    private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) {
      if (!firstNode.equals(afterNode)) {
        firstNode.getChildren().add(afterNode);
        afterNode.getParents().add(firstNode);
      }
    }
    

    到现在我们其实已经把基础数据结构写好了,但我们作为一个任务编排框架最终是需要线程去执行的,我们把它和线程池一起给包装一下。

    //任务编排线程池
    public class DefaultDexecutor <T, R> {
    
    
        //执行线程,和2种重试线程
      private final ExecutorService<T, R> executionEngine;
      private final ExecutorService immediatelyRetryExecutor;
      private final ScheduledExecutorService scheduledRetryExecutor;
        //执行状态
      private final ExecutorState<T, R> state;
        ...
    }
    //执行状态
    public class DefaultExecutorState<T, R> {
        //底层图数据结构
      private final Dag<T, R> graph;
        //已完成
      private final Collection<Node<T, R>> processedNodes;
        //未完成
      private final Collection<Node<T, R>> unProcessedNodes;
        //错误task
      private final Collection<ExecutionResult<T, R>> erroredTasks;
        //执行结果
      private final Collection<ExecutionResult<T, R>> executionResults;
    }
    

    可以看到我们的线程包括执行线程池,2 种重试线程池。我们使用 ExecutorState 来保存一些整个任务工作流执行过程中的一些状态记录,包括已完成和未完成的 task,每个 task 执行的结果等。同时它也依赖我们底层的图数据结构 DAG。

    接下来我们要做的事其实很简单,就是 BFS 这整个 DAG 数据结构,然后提交到线程池中去执行就可以了,过程中注意一些节点状态的保持,结果的保存即可。

    还是以上图为例,值得说的一点是在 Task D 这个点需要有一个并发等待的操作,即 Task D 需要依赖 Task B 和 Task C 执行结束后再往下执行。这里有很多办法,我选择了共享变量的方式来完成并发等待。遍历工作流中被递归的方法的伪代码如下:

    private void doProcessNodes(final Set<Node<T, R>> nodes) {
        for (Node<T, R> node : nodes) {
            //共享变量 并发等待
            if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents())) {
                Task<T, R> task = newTask(node);
                this.executionEngine.submit(task);
                ...
                ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
                if (executionResult.isSuccess()) {
            state.markProcessingDone(processedNode);
          }
                //继续执行孩子节点
          doExecute(processedNode.getChildren());
                ...
            }
        }
    }
    

    这样我们基本完成了这个任务编排框架的工作,现在我们可以如下来进行示例图中的任务编排以及执行:

    DefaultExecutor<String, String> executor = newTaskExecutor();
    executor.addDependency("A", "B");
    executor.addDependency("B", "D");
    executor.addDependency("C", "D");
    executor.execute();
    

    任务编排平台化

    好了现在我们已经有一款任务编排框架了,但很多时候我们想要可视化、平台化,让使用者更加无脑。

    框架与平台最大的区别在哪里?是可拖拽的可视化输入么?我觉得这个的复杂度更多在前端。而对于后端平台来讲,与框架最大的区别是数据的持久化。

    对于 DAG 的顶点来说,我们需要将每个节点 Task 的信息给持久化到关系数据库中,包括 Task 的状态、输出结果等。而对于 DAG 的边来说,我们也得用数据库来存储各 Task 之间的方向关系。此外,在遍历执行 DAG 的整个过程中的中间状态数据,我们也得搬运到数据库中。

    首先我们可以设计一个 workflow 表,来表示一个工作流。接着我们设计一个 task 表,来表示一个执行单元。task 表主要字段如下,这里主要是 task_parents 的设计,它是一个 string,存储 parents 的 taskId,多个由分隔符分隔。

    task_id
    workflow_id
    task_name
    task_status
    result
    task_parents
    


    依赖是上图这个例子,对比框架来说,我们首先得将其存储到数据库中去,最终可能得到如下数据:

    task_id  workflow_id  task_name  task_status  result  task_parents
      1          1           A           0                    -1
      2          1           B           0                    1
      3          1           C           0                    -1
      4          1           D           0                    2,3
    

    可以看到,这样也能很好地存储 DAG 数据,和框架中代码的输入方式差别并不是很大。

    接下来我们要做的是遍历执行整个 workflow,这边和框架的差别也不大。首先我们可以利用select * from task where workflow_id = 1 and task_parents = -1来获取初始化节点 Task A 和 Task C,将其提交到我们的线程池中。

    接着对应框架代码中的doExecute(processedNode.getChildren());,我们使用select * from task where task_parents like %3%,就可以得到 Task C 的孩子节点 Task D,这里使用了模糊查询是因为我们的 task_parents 可能是由多个父亲的 taskId 与分隔号组合而成的字符串。查询到孩子节点后,继续提交到线程池即可。

    别忘了我们在 Task D 这边还有一个并发等待的操作,对应框架代码中的

    if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents()))。这边我们只要判断select count(1) from task where task_id in (2,3) and status != 1的个数为 0 即可,
    

    即保证 parents task 全部成功。

    另外值得注意的是 task 的重试。在框架中,失败 task 的重试可以是立即使用当前线程重试或者放到一个定时线程池中去重试。而在平台上,我们的重试基本上来自于用户在界面上的点击,即主线程。

    至此,我们已经将任务编排框架的功能基本平台化了。作为一个任务编排平台,可拖拽编排的可视化输入、整个工作流状态的可视化展示、任务的可人工重试都是其优点。

    出处:https://fredal.xin/task-scheduling-based-on-dag

    展开全文
  • 原本在任务计划中执行的一个任务,迁移到了云数据库的任务编排中。任务的主要目的是对当日无效的业务数据,状态更新为已过期。客户反映,最近一段时间,一直有应该过期没有过期的数据,影响了新业务的录入。 2.解决...

    1.奇怪的问题

    前段时间,将一个本地版sqlServer2008的数据库,迁移到了阿里云RDS for SqlServer2008上。原本在任务计划中执行的一个任务,迁移到了云数据库的任务编排中。任务的主要目的是对当日无效的业务数据,状态更新为已过期。客户反映,最近一段时间,一直有应该过期没有过期的数据,影响了新业务的录入。

    2.解决问题思路

    根据客户反映的问题,我列举如下两个原因:

    1、任务编排中的代码没有执行;

    2、任务按时执行了,因为业务复杂,其他业务将历史数据修改了状态。

    为此,我做了如下工作排错,

    • 我首先检查了任务编排中的代码,将代码在sql中执行,发现没有问题;
    • 我让同事检查了和状态相关可能的所有业务,还是没有问题;
    • 根据对数据的查看,每天大部分需要处理的数据都是处理了的,只有少部分数据没有处理,说明肯定有处理状态的代码;
    • 我在思考是不是任务编排每次都只能执行固定条数的代码,为此,专门做了测试,结果排除了这种猜测;
    • 检查所有的代码,工作量太大了,我设计了一个思路,修改任务编排中的代码,在执行前后,将需要处理状态的数据,写到一个表里,做好了任务,等待晚上的执行结果。结果第二天早上,发现自己的代码没有按计划执行,确实又有处理状态的代码执行了。

    遇到这种陷入瓶颈的问题,我告诫自己,答案在现场,于是,我打算晚上12:00,看看到底代码执行了没有。

    3.半夜追踪

    晚上12:00起来,看了一下执行计划,已经执行过了,能查到执行记录;又查了一下数据,确实有部分数据没有处理完。

    我又重新梳理了一下功能,当看到“发布”、“下线”字样时,我突然想到,是不是需要重新发布?任务计划确实是执行了,执行的是内存中的,脚本中看到的,并没有执行。

    按照这个思路,我重新发布,重新执行了一下计划,果然,数据按照设计的逻辑执行了。

    4.踩坑落幕

    至此,在Rds任务编排中踩到的坑,才算踩平。这就是没有认真读操作说明的弊端,也证明了自己常说的那句话,答案就在现场,遇到问题,在现场找答案。

    展开全文
  • 文章目录部署任务编排组件Orca1. 准备镜像2. 准备资源配置清单3. 应用资源配置清单4. 检测验证 部署任务编排组件Orca 1. 准备镜像 [root@k8s7-200.host.com ~]# docker pull armory/spinnaker-orca-slim:release-1.8...

    部署任务编排组件Orca

    1. 准备镜像

    [root@k8s7-200.host.com ~]# docker pull armory/spinnaker-orca-slim:release-1.8.x-de4ab55
    [root@k8s7-200.host.com ~]# docker tag 5103b1f73e04 harbor.od.com/armory/orca:v1.8.x
    [root@k8s7-200.host.com ~]# docker push harbor.od.com/armory/orca:v1.8.x
    

    2. 准备资源配置清单

    [root@k8s7-200.host.com /data/k8s-yaml/armory/orca]# cat deployment.yaml 
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      labels:
        app: armory-orca
      name: armory-orca
      namespace: armory
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: armory-orca
      template:
        metadata:
          annotations:
            artifact.spinnaker.io/location: '"armory"'
            artifact.spinnaker.io/name: '"armory-orca"'
            artifact.spinnaker.io/type: '"kubernetes/deployment"'
            moniker.spinnaker.io/application: '"armory"'
            moniker.spinnaker.io/cluster: '"orca"'
          labels:
            app: armory-orca
        spec:
          containers:
          - name: armory-orca
            image: harbor.od.com/armory/orca:v1.8.x
            command:
            - bash
            - -c
            args:
            - bash /opt/spinnaker/config/default/fetch.sh && cd /home/spinnaker/config
              && /opt/orca/bin/orca
            ports:
            - containerPort: 8083
              protocol: TCP
            env:
            - name: JAVA_OPTS
              value: -Xmx512M
            envFrom:
            - configMapRef:
                name: init-env
            livenessProbe:
              failureThreshold: 5
              httpGet:
                path: /health
                port: 8083
                scheme: HTTP
              initialDelaySeconds: 600
              periodSeconds: 5
              successThreshold: 1
              timeoutSeconds: 1
            readinessProbe:
              failureThreshold: 3
              httpGet:
                path: /health
                port: 8083
                scheme: HTTP
              initialDelaySeconds: 180
              periodSeconds: 3
              successThreshold: 5
              timeoutSeconds: 1
            volumeMounts:
            - mountPath: /etc/podinfo
              name: podinfo
            - mountPath: /opt/spinnaker/config/default
              name: default-config
            - mountPath: /opt/spinnaker/config/custom
              name: custom-config
          volumes:
          - configMap:
              defaultMode: 420
              name: custom-config
            name: custom-config
          - configMap:
              defaultMode: 420
              name: default-config
            name: default-config
          - downwardAPI:
              defaultMode: 420
              items:
              - fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.labels
                path: labels
              - fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.annotations
                path: annotations
            name: podinfo
    
    [root@k8s7-200.host.com /data/k8s-yaml/armory/orca]# cat service.yaml 
    apiVersion: v1
    kind: Service
    metadata:
      name: armory-orca
      namespace: armory
    spec:
      ports:
      - port: 8083
        protocol: TCP
        targetPort: 8083
      selector:
        app: armory-orca
    
    

    3. 应用资源配置清单

    [root@k8s7-200.host.com /data/k8s-yaml/armory/orca]# kubectl apply -f ./deployment.yaml 
    deployment.apps/armory-orca created
    [root@k8s7-200.host.com /data/k8s-yaml/armory/orca]# kubectl apply -f ./service.yaml 
    service/armory-orca created
    

    4. 检测验证

    在这里插入图片描述

    展开全文
  • 任务编排 由Yaml配置文件驱动的简单任务编排框架 安装 将此行添加到您的应用程序的Gemfile中: gem 'task-orchestrator' 然后执行: $ bundle 或将其自己安装为: $ gem install task-orchestrator 用法 $ ...
  • 最近在做的工作比较需要一个支持任务编排工作流的框架或者平台,这里记录下实现上的一些思路。任务编排工作流任务编排是什么意思呢,顾名思义就是可以把 "任务" 这个原子单位按照...
  • 刚接触阿里云数据管理DMS中的任务编排,新增任务流程后,往往找不到调度属性,即如何设置任务定期执行。偶尔出险调度属性,却不知道如何主动去设置调度周期。 于是,对界面功能摸索尝试,记录下来。 2.调度属性...
  • 行业分类-物理装置-一种任务编排方法、装置、设备及存储介质
  • 异步编排任务的创建及基本使用 /** * 1:异步编排任务的创建及基本使用 * 使用线程池,异步执行任务。并且可以获取到任务的返回值。并且对任务的结果进行二次加工。 */ public class CompletableFutureTest { ...
  • 分布式任务编排调度框架设计 点击打开链接
  • DevOps构建定义与任务编排
  • 任务编排工具和工作流程 最近,用于编排任务和数据工作流的新工具激增(有时称为“MLOps”)。这些工具的数量众多,使得选择正确的工具成为一个难题,因此我们决定将一些最受欢迎的工具进行对比。 总体而言,Apache...
  • Nomad 是一个灵活的任务编排工具,使用户能部署和管理任何容器化的和传统的应用 Nomad能infrastructure-as-code的部署应用,将bin放入job, 可以优化资源利用率。 支持macOS, windows, 和linux. 2. 特性 Deploy ...
  • 1.任务编排介绍 数据库是企业IT系统里的重要基础设施,里面存储了大量有价值的数据资产,如:交易数据、客户数据、订单数据,等等。其实,数据库在企业里一直扮演着一个数据生产者(Producer)的角色,日积月累这些...
  • 1.任务编排介绍 数据库是企业IT系统里的重要基础设施,里面存储了大量有价值的数据资产,如:交易数据、客户数据、订单数据,等等。其实,数据库在企业里一直扮演着一个数据生产者(Producer)的角色,日积月累这些...
  • 它是一个分布式且容错的调度程序,它在许多作业存储(etcd / consul)的顶部运行,可用于任务编排。 特征 支持ISO8061 统计资料 工作经历 Docker支持 可配置的重试策略 容错能力 与golang应用集成 失火指示 ...
  • I.内容提要 定时调度系统(定时任务、定时执行)算是工作中经常依赖的中间件系统,简单使用操作系统的 crontab,或基于 Quartz,xxl-job 来...今天我们探讨另一话题,对调度任务的依赖关系及编排展开分析,实现一套.
  • 如果大家仔细看了上一篇文章,可以看到该框架的难点和重点,主要有两点,分别是任务的顺序编排任务结果的回调。 如何做任务顺序编排 依次来看一下各个基本场景 1 全串行 这种是最简单的,依次串行即可。 ...
  • 标准运维(SOPS)是通过可视化的图形界面进行任务流程编排和执行的系统,是腾讯蓝鲸产品体系中一款轻量级的调度编排类SaaS产品。标准运维有两大核心服务。一个是流程编排服务:基于腾讯蓝鲸PaaS平台的API网关服务,...
  • 分布式任务编排调度框架设计

    千次阅读 2018-09-18 16:53:18
    原本一个人可以轻松维护十几台甚至几十台服务器:写几个常用的监控和配置下发脚本、或者利用cronTab制作几个定时任务就可以搞定。当服务器的数量由几十上升到几百,几千时,量变就引起了质变;而且随着应用数量的...
  • 并发编程中我们经常创建异步线程来执行任务。但是,当异步任务之间存在依赖关系时,使得我们开发过程变得更加复杂。比如: 1、线程2依赖于线程1的执行结果 2、线程3依赖于线程1和线程2执行结果的合并 要实现以上...
  • OpenSA运维自动化平台 架构说明 Django 2.1 + MySQL 5.7 + Redis 5.0 +芹菜v4.2.0 生产环境请使用nginx + uwsgi,不对公网开放,或者使用SSL双向认证 命令和文件分发基于SSH协议,支持Linux / Windows(cygwin)|...
  • 任务编排的应用场景 任务类型 任务编排并不局限于HSF任务,由于框架仅要求传入的是一个函数,通过函数进行抽象,可以支持任意类型的任务编排,例如:HSF、MetaQ、Tair、DB等。 任务编排形式 下单页的例子是串行&...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 33,900
精华内容 13,560
关键字:

任务编排