精华内容
下载资源
问答
  • 任务编排
    千次阅读 多人点赞
    2021-11-28 16:08:32

    背景

    先说说写这个系列的目的吧,有认识我的朋友知道,我在杭州某不知名电商工作,负责数据平台方向的开发,虽然我之前一直分享的都是某些组件的用法,但我的本职工作还是围绕着各个组件去打造去中心化的数据能力。我们团队从去年开始,一直在朝着这个方向努力;经过一年的积累,我们也做出了部分的产品,如元数据系统、开发平台、调度中心、数据交换等等;但随着我们的业务在不断的扩张与发展,我们平台的用户逐渐的增加,最先扛不住的不是我们的业务系统、不是我们自研的数据产品,而是我们的大数据集群

    具体体现在多个地方

    1. Yarn所有队列拥堵
      • 虽然我们Yarn已经按照任务类型、任务调度周期等特点划分了多个队列,但是在高峰期,还是会出现所有队列打满,任务积压严重的情况
      • 我们使用的调度系统是Apache Dolphinscheduler,虽然它已经足够优秀,但在某些场景下的功能依旧不够充分
        • Worker的负载均衡:针对的是Worker机器的负载,如果任务都是提交在Yarn或K8s集群上,那Worker的负载其实会比较低,这时候频繁的提交任务可能会导致Yarn、K8s的爆炸
        • Worker分组:粗粒度的分组,多个组可以共享同样的Worker,但是无法做到资源限值,可能出现别人将我的Worker资源全部占满,而我却无能为力的情况;如果我独享我的Worker,却又会出现Worker空闲的情况。总结来说就是:旱的旱死、涝的涝死
        • 不支持Hook:除非二次开发,否则某些地方想做一下特殊的事情比较困难,如果能有像Hive一样的Hook机制将会舒服很多
        • 不支持任务临时执行:可能是因为Task以Json存放在工作流相关的表中而非自己有单独的表,导致Dolphin在当前版本不支持临时执行任务,必须将任务归属到某个工作流中,通过工作流来执行
        • 缺乏Metrics信息:拿不到系统的指标信息,比如当前使用多少线程?多少任务在等待提交……等等信息;
        • 当然不是说Dolphin不好,只是他需要更多的时间去发展,但是我们公司可能等不及他的发展
      • 分析师直连Impala集群,肆意提交任务;同时又因为我们的Impala集群与Yarn机器是混布的状态,只要Impala将Io、Cpu打满,那我们Yarn上的任务必然会跑不动
    2. Hdfs压力大
      • 我们Hive的默认引擎是是Hive on spark,同时我们的开发平台也支持Spark、Flink等各种类型执行引擎,很多同学在进行数据开发的时候,为了让任务跑的更快,很多情况下会将并行度开的很大;也有可能在某些场景中,需要用到多级动态分区;种种的情况,导致我们Hdfs上的小文件巨多;因为历史遗留问题,我们有很多数据都是通过Flume采集到HDFS集群,这部分数据在HDFS上产生了很多小文件
      • 我们数仓侧的H+1的ODS层任务,都是采用小时全量的方式,也就是说,每小时都会拉取一次业务库的全量数据,虽然会和前一个小时的分区进行合并,写入当天的分区,但是说到底,ODS层存储的是每天的全量业务数据,导致磁盘使用率非常高,而且有很多数据其实是冗余存在的

    这些问题,我想是一家公司,从零开始做大数据都会遇到的问题,甚至说拥有成熟、经验丰富的大数据团队的公司,也会遇到这些问题。所以,为了彻底解决以上的问题,我们决定从任务、数据两个维度入手进行治理,力求做到:数据&计算资源充分利用,数据准时产出,集群平稳运行

    本期先从任务治理入手,来讲讲我们是怎么进行任务治理的。

    什么是任务编排

    在我们看来,任务治理需要分为三步

    • 任务编排
    • 任务分析
    • 任务全链路、全生命周期追踪

    涉及到篇幅问题,本期只讲述任务编排。不过正式开始讲之前,再聊一下我们在用上任务编排系统前,存在的一些系统架构层面的问题

    首先,我们能够接受任务提交并真正提交到Yarn/K8s集群上的执行器有3种

    • Dolphin的Worker,负责执行Beeline(Hive)&Shell类型任务
    • 基于Zeppelin封装的应用,用来提交分析型任务,如Impala、Python
    • Fin-processor,我们用来提交和执行数据交换类型任务,同时也负责FLINK_SQL和SPARK_SQL类型任务的提交

    可以看到,这里的设计比较混乱,能够去提交任务的系统有三个,本质上是同一件事情,却被三个应用给干了

    这是其一

    其二,在我们的系统Fin-processor中,每种任务类型有自己单独的线程池,当有任务被提交时,通过线程池的负载来进行任务的提交、等待、拒绝;
    这样的实现方法很简单很粗暴,但是也带来了一些的问题

    • 有些任务很重要,但是因为有太多的不重要的任务先提交了过来,导致重要任务被在队列中等待甚至被拒绝
    • 对于即系查询任务(目前Fin上没有,但如果没有任务编排系统,未来上面也会有该类型任务),我希望很快任务就能被提交;而对于天级别任务,我的任务晚一点被提交也没问题,如何智能的去选择应该优先被提交的任务

    其三,目前我们的开发平台支持配置任务的优先级和任务告警,但是因为这些配置都是人手动去配置的,每个人想怎么填就怎么填,所以导致了所有任务的优先级都是最高;而任务告警则可能告给一个毫无相关的人甚至是公司高层;这种问题我应该如何去解决?因为语言上的约束是没有用的

    带着这些问题,我们再来看什么是任务编排

    何为任务编排?

    并非传统意义上的任务调度、容器编排,这里的任务编排主要是解决Yarn所有队列拥堵&我们系统架构设计的问题,所以我们希望我们的任务编排系统,有以下几种能力

    • 细粒度资源管控:
      • 需要多种负载均衡策略,不只是执行任务的机器的负载均衡,如果任务是提交到资源调度集群如Yarn、K8s上,则需要根据对应的集群情况来判断任务是否能够提交,提交到哪一台机器……等等。
      • 支持Worker的分组,多个组之间可以共享资源,但存在优先级及最大资源限值,优先保障高优先级人群、高优先级任务被执行
      • 高优先级任务支持多种告警方式,支持值班机制;低优先级任务通过IM通知即可
    • Hook机制:
      • 有别的系统需要采集我们的任务提交前、任务结束后的事件,包括一些任务信息;就像Hive hook那样,可以采集到任务血缘等信息
    • 指标暴露
      • 通过指标更好的了解你系统的内部情况,提早发现问题并及时处理

    这里最关键的是细粒度资源管控,因为这个任务编排系统,将会决定一个任务能否被提交,何时被提交等等

    所以,接下来我们来看一下我们任务编排系统的资源组设计

    资源组设计

    我们的任务编排系统的资源组设计来自于Trino(原Presto),有了解过的可以跳过这部分,接下来我会说一下什么是资源组?为什么它可以帮我们做到细粒度的资源管控

    何为资源组

    是通过各种资源管控策略来限制任务的提交,从而降低各个集群的负载

    每个人提交到任务编排系统后,会根据任务本身的属性,来给他分配到对应的资源组中。

    当一个任务被提交到资源组A中时,A会检查它自身使用的资源是否已经超过了给他的限值;如果已经超过了,将会将任务放到队列中,等待资源空闲后将其取出再次提交;或者直接返回失败给提交任务的客户端

    那么,资源组是通过什么指标来判断任务是否被提交呢?

    资源组配置

    名称说明
    name资源组名称
    maxQueued最大等待任务数
    concurrencyLimit最大运行任务数
    workerList拥有的执行器列表
    memoryLimit资源组最大内存使用量
    cpuLimit资源组最大Cpu Load值
    schedulerPolicy调度策略
    schedulerWeight调度权重

    调度策略详解

    FAIR

    配置为FAIR的资源组,队列中的任务按FIFO的策略来提交任务

    WEIGHTED

    配置为WEIGHTED的资源组,队列中的任务被提交到该资源组时,需要携带权重值,权重越高的任务约容易被提交

    PRIORITY

    配置为PRIORITY的资源组,队列中的任务被提交到该资源组时,需要携带优先级值,每次取出队列中优先级最高的任务提交执行

    目前我们的生产环境中,一共配置了三个资源组

    • tmp:临时执行的任务将被提交到该资源组中,策略为FAIR
    • etl:所有的调度任务会被提交到该资源组中,策略为WEIGHTED;权重值按任务类型、任务归属BU等属性生成
    • kpi:所有关键任务会被提交到该资源组中,策略为PRIORITY;通常用来保障数仓团队关键任务、老板要看的报表等;任务如果需要被分配到该组中,需要有对应的申请&审批,且如果一个任务被升级至kpi任务,那么他的所有父节点任务都会被升级为kpi任务

    此外,不止资源组内部的任务有调度策略,不同的资源组之间也有调度策略;因为所有的资源组都从属于一个虚拟的root资源组,当定时任务试图取出任务提交时,会根据调度策略,来决定取出哪个资源组中的任务;因为我们想尽可能多的选择kpi资源组中的任务进行执行,所以我们对root资源组配置的策略是WEIGHTED,这样我们能保证大部分情况下,先取出kpi中的任务,而且tmpetl中的任务也不至于太过饥饿

    通过资源组,我们可以很轻易的做到任务提交管控,从而可以很好的缓解Yarn&K8s集群的压力,并且我们将所有任务的提交都收口在了我们这个系统中,我们也解决了我们架构上的一些问题,可以说,资源组是整个任务编排系统中,最核心最关键的一环

    其余设计

    架构设计

    系统一共有四个角色,也就是说会有四个Jvm进程被启动

    • ApiServer
      • 接受前端请求:任务大盘、任务执行历史查询、资源组管理、告警管理、服务管理等请求
      • 接受二方应用请求:任务的execute、kill、queryLog等
      • 请求Coordinator:负责转发任务、资源组、服务管理操作相关;因为Coordinator是高可用设计,二方应用无法得知谁是Active状态的Coordinator
      • 请求Alarm:负责转发告警相关的请求
    • Coordinator
      • 接受ApiServer请求;对资源组中的任务进行CRUD
      • 接受Worker请求:主要是Worker对任务执行结果的Ack通知,任务结束后需要从Running队列中摘除该任务
      • 自身:主从切换、资源组管理、集群资源获取等;负责管理任务在提交前的所有状态
      • 请求AlarmServer:告知任务的最终状态,交由AlarmServer判断 是否需要告警、告警方式等
    • Worker
      • 接受Coordinator请求:执行或杀死任务
      • 自身:提供获取当前Worker资源使用情况;负责管理任务在提交后的所有状态
      • 请求Coordinator:通过状态机的轮转,每触发一次状态的变化都会通知Coordinator
    • Alarm
      • 接受ApiServer请求:告警管理;
      • 接受Coordinator请求:根据任务状态,任务告警配置,生成对应的告警策略
      • 自身:通过告警策略,将任务的结果通知给对应的责任人/责任群;

    整体设计的比较灵活,如果觉得角色过多,可以考虑将Alarm和Coordinator合并

    高可用&负载均衡设计

    • ApiServer:所有请求先通过ELB(华为云的弹性负载均衡),再转发到ApiServer,该角色可以部署任意台
    • Coordinator:通过Zookeeper的瞬时节点+监听机制可以实现主备模式;当Coordinator是备节点时,将会拒绝所有请求;当发生主从切换时,即将切换到Active状态的Coordinator,需要将处于等待提交和运行中的任务,从数据库中恢复到内存中;如果所有数据不放内存只放数据库,每次需要先扫表中所有待提交任务,再重新排序之后拿出一个任务进行提交,提交完后需要重新扫表(防止有更高优先级任务),接下来将重新排序;有多少个任务需要被提交就得重复多少次这个逻辑,时间复杂度O(n),且影响提交性能
    • Worker:上下线时,需要Kill所有由本机提交的任务,并标记为失败;
    • Alarm:无状态服务,随意重启;且不在主流程中,最多无法触发告警

    所有服务均会将自己的Metriccs暴露,通过Prometheus拉取,在Grafana中展示;并配置相应告警,如果服务宕机或不健康,则会通知对应的责任人(我);

    Metrics包括但不限于:Jvm状态、机器负载、磁盘状态;Coordinator服务将会上报运行中的任务数、队列中的任务数等等

    任务类型

    • JDBC:支持Hive/Impala/Clickhouse/Trino类型的Jdbc任务,理论上支持任意类型的Jdbc类型任务
    • FLINK_SQL:支持离线&实时两种类型的FLINK_SQL任务,并支持Application/Perjob等多种模式;
    • SPARK_SQL:通过PySpark封装,支持yarn-cluster模式
    • 数据交换&采集:对Sqoop和Datax任务的封装
    • Python:任务运行在Docker容器中,并限制使用的核数和内存数;支持任意版本的Python,支持所有Pip依赖的安装
    • Shell:任务运行在Docker容器中,并限制使用的核数和内存数;支持yum install安装任意依赖包

    告警设计

    • 告警策略
      • 任务执行成功或任务执行失败但是是临时执行
        • 通知对应的责任人即可,且只通知一次
      • 任务非成功状态且是调度执行
        • 每15分钟触发一次告警,除非手动停止告警,否则将持续告警;停止的告警将无法恢复
        • 如果任务对应的资源组是kpi则将会打电话;否则只会发送短信

    考虑到任务可能每晚都有任务会失败,有些人可能连续好几个晚上都会被叫起来处理问题,所以需要有值班体系;每晚都会有对应的值班人,任务失败优先打给值班人员,如果告警持续N次后,依旧无人关闭告警,说明值班人员未处理问题,将会打电话给兜底人,由兜底人进行处理

    • 告警值班
      • 生成值班列表,每天下班前会通知对应的值班人员今天需要值班
      • 兜底人配置,当值班人员未处理任务时,由兜底人处理问题
      • 调度任务的非成功状态,将会直接发送给值班人员而非任务的owner

    写在最后

    • 写到这里,基本上我们任务编排系统的诞生初衷,以及相关设计,包括能解决什么样的问题,我想大家都已经明白了;那么,任务治理"三板斧"之任务编排 讲到这里就结束了,下一次我们会讲一下任务治理的下一把斧头——任务治理

    • 另外,我是打算将这个任务编排系统开源的,不过肯定不能直接将公司的代码给发出来,所以我准备从零再写一份开源版本,将不会依赖公司的任何组件,预计今年结束前能够在我的Github上发出来

    • 记得点赞,不要白嫖

    更多相关内容
  • 任务编排 由Yaml配置文件驱动的简单任务编排框架 安装 将此行添加到您的应用程序的Gemfile中: gem 'task-orchestrator' 然后执行: $ bundle 或将其自己安装为: $ gem install task-orchestrator 用法 $ ...
  • 任务编排工具和工作流程最近,出现了用于编排任务和数据工作流的新工具(有时称为" MLOps")。 这些工具的数量众多,因此很难选择要使用的工具,也难以理解它们的重叠方式,因此我们决定对一些最受欢迎的工具进行比较...

    05e3e8214eb47b961448e0f33e056a83.png

    任务编排工具和工作流程

    最近,出现了用于编排任务和数据工作流的新工具(有时称为" MLOps")。 这些工具的数量众多,因此很难选择要使用的工具,也难以理解它们的重叠方式,因此我们决定对一些最受欢迎的工具进行比较。

    ad59bb82a143e8085337c729fc8a327a.png

    > Airflow is the most popular solution, followed by Luigi. There are newer contenders too, and they'

    总体而言,Apache Airflow既是最受欢迎的工具,也是功能最广泛的工具,但是Luigi是类似的工具,上手起来比较简单。 Argo是团队已经在使用Kubernetes时经常使用的一种,而Kubeflow和MLFlow满足了与部署机器学习模型和跟踪实验有关的更多利基要求。

    在进行详细比较之前,了解一些与任务编排相关的更广泛的概念很有用。

    什么是任务编排,为什么有用?

    较小的团队通常从手动管理任务开始,例如清理数据,训练机器学习模型,跟踪结果以及将模型部署到生产服务器。 随着团队规模和解决方案的增长,重复步骤的数量也随之增加。 可靠地执行这些任务也变得更加重要。

    这些任务相互依赖的复杂方式也在增加。 刚开始时,您可能需要每周或每月一次运行一系列任务。 这些任务需要按特定顺序运行。 随着您的成长,该管道变成具有动态分支的网络。 在某些情况下,某些任务会引发其他任务,而这些可能取决于首先运行的其他几个任务。

    可以将该网络建模为DAG(有向无环图),该模型对每个任务及其之间的依赖关系进行建模。

    98128b320eb86ca28df3c0e0f2bd4c67.png

    > A pipeline is a limited DAG where each task has one upstream and one downstream dependency at most

    工作流程编排工具允许您通过指定所有任务以及它们如何相互依赖来定义DAG。 然后,该工具按正确的顺序按计划执行这些任务,然后在运行下一个任务之前重试任何失败的任务。 它还会监视进度并在发生故障时通知您的团队。

    CI / CD工具(例如Jenkins)通常用于自动测试和部署代码,这些工具与任务编排工具之间有很强的相似性-但也有重要的区别。 尽管从理论上讲,您可以使用这些CI / CD工具来编排动态的,相互链接的任务,但在一定程度的复杂性下,您会发现改用Apache Airflow等更通用的工具会更容易。

    总体而言,任何业务流程工具的重点都是确保集中,可重复,可重现和高效的工作流程:虚拟命令中心,用于您的所有自动化任务。 考虑到这种情况,让我们看看一些最流行的工作流程工具是如何叠加的。

    告诉我使用哪一个

    您可能应该使用:

    Apache Airflow如果您需要功能最全,最成熟的工具,则可以花时间来学习它的工作原理,设置和维护。

    Luigi,如果您需要比Airflow更轻松的学习方法。 它具有较少的功能,但更容易上手。

    Argo,如果您已经对Kubernetes生态系统进行了深入投资,并希望将所有任务作为Pod进行管理,并在YAML(而不是Python)中定义它们。

    如果您想使用Kubernetes,但仍使用Python而不是YAML定义任务,则使用KubeFlow。

    MLFlow,如果您更关心使用MLFlow的预定义模式来跟踪实验或跟踪和部署模型,而不是寻找能够适应现有自定义工作流程的工具。

    比较表

    51cdda584f525510a64d28baf70740b5.png

    > (Source: Author) – For more Machine Learning Tips — Get our weekly newsletter

    为了快速浏览,我们比较了以下方面的库:

    成熟度:基于项目的年龄以及修复和提交的次数;

    受欢迎程度:基于采用率和GitHub星级;

    简洁性:基于易于注册和采用;

    广度:基于每个项目的专业性与适应性;

    语言:基于您与工具互动的主要方式。

    这些不是严格的基准或科学基准,但目的是让您快速了解这些工具的重叠方式以及它们之间的区别。 有关更多详细信息,请参见下面的正面对比。

    Luigi 对比 Airflow

    Luigi和Airflow解决了类似的问题,但是Luigi要简单得多。 它包含在一个组件中,而Airflow有多个模块,可以用不同的方式进行配置。 气流具有更大的社区和一些其他功能,但学习曲线却陡峭得多。 具体来说,Airflow在计划方面要强大得多,它提供了日历UI,可帮助您设置任务应在何时运行。 使用Luigi,您需要编写更多的自定义代码以按计划运行任务。

    两种工具都使用Python和DAG定义任务和依赖项。 如果您的团队较小并且需要快速上手,请使用Luigi。 如果您的团队规模较大,可以使用Airflow,一旦您掌握了学习曲线,就可以从最初的生产力损失中获得更大的动力。

    Luigi 对比 Argo

    Argo建立在Kubernetes之上,并且每个任务都作为单独的Kubernetes容器运行。 如果您已经在大多数基础架构中使用Kubernetes,这可能会很方便,但如果您没有使用它,则会增加复杂性。 Luigi是一个Python库,可以与Python包管理工具(如pip和conda)一起安装。 Argo是Kubernetes扩展,使用Kubernetes安装。 虽然这两种工具都可以将任务定义为DAG,但使用Luigi时,您将使用Python编写这些定义,而使用Argo时,您将使用YAML。

    如果您已经对Kubernetes进行了投资,并且知道所有任务都是吊舱,请使用Argo。 如果要编写DAG定义的开发人员对YAML比对Python更满意,则还应该考虑一下。 如果您不是在Kubernetes上运行并且在团队中拥有Python专业知识,请使用Luigi。

    Luigi 对比 Kubeflow

    Luigi是用于常规任务编排的基于Python的库,而Kubeflow是专门用于机器学习工作流的基于Kubernetes的工具。 Luigi是为协调一般任务而构建的,而Kubeflow具有用于实验跟踪,超参数优化和为Jupyter笔记本服务的预构建模式。 Kubeflow由两个不同的组件组成:Kubeflow和Kubeflow管道。 后者专注于模型部署和CI / CD,并且可以独立于主要Kubeflow功能使用。

    如果您需要协调从数据清理到模型部署的各种不同任务,请使用Luigi。 如果您已经使用Kubernetes并希望安排常见的机器学习任务(例如实验跟踪和模型训练),请使用Kubeflow。

    Luigi 对比 MLFlow

    Luigi是一个通用的任务编排系统,而MLFlow是一个更专业的工具,可以帮助管理和跟踪您的机器学习生命周期和实验。 您可以使用Luigi定义常规任务和依赖项(例如训练和部署模型),但是可以将MLFlow直接导入到机器学习代码中,并使用其助手功能来记录信息(例如您正在使用的参数)并 工件(例如训练有素的模型)。 您还可以将MLFlow用作命令行工具,以服务使用通用工具(例如scikit-learn)构建的模型或将其部署到通用平台(例如AzureML或Amazon SageMaker)。

    Airflow 对比 Argo

    Argo和Airflow都允许您将任务定义为DAG,但是在Airflow中,您可以使用Python进行此操作,而在Argo中,您可以使用YAML。 Argo作为Kubernetes窗格运行每个任务,而Airflow则生活在Python生态系统中。 在选择Argo之前,Canva评估了这两个选项,您可以观看此演讲以获取详细的比较和评估。

    如果您想要更成熟的工具并且不关心Kubernetes,请使用Airflow。 如果您已经对Kubernetes进行了投资,并且想要运行以不同堆栈编写的各种任务,请使用Argo。

    Airflow 对比 Kubeflow

    Airflow是一个通用的任务编排平台,而Kubeflow则特别专注于机器学习任务,例如实验跟踪。 两种工具都允许您使用Python定义任务,但是Kubeflow在Kubernetes上运行任务。 Kubeflow分为Kubeflow和Kubeflow管道:后一个组件允许您指定DAG,但它比常规任务更着重于部署和模型服务。

    如果您需要一个成熟的,广泛的生态系统来执行各种不同的任务,请使用Airflow。 如果您已经使用Kubernetes,并希望使用更多现成的机器学习解决方案模式,请使用Kubeflow。

    Airflow 对比 MLFlow

    Airflow是一个通用的任务编排平台,而MLFlow是专门为优化机器学习生命周期而构建的。 这意味着MLFlow具有运行和跟踪实验以及训练和部署机器学习模型的功能,而Airflow具有广泛的用例,您可以使用它来运行任何任务集。 Airflow是一组用于管理和计划任务的组件和插件。 MLFlow是一个Python库,您可以将其导入到现有的机器学习代码中,还可以使用命令行工具来将scikit-learn编写的机器学习模型训练和部署到Amazon SageMaker或AzureML。

    如果您想以一种开明的,开箱即用的方式来管理机器学习实验和部署,请使用MLFlow。 如果您有更复杂的要求并且想要更好地控制如何管理机器学习生命周期,请使用Airflow。

    Argo 对比 Kubeflow

    Kubeflow的某些部分(例如Kubeflow管道)建立在Argo之上,但是Argo的建立是为了编排任何任务,而Kubeflow则专注于特定于机器学习的任务,例如实验跟踪,超参数调整和模型部署。 Kubeflow管道是Kubeflow的一个独立组件,专注于模型部署和CI / CD,并且可以独立于Kubeflow的其他功能使用。 这两种工具都依赖Kubernetes,如果您已经采用了它,那么可能会让您更感兴趣。 使用Argo,您可以使用YAML定义任务,而Kubeflow允许您使用Python接口。

    如果您需要管理作为Kubernetes Pod运行的常规任务的DAG,请使用Argo。 如果您想要更专注于机器学习解决方案的工具,请使用Kubeflow。

    Argo 对比 MLFlow

    Argo是一个任务编排工具,可让您将任务定义为Kubernetes Pod,并将其作为DAG运行(使用YAML定义)。 MLFlow是一种更加专业的工具,它不允许您定义任意任务或它们之间的依赖关系。 相反,您可以将MLFlow作为Python库导入到现有的(Python)机器学习代码库中,并使用其助手功能记录工件和参数,以帮助进行分析和实验跟踪。 您还可以使用MLFlow的命令行工具来训练scikit学习模型,并将其部署到Amazon Sagemaker或Azure ML,以及管理Jupyter笔记本。

    如果您需要管理常规任务并想在Kubernetes上运行它们,请使用Argo。 如果您希望采用一种自以为是的方法来使用托管云平台管理机器学习生命周期,请使用MLFlow。

    Kubeflow 对比 MLFlow

    与诸如Airflow或Luigi之类的通用任务编排平台相比,Kubeflow和MLFlow都是更小的,更专业的工具。 Kubeflow依赖Kubernetes,而MLFlow是一个Python库,可帮助您将实验跟踪添加到现有的机器学习代码中。 Kubeflow允许您构建完整的DAG,其中每个步骤都是Kubernetes窗格,但是MLFlow具有内置功能,可以将scikit学习模型部署到Amazon Sagemaker或Azure ML。

    【责任编辑:赵宁宁 TEL:(010)68476606】

    点赞 0

    展开全文
  • 最近在做的工作比较需要一个支持任务编排工作流的框架或者平台,这里记录下实现上的一些思路。任务编排工作流任务编排是什么意思呢,顾名思义就是可以把"任务"这个原子单位按照自己的方式进行编排,任务之间可能互相...

    最近在做的工作比较需要一个支持任务编排工作流的框架或者平台,这里记录下实现上的一些思路。

    任务编排工作流

    任务编排是什么意思呢,顾名思义就是可以把"任务"这个原子单位按照自己的方式进行编排,任务之间可能互相依赖。复杂一点的编排之后就能形成一个 workflow 工作流了。我们希望这个工作流按照我们编排的方式去执行每个原子 task 任务。如下图所示,我们希望先并发运行 Task A 和 Task C,Task A 执行完后串行运行 Task B,在并发等待 Task B 和 C 都结束后运行 Task D,这样就完成了一个典型的任务编排工作流。

    431fb6f423281b009d732f5dafb7634a.png

    DAG 有向无环图

    首先我们了解图这个数据结构,每个元素称为顶点 vertex,顶点之间的连线称为边 edge。像我们画的这种带箭头关系的称为有向图,箭头关系之间能形成一个环的成为有环图,反之称为无环图。显然运用在我们任务编排工作流上,最合适的是 DAG 有向无环图。

    我们在代码里怎么存储图呢,有两种数据结构:邻接矩阵和邻接表。

    下图表示一个有向图的邻接矩阵,例如 x->y 的边,只需将 Array[x][y]标识为 1 即可。

    83e60a395f7cdcecf3d4e421c9a61089.png

    此外我们也可以使用邻接表来存储,这种存储方式较好地弥补了邻接矩阵浪费空间的缺点,但相对来说邻接矩阵能更快地判断连通性。

    17ddb7d9d413fe86fa0728046a6aecdd.png

    一般在代码实现上,我们会选择邻接矩阵,这样我们在判断两点之间是否有边更方便点。

    一个任务编排框架

    了解了 DAG 的基本知识后我们可以来简单实现一下。首先是存储结构,我们的 Dag 表示一整个图,Node 表示各个顶点,每个顶点有其 parents 和 children://Dagpublic final class DefaultDag implements Dag {private Map> nodes = new HashMap>();

    ...

    }//Nodepublic final class Node {/**

    * incoming dependencies for this node

    */private Set> parents = new LinkedHashSet>();/**

    * outgoing dependencies for this node

    */private Set> children = new LinkedHashSet>();

    ...

    }复制代码

    画两个顶点,以及为这两个顶点连边操作如下:public void addDependency(final T evalFirstNode, final T evalLaterNode) {

    Node firstNode = createNode(evalFirstNode);

    Node afterNode = createNode(evalLaterNode);

    addEdges(firstNode, afterNode);

    }   private Node createNode(final T value) {

    Node node = new Node(value);return node;

    }private void addEdges(final Node firstNode, final Node afterNode) {if (!firstNode.equals(afterNode)) {

    firstNode.getChildren().add(afterNode);

    afterNode.getParents().add(firstNode);

    }

    }复制代码

    到现在我们其实已经把基础数据结构写好了,但我们作为一个任务编排框架最终是需要线程去执行的,我们把它和线程池一起给包装一下。//任务编排线程池public class DefaultDexecutor  {//执行线程,和2种重试线程private final ExecutorService executionEngine;private final ExecutorService immediatelyRetryExecutor;private final ScheduledExecutorService scheduledRetryExecutor;//执行状态private final ExecutorState state;

    ...

    }//执行状态public class DefaultExecutorState {//底层图数据结构private final Dag graph;//已完成private final Collection> processedNodes;//未完成private final Collection> unProcessedNodes;//错误taskprivate final Collection> erroredTasks;//执行结果private final Collection> executionResults;

    }复制代码

    可以看到我们的线程包括执行线程池,2 种重试线程池。我们使用 ExecutorState 来保存一些整个任务工作流执行过程中的一些状态记录,包括已完成和未完成的 task,每个 task 执行的结果等。同时它也依赖我们底层的图数据结构 DAG。

    接下来我们要做的事其实很简单,就是 BFS 这整个 DAG 数据结构,然后提交到线程池中去执行就可以了,过程中注意一些节点状态的保持,结果的保存即可。

    4c839d7796fbc0e94febed8cb5fa0c36.png

    还是以上图为例,值得说的一点是在 Task D 这个点需要有一个并发等待的操作,即 Task D 需要依赖 Task B 和 Task C 执行结束后再往下执行。这里有很多办法,我选择了共享变量的方式来完成并发等待。遍历工作流中被递归的方法的伪代码如下:private void doProcessNodes(final Set> nodes) {for (Node node : nodes) {//共享变量 并发等待if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents())) {

    Task task = newTask(node);this.executionEngine.submit(task);

    ...

    ExecutionResult executionResult = this.executionEngine.proce***esult();if (executionResult.isSuccess()) {

    state.markProcessingDone(processedNode);

    }//继续执行孩子节点doExecute(processedNode.getChildren());

    ...

    }

    }

    }复制代码

    这样我们基本完成了这个任务编排框架的工作,现在我们可以如下来进行示例图中的任务编排以及执行:DefaultExecutor executor = newTaskExecutor();

    executor.addDependency("A", "B");

    executor.addDependency("B", "D");

    executor.addDependency("C", "D");

    executor.execute();复制代码

    任务编排平台化

    好了现在我们已经有一款任务编排框架了,但很多时候我们想要可视化、平台化,让使用者更加无脑。

    框架与平台最大的区别在哪里?是可拖拽的可视化输入么?我觉得这个的复杂度更多在前端。而对于后端平台来讲,与框架最大的区别是数据的持久化。

    对于 DAG 的顶点来说,我们需要将每个节点 Task 的信息给持久化到关系数据库中,包括 Task 的状态、输出结果等。而对于 DAG 的边来说,我们也得用数据库来存储各 Task 之间的方向关系。此外,在遍历执行 DAG 的整个过程中的中间状态数据,我们也得搬运到数据库中。

    首先我们可以设计一个 workflow 表,来表示一个工作流。接着我们设计一个 task 表,来表示一个执行单元。task 表主要字段如下,这里主要是 task_parents 的设计,它是一个 string,存储 parents 的 taskId,多个由分隔符分隔。task_id

    workflow_id

    task_name

    task_status

    result

    task_parents复制代码

    3326817e5f01f9a928b4511039f52c47.png依赖是上图这个例子,对比框架来说,我们首先得将其存储到数据库中去,最终可能得到如下数据:task_id  workflow_id  task_name  task_status  result  task_parents

    1          1           A           0                    -1

    2          1           B           0                    1

    3          1           C           0                    -1

    4          1           D           0                    2,3复制代码

    可以看到,这样也能很好地存储 DAG 数据,和框架中代码的输入方式差别并不是很大。

    接下来我们要做的是遍历执行整个 workflow,这边和框架的差别也不大。首先我们可以利用select * from task where workflow_id = 1 and task_parents = -1来获取初始化节点 Task A 和 Task C,将其提交到我们的线程池中。

    接着对应框架代码中的doExecute(processedNode.getChildren());,我们使用select * from task where task_parents like %3%,就可以得到 Task C 的孩子节点 Task D,这里使用了模糊查询是因为我们的 task_parents 可能是由多个父亲的 taskId 与分隔号组合而成的字符串。查询到孩子节点后,继续提交到线程池即可。

    别忘了我们在 Task D 这边还有一个并发等待的操作,对应框架代码中的if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents()))。这边我们只要判断select count(1) from task where task_id in (2,3) and status != 1的个数为 0 即可,即保证 parents task 全部成功。

    另外值得注意的是 task 的重试。在框架中,失败 task 的重试可以是立即使用当前线程重试或者放到一个定时线程池中去重试。而在平台上,我们的重试基本上来自于用户在界面上的点击,即主线程。

    至此,我们已经将任务编排框架的功能基本平台化了。作为一个任务编排平台,可拖拽编排的可视化输入、整个工作流状态的可视化展示、任务的可人工重试都是其优点。

    展开全文
  • 基于DAG的任务编排框架/平台

    千次阅读 2021-08-02 22:50:27
    一、任务编排工作流 任务编排是什么意思呢,顾名思义就是可以把"任务"这个原子单位按照自己的方式进行编排,任务之间可能互相依赖。复杂一点的编排之后就能形成一个 workflow 工作流了。我们希望这个工作流按照我们...

    一、任务编排工作流

    任务编排是什么意思呢,顾名思义就是可以把"任务"这个原子单位按照自己的方式进行编排,任务之间可能互相依赖。复杂一点的编排之后就能形成一个 workflow 工作流了。我们希望这个工作流按照我们编排的方式去执行每个原子 task 任务。如下图所示,我们希望先并发运行 Task A 和 Task C,Task A 执行完后串行运行 Task B,在并发等待 Task B 和 C 都结束后运行 Task D,这样就完成了一个典型的任务编排工作流。

    二、DAG 有向无环图

    首先我们了解图这个数据结构,每个元素称为顶点 vertex,顶点之间的连线称为边 edge。像我们画的这种带箭头关系的称为有向图,箭头关系之间能形成一个环的成为有环图,反之称为无环图。显然运用在我们任务编排工作流上,最合适的是 DAG 有向无环图。

    我们在代码里怎么存储图呢,有两种数据结构:邻接矩阵和邻接表。

    下图表示一个有向图的邻接矩阵,例如 x->y 的边,只需将 Array[x][y]标识为 1 即可。

    此外我们也可以使用邻接表来存储,这种存储方式较好地弥补了邻接矩阵浪费空间的缺点,但相对来说邻接矩阵能更快地判断连通性。

    一般在代码实现上,我们会选择邻接矩阵,这样我们在判断两点之间是否有边更方便点。

    三、一个任务编排框架

    了解了 DAG 的基本知识后我们可以来简单实现一下。

    了解JUC包的可能快速想到CompletableFuture,这个类对于多个并发线程有复杂关系耦合的场景是很适用的,如果是一次性任务,那么使用CompletableFuture完全没有问题。但是作为框架或者平台来说,我们还需要考虑存储节点状态、重试执行等逻辑,对于这些CompletableFuture是不能满足的。

    我们需要更完整地考虑与设计这个框架。首先是存储结构,我们的 Dag 表示一整个图,Node 表示各个顶点,每个顶点有其 parents 和 children:

    //Dag
    public final class DefaultDag<T, R> implements Dag<T, R> {
    
    	private Map<T, Node<T, R>> nodes = new HashMap<T, Node<T, R>>();
        ...
    }
    
    //Node
    public final class Node<T, R> {
    	/**
    	 * incoming dependencies for this node
    	 */
        private Set<Node<T, R>> parents = new LinkedHashSet<Node<T, R>>();
        /**
         * outgoing dependencies for this node
         */
        private Set<Node<T, R>> children = new LinkedHashSet<Node<T, R>>();
        ...
    }
    

    画两个顶点,以及为这两个顶点连边操作如下:

    public void addDependency(final T evalFirstNode, final T evalLaterNode) {
    	Node<T, R> firstNode = createNode(evalFirstNode);
    	Node<T, R> afterNode = createNode(evalLaterNode);
    
    	addEdges(firstNode, afterNode);
    }
    
       
    private Node<T, R> createNode(final T value) {
    	Node<T, R> node = new Node<T, R>(value);
    	return node;
    }
    private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) {
    	if (!firstNode.equals(afterNode)) {
    		firstNode.getChildren().add(afterNode);
    		afterNode.getParents().add(firstNode);
    	}
    }
    

    到现在我们其实已经把基础数据结构写好了,但我们作为一个任务编排框架最终是需要线程去执行的,我们把它和线程池一起给包装一下。

    //任务编排线程池
    public class DefaultDexecutor <T, R> {
    
        //执行线程,和2种重试线程
    	private final ExecutorService<T, R> executionEngine;
    	private final ExecutorService immediatelyRetryExecutor;
    	private final ScheduledExecutorService scheduledRetryExecutor;
        //执行状态
    	private final ExecutorState<T, R> state;
        ...
    }
    //执行状态
    public class DefaultExecutorState<T, R> {
        //底层图数据结构
    	private final Dag<T, R> graph;
        //已完成
    	private final Collection<Node<T, R>> processedNodes;
        //未完成
    	private final Collection<Node<T, R>> unProcessedNodes;
        //错误task
    	private final Collection<ExecutionResult<T, R>> erroredTasks;
        //执行结果
    	private final Collection<ExecutionResult<T, R>> executionResults;
    }
    

    可以看到我们的线程包括执行线程池,2 种重试线程池。我们使用 ExecutorState 来保存一些整个任务工作流执行过程中的一些状态记录,包括已完成和未完成的 task,每个 task 执行的结果等。同时它也依赖我们底层的图数据结构 DAG。

    接下来我们要做的事其实很简单,就是 BFS 这整个 DAG 数据结构,然后提交到线程池中去执行就可以了,过程中注意一些节点状态的保持,结果的保存即可。

    还是以上图为例,值得说的一点是在 Task D 这个点需要有一个并发等待的操作,即 Task D 需要依赖 Task B 和 Task C 执行结束后再往下执行。这里有很多办法,我选择了共享变量的方式来完成并发等待。遍历工作流中被递归的方法的伪代码如下:

    private void doProcessNodes(final Set<Node<T, R>> nodes) {
    		for (Node<T, R> node : nodes) {
            //共享变量 并发等待
            if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents())) {
                Task<T, R> task = newTask(node);
                this.executionEngine.submit(task);
                ...
                ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
                if (executionResult.isSuccess()) {
    	    	state.markProcessingDone(processedNode);
    	    }
                //继续执行孩子节点
    	    doExecute(processedNode.getChildren());
                ...
            }
        }
    }
    

    这样我们基本完成了这个任务编排框架的工作,现在我们可以如下来进行示例图中的任务编排以及执行:

    DefaultExecutor<String, String> executor = newTaskExecutor();
    executor.addDependency("A", "B");
    executor.addDependency("B", "D");
    executor.addDependency("C", "D");
    executor.execute();
    

    四、任务编排平台化

    好了现在我们已经有一款任务编排框架了,但很多时候我们想要可视化、平台化,让使用者更加无脑。

    框架与平台最大的区别在哪里?是可拖拽的可视化输入么?我觉得这个的复杂度更多在前端。而对于后端平台来讲,与框架最大的区别是数据的持久化。

    对于 DAG 的顶点来说,我们需要将每个节点 Task 的信息给持久化到关系数据库中,包括 Task 的状态、输出结果等。而对于 DAG 的边来说,我们也得用数据库来存储各 Task 之间的方向关系。此外,在遍历执行 DAG 的整个过程中的中间状态数据,我们也得搬运到数据库中。

    首先我们可以设计一个 workflow 表,来表示一个工作流。接着我们设计一个 task 表,来表示一个执行单元。task 表主要字段如下,这里主要是 task_parents 的设计,它是一个 string,存储 parents 的 taskId,多个由分隔符分隔。

    task_id
    workflow_id
    task_name
    task_status
    result
    task_parents
    


    依赖是上图这个例子,对比框架来说,我们首先得将其存储到数据库中去,最终可能得到如下数据:

    task_id  workflow_id  task_name  task_status  result  task_parents
      1          1           A           0                    -1
      2          1           B           0                    1
      3          1           C           0                    -1
      4          1           D           0                    2,3
    

    可以看到,这样也能很好地存储 DAG 数据,和框架中代码的输入方式差别并不是很大。

    接下来我们要做的是遍历执行整个 workflow,这边和框架的差别也不大。首先我们可以利用select * from task where workflow_id = 1 and task_parents = -1来获取初始化节点 Task A 和 Task C,将其提交到我们的线程池中。

    接着对应框架代码中的doExecute(processedNode.getChildren());,我们使用select * from task where task_parents like %3%,就可以得到 Task C 的孩子节点 Task D,这里使用了模糊查询是因为我们的 task_parents 可能是由多个父亲的 taskId 与分隔号组合而成的字符串。查询到孩子节点后,继续提交到线程池即可。

    别忘了我们在 Task D 这边还有一个并发等待的操作,对应框架代码中的if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents()))。这边我们只要判断select count(1) from task where task_id in (2,3) and status != 1的个数为 0 即可,即保证 parents task 全部成功。

    另外值得注意的是 task 的重试。在框架中,失败 task 的重试可以是立即使用当前线程重试或者放到一个定时线程池中去重试。而在平台上,我们的重试基本上来自于用户在界面上的点击,即主线程。

    至此,我们已经将任务编排框架的功能基本平台化了。作为一个任务编排平台,可拖拽编排的可视化输入、整个工作流状态的可视化展示、任务的可人工重试都是其优点。

    五、基于有向无环图(DAG)的任务调度Demo

    文章: https://blog.csdn.net/dbqb007/article/details/89042984

    展开全文
  • 最近在做的工作比较需要一个支持任务编排工作流的框架或者平台,这里记录下实现上的一些思路。任务编排工作流任务编排是什么意思呢,顾名思义就是可以把 "任务" 这个原子单位按照...
  • 项目亮点 在真正的生产应用中,进行数据集成的时候,以可视化任务编排或 SQL 开发为数据集成的主要形式,我们认为 Drag and Drop 可视化任务编排可以最大程度减轻用户做数据集成的负担; 另外就是实现对作业进行多...
  • CompleteFuture实现简单的任务编排实践 一:前言 ​ CompleteFuture是java8 新提供的API,是对函数式编程思想的体现,提供了很多的对于函数式编程支持。不止有同步处理功能,还有异步处理能力。 通过函数式编程可以...
  • 这时候我们就可以使用CompletableFuture来进行异步任务编排。举个例子假如现在有一个业务分为如下几步1.业务一耗时0.5秒2.业务二耗时0.5秒3.业务三耗时1秒4.业务四耗时0.5秒(需要用到业务一的结果)5.业务五耗时1秒...
  • 文章目录并发任务编排实现不带返回值/参数传递任务串行执行并行执行并行执行-自定义线程池串并行任务编排 并发任务编排实现 其实Java8中提供了并发编程框架CompletableFuture,以下结合不同场景进行使用。 不带...
  • 使用jdk8中的java.util.concurrent.CompletableFuture非常方便进行异步任务编排 1.supplyAsync 开启异步任务 /** * 1.小白点菜 * 2.厨师做菜 * 3.小白吃饭 */ public static void test1() { SmallTools....
  • 原本在任务计划中执行的一个任务,迁移到了云数据库的任务编排中。任务的主要目的是对当日无效的业务数据,状态更新为已过期。客户反映,最近一段时间,一直有应该过期没有过期的数据,影响了新业务的录入。 2.解决...
  • 1.任务编排介绍 数据库是企业IT系统里的重要基础设施,里面存储了大量有价值的数据资产,如:交易数据、客户数据、订单数据,等等。其实,数据库在企业里一直扮演着一个数据生产者(Producer)的角色,日积月累这些...
  • 分布式任务编排调度框架设计

    千次阅读 2018-09-18 16:53:18
    原本一个人可以轻松维护十几台甚至几十台服务器:写几个常用的监控和配置下发脚本、或者利用cronTab制作几个定时任务就可以搞定。当服务器的数量由几十上升到几百,几千时,量变就引起了质变;而且随着应用数量的...
  • 1.任务编排介绍 数据库是企业IT系统里的重要基础设施,里面存储了大量有价值的数据资产,如:交易数据、客户数据、订单数据,等等。其实,数据库在企业里一直扮演着一个数据生产者(Producer)的角色,日积月累这些...
  • I.内容提要 定时调度系统(定时任务、定时执行)算是工作中经常依赖的中间件系统,简单使用操作系统的 crontab,或基于 Quartz,xxl-job 来...今天我们探讨另一话题,对调度任务的依赖关系及编排展开分析,实现一套.
  • 任务编排工具和工作流程 最近,用于编排任务和数据工作流的新工具激增(有时称为“MLOps”)。这些工具的数量众多,使得选择正确的工具成为一个难题,因此我们决定将一些最受欢迎的工具进行对比。 总体而言,Apache...
  • 刚接触阿里云数据管理DMS中的任务编排,新增任务流程后,往往找不到调度属性,即如何设置任务定期执行。偶尔出险调度属性,却不知道如何主动去设置调度周期。 于是,对界面功能摸索尝试,记录下来。 2.调度属性...
  • Gobrs-Async 动态配置化任务编排框架 作者介绍 dromara 开源组织成员, dromara/Gobrs-Async 作者 目前在某头部电商平台负责研发高并发电商平台核心架构。 带领团队攻克多个技术难题,落地高并发编排框架和各类...
  • AdminSet快速入门 Adminset是一个真正的基于运维思维而开发的全自动化运维平台。 v0.50新功能 全新用户权限系统 基于用户角色的部署权限关联 基于用户权限的功能按钮自动显示隐藏 基于用户的WEBSSH授权 ...
  • 本文将介绍流利说当前工作流中的任务是如何编排的以及治理在整个流程中发挥的价值。工作流系统我们所熟知的 Apache Oozie,Airflow 以及 Azkaban 都是优秀的工作流调度系统,简单的配置或者少量的代码就可以创建 DAG...
  • 文章大纲 拓扑排序(Topological Sorting) java 实现:节点与任务图编排执行 节点 任务图 任务执行 spark on k8s 实现任务编排 k8s 任务编排图解 spark on k8s 任务编排图解 参考文献 拓扑排序(Topological ...
  • etcd-nodes=http://10.1.42.1:4001或已分配给您的docker0接口的任何IP文献资料Xchronos 它是一个分布式且容错的调度程序,它在许多作业存储(etcd / consul)的顶部运行,可用于任务编排。特征支持ISO8061 统计资料...
  • 行业分类-物理装置-一种任务编排方法、装置、设备及存储介质
  • * 异步编排测试,多任务编排时使用 * @author 86156 */ public class AsynTest { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1、异步执行,没有返回结果 ...
  • 任务编排的应用场景 任务类型 任务编排并不局限于HSF任务,由于框架仅要求传入的是一个函数,通过函数进行抽象,可以支持任意类型的任务编排,例如:HSF、MetaQ、Tair、DB等。 任务编排形式 下单页的例子是串行&...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 40,710
精华内容 16,284
关键字:

任务编排