精华内容
下载资源
问答
  • Airflow

    2020-09-04 18:50:24
    airflow 是一个使用python语言编写的data pipeline管理、调度和监控工作流的平台 DAG: 多个task的集合,定义了这些task间的执行顺序和依赖关系。 定义在python文件中,airflow会执行每一个文件,动态构建出DAG...

    airflow 是一个使用python语言编写的data pipeline管理、调度和监控工作流的平台

    DAG:

    • 多个task的集合,定义了这些task间的执行顺序和依赖关系。
    • 定义在python文件中,airflow会执行每一个文件,动态构建出DAG对象,每一个DAG对应一个workflow。在airflow动态构建过程中,发现DAG满足触发条件,则生产当前execution date的DAG运行实例(DAG RUN)

    task和opeartor:

    • task是operator的一个实例,也是DAGs中的一个node
      • operator描述了task的类型,不同operator可以用来完成不同类型的task;一个operator定义一个task,每个operator独立执行,如:
        • HiveOperator:执行Hive SQL,通常是执行ETL,将数据运算提取后写入目标
        • HivePartitionSensor:检测Hive表中的某个分区是否存在
        • PythonOperator:执行Python脚本

    展开全文
  • airflow

    2018-02-07 16:05:00
    官网: http://airflow.incubator.apache.org/index.html airflow源码:https://github.com/apache/incubator-airflow 参考资料:http://www.open-open.com/lib/view/open1452002876105.html 简介:...

    官网: http://airflow.incubator.apache.org/index.html

    airflow源码:https://github.com/apache/incubator-airflow

    参考资料:http://www.open-open.com/lib/view/open1452002876105.html

    简介:http://www.cnblogs.com/xianzhedeyu/p/8047828.html

    重要参数介绍:http://www.cnblogs.com/skyrim/p/7456166.html

    http://blog.csdn.net/permike/article/details/52184621

    FAQ :http://blog.csdn.net/yingkongshi99/article/details/52658660

    容器:docker pull puckel/docker-airflow

     

     

    启动dag调度器, 注意启动调度器, 并不意味着dag会被马上触发, dag触发需要符合它自己的schedule规则

    如果缺省了END_DATE参数, END_DATE等同于START_DATE.

    使用 DummyOperator 来汇聚分支
    使用 ShortCircuitOperator/BranchPythonOperator 做分支
    使用 SubDagOperator 嵌入一个子dag
    使用 TriggerDagRunOperator 直接trigger 另一个dag

     在创建MyBashOperator的实例时候, 为on_failure_callback和on_success_callback参数设置两个回调函数, 我们在回调函数中, 将success或failed状态记录到自己的表中.

    DAG的schedule_interval参数设置成None, 表明这个DAG始终是由外部触发。

    如果将default_args字典传递给DAG,DAG将会将字典应用于其内部的任何Operator上。这很容易的将常用参数应用于多个Operator,而无需多次键入。

    default_args=dict(
        start_date=datetime(2016, 1, 1),
        owner='Airflow')
    
    dag = DAG('my_dag', default_args=default_args)
    op = DummyOperator(task_id='dummy', dag=dag)
    print(op.owner) # Airflow

     

    initdb,初始化元数据DB,元数据包括了DAG本身的信息、运行信息等;

    resetdb,清空元数据DB;

    list_dags,列出所有DAG;

    list_tasks,列出某DAG的所有task;

    test,测试某task的运行状况;

    backfill,测试某DAG在设定的日期区间的运行状况;

    webserver,开启webserver服务;

    scheduler,用于监控与触发DAG。

    $ cd ${AIRFLOW_HOME}/dags
    
    $ python test_import.py # 保证代码无语法错误
    
    $ airflow list_dags # 查看dag是否成功加载
    
    airflow list_tasks test_import_dag –tree # 查看dag的树形结构是否正确
    
    $ airflow test test_import_dag \ test_import_task 2016-3-7 # 测试具体的dag的某个task在某个时间的运行是否正常
    
    $ airflow backfill test_import_dag -s 2016-3-4 \ -e 2016-3-7 # 对dag进行某段时间内的完整测试
    

      


    # print the list of active DAGs
    airflow list_dags

    # prints the list of tasks the "tutorial" dag_id
    airflow list_tasks tutorial

    # prints the hierarchy of tasks in the tutorial DAG
    airflow list_tasks tutorial --tree

    请注意,airflow test命令在本地运行任务实例,将其日志输出到stdout(屏幕上),不会影响依赖关系,并且不会将状态(运行,成功,失败,...)发送到数据库。 它只是允许简单的测试单个任务实例。
    如果使用depends_on_past = True,则单个任务实例将取决于上一个任务实例的成功与否,如果指定本身的start_date,则忽略此依赖关系
    # start your backfill on a date range
    airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

    使用Xcom在task之间传参

    可以直接使用jinja模板语言,在{{}}中调用ti的xcom_push和xcom_pull方法,下面的例子为t1使用xcom_push推出了一个kv,t2通过taskid和key来接收

     dag = DAG(  
       dag_id='xcomtest', default_args=default_args, schedule_interval='*/2 * ** *')  
       
    t1 = BashOperator(  
       task_id='xcom',  
       bash_command='''''{{ ti.xcom_push(key='aaa', value='bbb') }}''',  
        dag=dag)  
       
    t2 = BashOperator(  
       task_id='xcom2',  
        bash_command='''''echo"{{ ti.xcom_pull(key='aaa', task_ids='xcom') }}" ''',  
        dag=dag)  
    t2.set_upstream(t1)  
    

      

    airflow提供了很多Macros Variables,可以直接使用jinja模板语言调用宏变量

    execution_date并不是task的真正执行时间,而是上一周期task的执行时间。
    我们在airflow上看到一个任务是6am执行的,而且interval=4hours,那么execution_date的值是2am,而不是6am

    暂时无法hold或pause某个task,只支持以dag为单位pause

    当使用BashOperator时,command需要调用脚本时,脚本后需要有个空格,否则报错,暂时不清楚原因,但加空格后可以正常执行,如下例,run.sh后需加空格


    Airflow为Operator提供许多常见任务,包括:
    BashOperator - 执行bash命令
    PythonOperator - 调用任意的Python函数
    EmailOperator - 发送邮件
    HTTPOperator - 发送 HTTP 请求
    SqlOperator - 执行 SQL 命令
    Sensor - 等待一定时间,文件,数据库行,S3键等...
    除了这些基本的构建块之外,还有更多的特定Operator:DockerOperator,HiveOperator,S3FileTransferOperator,PrestoToMysqlOperator,SlackOperator

     

    使用supervisord进行deamon

    airflow本身没有deamon模式,所以直接用supervisord就ok了,我们只要写4行代码

    [program:airflow_web]
    command=/home/kimi/env/athena/bin/airflow webserver -p 8080
    
    [program:airflow_scheduler]
    command=/home/kimi/env/athena/bin/airflow scheduler
    
    作者:yin1941
    链接:https://www.jianshu.com/p/59d69981658a
    來源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
    

      

    airflow 执行的命令或这种消息是支持 jinja2 模板语言;{{ ds }}是一种宏,表示当前的日期,
    形如2016-12-16,支持的宏在
    https://airflow.incubator.apache.org/code.html#macros

    test: 用于测试特定的某个task,不需要依赖满足
    run: 用于执行特定的某个task,需要依赖满足
    backfill: 执行某个DAG,会自动解析依赖关系,按依赖顺序执行
    unpause: 将一个DAG启动为例行任务,默认是关的,所以编写完DAG文件后一定要执行这和要命令,相反命令为pause
    scheduler: 这是整个 airflow 的调度程序,一般是在后台启动
    clear: 清除一些任务的状态,这样会让scheduler来执行重跑

     ============================

    前面的脚本里用到了{{ ds }}变量,每个DAG在执行时都会传入一个具体的时间(datetime对象), 这个ds就会在 render 命令时被替换成对应的时间。这里要特别强调一下, 对于周期任务,airflow传入的时间是上一个周期的时间(划重点),比如你的任务是每天执行, 那么今天传入的是昨天的日期,如果是周任务,那传入的是上一周今天的值

    ==========================

    executor
    SequentialExecutor:表示单进程顺序执行,通常只用于测试
    LocalExecutor:表示多进程本地执行,它用python的多进程库从而达到多进程跑任务的效果。
    CeleryExecutor:表示使用celery作为执行器,只要配置了celery,就可以分布式地多机跑任务,一般用于生产环境。
    sql_alchemy_conn :这个配置让你指定 airflow 的元信息用何种方式存储,默认用sqlite,如果要部署到生产环境,推荐使用 mysql。

    smtp :如果你需要邮件通知或用到 EmailOperator 的话,需要配置发信的 smtp 服务器

    ======================

    触发条件有两个维度, 以T1&T2->T3 这样的dag为例:
    一个维度是: 要根据dag上次运行T3的状态确定本次T3是否被调用, 由DAG的default_args.depends_on_past参数控制, 为True时, 只有上次T3运行成功, 这次T3才会被触发

    另一个维度是: 要根据前置T1和T2的状态确定本次T3是否被调用, 由T3.trigger_rule参数控制, 有下面6种情形, 缺省是all_success.
    all_success: (default) all parents have succeeded
    all_failed: all parents are in a failed or upstream_failed state
    all_done: all parents are done with their execution
    one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
    one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
    dummy: dependencies are just for show, trigger at will

    ========================

    airflow有两个基于PythonOperator的Operator来支持dag分支功能.

    ShortCircuitOperator, 用来实现流程的判断. Task需要基于ShortCircuitOperator, 如果本Task返回为False的话, 其下游Task将被skip; 如果为True的话, 其下游Task将会被正常执行. 尤其适合用在其下游都是单线节点的场景.

    BranchPythonOperator, 用来实现Case分支. Task需要基于BranchPythonOperator, airflow会根据本task的返回值(返回值是某个下游task的id),来确定哪个下游Task将被执行, 其他下游Task将被skip.

    ======================

     

    connection 表:
    我们的Task往往需要通过jdbc/ftp/http/webhdfs方式访问其他资源, 一般地访问资源时候都需要一些签证, airflow允许我们将这些connection以及鉴证存放在connection表中. 可以现在WebUI的Admin->Connections管理这些连接, 在代码中使用这些连接.

    MySQL 应该使用 mysqlclient 包, 我简单试了mysql-connector-python 有报错

    LocalExecutor 和 CeleryExecutor 都可用于生产环境, CeleryExecutor 将使用 Celery 作为Task执行的引擎, 扩展性很好, 当然配置也更复杂, 需要先setup Celery的backend(包括RabbitMQ, Redis)等. 其实真正要求扩展性的场景并不多, 所以LocalExecutor 是一个很不错的选择了.

    1. 配置OS环境变量 AIRFLOW_HOME, AIRFLOW_HOME缺省为 ~/airflow
    2. 运行下面命令初始化一个Sqlite backend DB, 并生成airflow.cfg文件
    your_python ${AIRFLOW_HOME}\bin\airflow initdb
    3. 如果需要修改backend DB类型, 修改$AIRFLOW_HOME/airflow.cfg文件 sql_alchemy_conn后, 然后重新运行 airflow initdb .
    官方推荐使用MySQL/PostgreSQL做DB Server.

    有下面3个参数用于控制Task的并发度,
    parallelism, 一个Executor同时运行task实例的个数
    dag_concurrency, 一个dag中某个task同时运行的实例个数
    max_active_runs_per_dag: 一个dag同时启动的实例个数

    start_date 有点特别,如果你设置了这个参数,那么airflow就会从start_date开始以 schedule_interval 的规则开始执行,例如设置成3天前每小时执行一次,那么在调度正常启动时,就会立即调度 24*3 次,但注意,脚本执行环境的时间还是当前的系统时间,而不会说真是把系统时间模拟成3天前,所以感觉这个功能应用场景比较好限。

    ===========================

    dags_folder目录支持子目录和软连接,因此不同的dag可以分门别类的存储起来

    schedule_interval=timedelta(minutes=1) 或者 crontab格式
    crontab格式的介绍:https://www.cnblogs.com/chenshishuo/p/5152068.html http://blog.csdn.net/liguohanhaha/article/details/52261192

    sql_alchemy_conn = mysql://ct:152108@localhost/airflow
    对应字段解释如下: dialect+driver://username:password@host:port/database

    当遇到不符合常理的情况时考虑清空 airflow backend的数据库, 可使用airflow resetdb清空。
    删除dag文件后,webserver中可能还会存在相应信息,这时需要重启webserver并刷新网页。
    关闭webserver: ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}

    界面的时候看起来比较蛋疼, utc-0的时间,
    修改.../python2.7/site-packages/airflow/www/templates/admin/master.html如下(注释掉UCTSeconds,新增一行UTCSeconds), 这样时间就是本地时间了。


    验证脚本是否有问题:python xxx.py
    看是否能查询出新增的dags吗:airflow list_dags
    启动schedule :airflow scheduler


    这里有的 start_date 有点特别,如果你设置了这个参数,那么airflow就会从start_date开始以 schedule_interval 的规则开始执行,例如设置成3天前每小时执行一次,那么在调度正常启动时,就会立即调度 24*3 次,但注意,脚本执行环境的时间还是当前的系统时间,而不会说真是把系统时间模拟成3天前,所以感觉这个功能应用场景比较好限

    在centos6.8上装特别顺利(运行时貌似一切都正常,就是任务一直处于running状态---debug了一番源代码, 发现内存要必需够大,发现必需用非root身份运行airflow worker, 务必保证核数够用,否则需要调低dag_concurrency, max_active_runs_per_dag,max_threads,parallelism, 否则worker出现莫名其妙的问题)

    airflow跑着跑着就挂了,一看内存还够用(可能需要不要钱的加内存),如果你到处找不到想要的错误日志。那么看看AIRFLOW_HOME下面是不是莫名其妙的多了几个 .err/.out 的文件,进去看看会有收获。

    在需要运行作业的机器上的安装airflow airflow[celery] celery[redis] 模块后,启动airflow worker即可.这样作业就能运行在多个节点上.

    安装主模块
    [airflow@airflow ~]$ pip install airflow
    2.4.2 安装数据库模块、密码模块
    [airflow@airflow ~]$ pip install "airflow[postgres,password]"

    转载于:https://www.cnblogs.com/testzcy/p/8427141.html

    展开全文
  • Airflow笔记

    2020-07-14 19:25:25
    Airflow 已逐渐成为最流行的任务调度框架,加上本身由 Python 语言编写,对比 Azkaban 灵活性,可配置性更高 Airflow官网 配置参数介绍 default_args = { 'owner': 'Airflow', 'depends_on_past': False, 'email': ...

    Airflow 已逐渐成为最流行的任务调度框架,加上本身由 Python 语言编写,对比 Azkaban 灵活性,可配置性更高

    Airflow官网

    配置参数介绍

    	default_args = {
       
    	'owner': 'Airflow',
    	'depends_on_past': False,
    	'email': ['airflow@example.com'],
    	
    展开全文
  • [AirFlow]AirFlow使用指南二 DAG定义文件

    万次阅读 2017-08-03 11:57:13
    Code that goes along with the Airflow tutorial located at: https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py """ from airflow import DAG from airflow.operato

    1. Example

    """
    Code that goes along with the Airflow tutorial located at:
    https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
    """
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 6, 1),
        'email': ['airflow@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }
    
    dag = DAG('tutorial', default_args=default_args)
    
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
    
    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)
    
    templated_command = """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
            echo "{{ params.my_param }}"
        {% endfor %}
    """
    
    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)
    
    t2.set_upstream(t1)
    t3.set_upstream(t1)
    

    你需要搞清楚的是(对于刚上手的人来说可能不是很直观),这个Airflow Python脚本只是一个配置文件,使用代码的方式指定了DAG的结构(与oozie使用xml方式不同)。这里定义的实际任务将在与此脚本的上下文不同的上下文中运行。不同的任务在不同的时间点在不同的工作节点(worker)上运行,这意味着这个脚本不能进行跨任务之间的交流。为此,我们有一个更高级的功能,称为XCom

    有人可能会将DAG定义文件认为是可以进行一些实际数据处理的地方 - 根本不是这样! 脚本的目的是定义一个DAG对象。它需要快速评估(秒级别,而不是分钟级别),因为调度程序将定期执行它以反映更改(如果有的话)。

    2. 导入模块

    Airflow管道只是一个Python脚本,目的是定义一个Airflow DAG对象。我们从导入我们需要的类库开始。

    # The DAG object; we'll need this to instantiate a DAG
    from airflow import DAG
    
    # Operators; we need this to operate!
    from airflow.operators.bash_operator import BashOperator
    

    3. 默认参数

    我们将要创建一个DAG和一些任务,我们可以选择将一组参数传递给每个任务的构造函数(这将变得多余),或者(更好的)我们可以定义一个默认参数的字典,可以在创建任务时使用。

    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 6, 1),
        'email': ['airflow@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }
    

    有关BaseOperator参数及它们是干什么的的更多信息,请参阅:airflow.models.BaseOperator文档。

    另外,请注意,可以定义不同的参数集,用于不同的目的。一个例子就是在生产和开发环境之间设置不同的设置,使用不同的参数集。

    4. 实例化DAG

    我们需要一个DAG对象来嵌套我们的任务。这里我们传递一个定义dag_id的字符串(tutorial),它用作DAG的唯一标识符。我们还传递我们刚刚定义的默认参数字典,并为DAG的schedule_interval参数设置为1天。

    dag = DAG(
        'tutorial', default_args=default_args, schedule_interval=timedelta(1))
    

    5. 任务(Tasks)

    当实例化operator对象时会生成任务。从一个operator中实例化的任意对象都称为构造器。第一个参数task_id作为该任务的唯一标识符。

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
    
    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)
    

    注意我们如何把operator特定参数(bash_command)和从BaseOperator继承来的对所有operator都常用的公共参数(retries)组成的混合参数传递到operator的构造器中的。另外,请注意,在第二个任务中,我们用参数3覆盖retries参数。

    任务参数的优先规则如下:

    • 显示传递的参数
    • default_args字典中存在的值
    • operator的默认值(如果存在)

    6. Jinja模板

    Airflow充分利用了Jinja 模板,为管道作者提供了一套内置的参数和宏。Airflow还为管道作者提供了钩子(hooks)来定义自己的参数,宏和模板。

    本教程几乎无法在Airflow中对模板进行操作,但本节的目标是让你了解此功能的存在,让你熟悉一下双大括号,并认识一下最常见的模板变量:{{ ds }}

    templated_command = """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7) }}"
            echo "{{ params.my_param }}"
        {% endfor %}
    """
    
    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)
    

    请注意,templated_command{%%}块中包含代码逻辑,可以像{{ds}}一样引用参数,像{{macros.ds_add(ds,7)}}中一样调用函数,并在{{params.my_param}}引用自定义参数。

    文件也可以传递给bash_command参数,如bash_command ='templated_command.sh',文件位置是相对于包含管道文件(在这个例子中为tutorial.py)的目录。这由于许多原因而需要的,例如分离脚本的逻辑和流水线代码,允许在不同语言组成的文件中进行适当的代码突出显示。

    7. 建立依赖关系

    我们有两个不相互依赖的简单任务。 这里有几种方法可以定义它们之间的依赖关系:

    t2.set_upstream(t1)
    
    # This means that t2 will depend on t1
    # running successfully to run
    # It is equivalent to
    # t1.set_downstream(t2)
    
    t3.set_upstream(t1)
    
    # all of this is equivalent to
    # dag.set_dependency('print_date', 'sleep')
    # dag.set_dependency('print_date', 'templated')
    

    请注意,当执行脚本时,如果在DAG中找到一条环形链路(例如A依赖于B,B又依赖于C,C又依赖于A)或者一个依赖被多次引用时引发异常(when it finds cycles in your DAG or when a dependency is referenced more than once)。

    8. 概括

    经上述介绍之后,我们有了一个基本的DAG。此时我们的代码应该如下所示:

    """
    Code that goes along with the Airflow located at:
    http://airflow.readthedocs.org/en/latest/tutorial.html
    """
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 6, 1),
        'email': ['airflow@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }
    
    dag = DAG(
        'tutorial', default_args=default_args, schedule_interval=timedelta(1))
    
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
    
    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)
    
    templated_command = """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
            echo "{{ params.my_param }}"
        {% endfor %}
    """
    
    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)
    
    t2.set_upstream(t1)
    t3.set_upstream(t1)
    

    9. 测试

    9.1 运行脚本

    是时候运行一些测试样例了。首先让我们确定管道解析。假设我们正在将上一步tutorial.py中的代码保存在workflow.cfg中配置(dags_folder)的DAG文件夹中。DAG的默认存储位置为$AIRFLOW_HOME/dags中。

    python ~/airflow/dags/tutorial.py
    

    如果你的脚本没有抛出异常,这意味着你代码中没有可怕的错误,并且你的Airflow环境是健全的。

    9.2 命令行元数据验证

    我们来运行一些命令来进一步验证这个脚本。

    # print the list of active DAGs
    airflow list_dags
    
    # prints the list of tasks the "tutorial" dag_id
    airflow list_tasks tutorial
    
    # prints the hierarchy of tasks in the tutorial DAG
    airflow list_tasks tutorial --tree
    

    验证结果:

    xiaosi@yoona:~$ airflow list_dags
    [2017-08-02 21:35:06,134] {__init__.py:57} INFO - Using executor SequentialExecutor
    [2017-08-02 21:35:06,274] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
    [2017-08-02 21:35:06,293] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
    [2017-08-02 21:35:06,607] {models.py:167} INFO - Filling up the DagBag from /home/xiaosi/opt/airflow/dags
    
    
    -------------------------------------------------------------------
    DAGS
    -------------------------------------------------------------------
    example_bash_operator
    example_branch_dop_operator_v3
    example_branch_operator
    example_http_operator
    example_passing_params_via_test_command
    example_python_operator
    example_short_circuit_operator
    example_skip_dag
    example_subdag_operator
    example_subdag_operator.section-1
    example_subdag_operator.section-2
    example_trigger_controller_dag
    example_trigger_target_dag
    example_xcom
    latest_only
    latest_only_with_trigger
    test_utils
    tutorial
    
    xiaosi@yoona:~$ airflow list_tasks tutorial
    [2017-08-02 21:35:37,444] {__init__.py:57} INFO - Using executor SequentialExecutor
    [2017-08-02 21:35:37,550] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
    [2017-08-02 21:35:37,569] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
    [2017-08-02 21:35:37,811] {models.py:167} INFO - Filling up the DagBag from /home/xiaosi/opt/airflow/dags
    print_date
    sleep
    templated
    xiaosi@yoona:~$ airflow list_tasks tutorial --tree
    [2017-08-02 21:35:46,470] {__init__.py:57} INFO - Using executor SequentialExecutor
    [2017-08-02 21:35:46,578] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
    [2017-08-02 21:35:46,597] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
    [2017-08-02 21:35:46,841] {models.py:167} INFO - Filling up the DagBag from /home/xiaosi/opt/airflow/dags
    <Task(BashOperator): sleep>
        <Task(BashOperator): print_date>
    <Task(BashOperator): templated>
        <Task(BashOperator): print_date>
    

    9.3 测试

    我们通过在特定日期运行实际的任务实例进行测试。在此上下文中指定的日期是一个execution_date,它模拟在特定日期+时间上运行任务或dag的调度程序:

    # command layout: command subcommand dag_id task_id date
    
    # testing print_date
    airflow test tutorial print_date 2015-06-01
    
    # testing sleep
    airflow test tutorial sleep 2015-06-01
    

    运行结果:

    xiaosi@yoona:~$ airflow test tutorial print_date 2015-06-01
    [2017-08-02 21:39:55,781] {__init__.py:57} INFO - Using executor SequentialExecutor
    [2017-08-02 21:39:55,889] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
    [2017-08-02 21:39:55,908] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
    [2017-08-02 21:39:56,153] {models.py:167} INFO - Filling up the DagBag from /home/xiaosi/opt/airflow/dags
    [2017-08-02 21:39:56,315] {models.py:1126} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2015-06-01 00:00:00 [None]>
    [2017-08-02 21:39:56,317] {models.py:1126} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2015-06-01 00:00:00 [None]>
    [2017-08-02 21:39:56,317] {models.py:1318} INFO -
    --------------------------------------------------------------------------------
    Starting attempt 1 of 2
    --------------------------------------------------------------------------------
    
    [2017-08-02 21:39:56,318] {models.py:1342} INFO - Executing <Task(BashOperator): print_date> on 2015-06-01 00:00:00
    [2017-08-02 21:39:56,327] {bash_operator.py:71} INFO - tmp dir root location:
    /tmp
    [2017-08-02 21:39:56,328] {bash_operator.py:80} INFO - Temporary script location :/tmp/airflowtmpc1BGXE//tmp/airflowtmpc1BGXE/print_dateITSGQK
    [2017-08-02 21:39:56,328] {bash_operator.py:81} INFO - Running command: date
    [2017-08-02 21:39:56,332] {bash_operator.py:90} INFO - Output:
    [2017-08-02 21:39:56,335] {bash_operator.py:94} INFO - 2017年 08月 02日 星期三 21:39:56 CST
    [2017-08-02 21:39:56,336] {bash_operator.py:97} INFO - Command exited with return code 0
    xiaosi@yoona:~$
    xiaosi@yoona:~$
    xiaosi@yoona:~$ airflow test tutorial sleep 2015-06-01
    [2017-08-02 21:40:41,594] {__init__.py:57} INFO - Using executor SequentialExecutor
    [2017-08-02 21:40:41,700] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
    [2017-08-02 21:40:41,719] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
    [2017-08-02 21:40:41,964] {models.py:167} INFO - Filling up the DagBag from /home/xiaosi/opt/airflow/dags
    [2017-08-02 21:40:42,126] {models.py:1126} INFO - Dependencies all met for <TaskInstance: tutorial.sleep 2015-06-01 00:00:00 [None]>
    [2017-08-02 21:40:42,128] {models.py:1126} INFO - Dependencies all met for <TaskInstance: tutorial.sleep 2015-06-01 00:00:00 [None]>
    [2017-08-02 21:40:42,128] {models.py:1318} INFO -
    --------------------------------------------------------------------------------
    Starting attempt 1 of 2
    --------------------------------------------------------------------------------
    
    [2017-08-02 21:40:42,128] {models.py:1342} INFO - Executing <Task(BashOperator): sleep> on 2015-06-01 00:00:00
    [2017-08-02 21:40:42,137] {bash_operator.py:71} INFO - tmp dir root location:
    /tmp
    [2017-08-02 21:40:42,138] {bash_operator.py:80} INFO - Temporary script location :/tmp/airflowtmpfLOkuA//tmp/airflowtmpfLOkuA/sleepOoXZ0X
    [2017-08-02 21:40:42,138] {bash_operator.py:81} INFO - Running command: sleep 5
    [2017-08-02 21:40:42,143] {bash_operator.py:90} INFO - Output:
    [2017-08-02 21:40:47,146] {bash_operator.py:97} INFO - Command exited with return code 0
    

    现在是否还记得我们之前用模板做了什么? 通过运行以下命令,查看如何渲染和执行此模板:

    # testing templated
    airflow test tutorial templated 2015-06-01
    

    运行结果:

    xiaosi@yoona:~$ airflow test tutorial templated 2015-06-01
    [2017-08-02 21:43:40,089] {__init__.py:57} INFO - Using executor SequentialExecutor
    [2017-08-02 21:43:40,196] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
    [2017-08-02 21:43:40,214] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
    [2017-08-02 21:43:40,458] {models.py:167} INFO - Filling up the DagBag from /home/xiaosi/opt/airflow/dags
    [2017-08-02 21:43:40,620] {models.py:1126} INFO - Dependencies all met for <TaskInstance: tutorial.templated 2015-06-01 00:00:00 [None]>
    [2017-08-02 21:43:40,622] {models.py:1126} INFO - Dependencies all met for <TaskInstance: tutorial.templated 2015-06-01 00:00:00 [None]>
    [2017-08-02 21:43:40,622] {models.py:1318} INFO -
    --------------------------------------------------------------------------------
    Starting attempt 1 of 2
    --------------------------------------------------------------------------------
    
    [2017-08-02 21:43:40,623] {models.py:1342} INFO - Executing <Task(BashOperator): templated> on 2015-06-01 00:00:00
    [2017-08-02 21:43:40,638] {bash_operator.py:71} INFO - tmp dir root location:
    /tmp
    [2017-08-02 21:43:40,639] {bash_operator.py:80} INFO - Temporary script location :/tmp/airflowtmpHmgW9g//tmp/airflowtmpHmgW9g/templated086SvH
    [2017-08-02 21:43:40,639] {bash_operator.py:81} INFO - Running command:
    
        echo "2015-06-01"
        echo "2015-06-08"
        echo "Parameter I passed in"
    
        echo "2015-06-01"
        echo "2015-06-08"
        echo "Parameter I passed in"
    
        echo "2015-06-01"
        echo "2015-06-08"
        echo "Parameter I passed in"
    
        echo "2015-06-01"
        echo "2015-06-08"
        echo "Parameter I passed in"
    
        echo "2015-06-01"
        echo "2015-06-08"
        echo "Parameter I passed in"
    
    [2017-08-02 21:43:40,643] {bash_operator.py:90} INFO - Output:
    [2017-08-02 21:43:40,644] {bash_operator.py:94} INFO - 2015-06-01
    [2017-08-02 21:43:40,644] {bash_operator.py:94} INFO - 2015-06-08
    [2017-08-02 21:43:40,644] {bash_operator.py:94} INFO - Parameter I passed in
    [2017-08-02 21:43:40,644] {bash_operator.py:94} INFO - 2015-06-01
    [2017-08-02 21:43:40,644] {bash_operator.py:94} INFO - 2015-06-08
    [2017-08-02 21:43:40,644] {bash_operator.py:94} INFO - Parameter I passed in
    [2017-08-02 21:43:40,644] {bash_operator.py:94} INFO - 2015-06-01
    [2017-08-02 21:43:40,644] {bash_operator.py:94} INFO - 2015-06-08
    [2017-08-02 21:43:40,644] {bash_operator.py:94} INFO - Parameter I passed in
    [2017-08-02 21:43:40,644] {bash_operator.py:94} INFO - 2015-06-01
    [2017-08-02 21:43:40,644] {bash_operator.py:94} INFO - 2015-06-08
    [2017-08-02 21:43:40,645] {bash_operator.py:94} INFO - Parameter I passed in
    [2017-08-02 21:43:40,645] {bash_operator.py:94} INFO - 2015-06-01
    [2017-08-02 21:43:40,645] {bash_operator.py:94} INFO - 2015-06-08
    [2017-08-02 21:43:40,645] {bash_operator.py:94} INFO - Parameter I passed in
    [2017-08-02 21:43:40,645] {bash_operator.py:97} INFO - Command exited with return code 0
    

    这将显示事件的详细日志,并最终运行你的bash命令并打印结果。

    请注意,airflow test命令在本地运行任务实例,将其日志输出到stdout(屏幕上),不会影响依赖关系,并且不会将状态(运行,成功,失败,...)发送到数据库。 它只是允许简单的测试单个任务实例。

    9.4 Backfill

    一切看起来都运行正常,所以让我们运行一个backfill。backfill将遵照依赖关系,并将日志发送到文件中,与数据库通信以记录状态。如果你启动webserver,你可以跟踪进度。如果你有兴趣可以在backfill过程中跟踪进度,airflow webserver将启动Web服务器。

    注意,如果使用depends_on_past = True,则单个任务实例将取决于上一个任务实例的成功与否,如果指定本身的start_date,则忽略此依赖关系(except for the start_date specified itself, for which this dependency is disregarded.)。

    此上下文中的日期范围是start_date和可选的end_date,用于使用此dag中的任务实例填充运行计划。

    # optional, start a web server in debug mode in the background
    # airflow webserver --debug &
    
    # start your backfill on a date range
    airflow backfill tutorial -s 2015-06-01 -e 2015-06-07
    

    原文:http://airflow.incubator.apache.org/tutorial.html#example-pipeline-definition

    展开全文
  • airflow FAQ

    千次阅读 2016-09-25 10:30:13
    关于airflow使用过程中的一些常见问题记录
  • Airflow安装说明

    2020-03-30 14:28:02
    1、airflow安装 # airflow needs a home, ~/airflow is the default, # but you can lay foundation somewhere else if you prefer # (optional) export AIRFLOW_HOME=~/airflow # install from pypi using pip ...
  • Michael Kotliar,Andrey V Kartashov,Artem Barski,CWL-Airflow:支持通用工作流语言的轻量级管道管理器,GigaScience,第8卷,第7期,2019年7月,giz084, //doi.org/10.1093/gigascience/giz084 获取最新版本 ...
  • Airflow基础知识

    千次阅读 2018-03-07 17:08:31
    转载:http://blog.csdn.net/u012965373/article/details/72878491安装跳过,我的另一篇有专门的安装笔记Airflow安装安装过后开始看代码:[python] view plain copydefault_args = { 'owner': 'airflow', '...
  • Airflow 调度基础

    2019-09-28 02:32:22
    1. Airflow Airflow是一个调度、监控工作流的平台。用于将一个工作流制定为一组任务的有向无环图(DAG),并指派到一组计算节点上,根据相互之间的依赖关系,有序执行。 2. 安装 pip安装airflow: pip3 install ...
  • Airflow核心源码解读

    2021-02-19 17:22:16
    注意:本文基于Airflow 1.10解读源码 Airflow目前已经成为主流的作业调度工具,支持本地调度、分布式调度、Kubernetes调度。Airflow虽然使用Python实现,但功能依然很强大,其配置参数也多达250个。其中很多核心...
  • airflow 入门示例

    2020-02-03 17:13:40
    目录 导入模块 设置默认参数 实例化一个DAG 任务 Templating with Jinja 设置依赖关系 简要重述以上内容 测试 ...# DAG用来实例化DAG对象,注意仅仅只是定义了一个...from airflow.operators.bash_operator i...
  • Airflow 简介及原理

    2020-04-24 13:41:57
    Airflow 简介及原理 Airflow 一个用于编排复杂计算工作流和数据处理流水线的开源工具,通常可以解决一些复杂超长 Cron 脚本任务或者大数据的批量处理任务。 其工作流的设计是基于有向无环图 (Directed Acyclical ...
  • 作者:Corwien来源:SegmentFault 思否社区一、Airflow简介Airflow 是一个使用 Python 语言编写的 Data Pipeline 调度和监控工作流的平台。...
  • Airflow Utils-crx插件

    2021-04-02 03:29:55
    语言:English 使airflow web ui更好 ** 可能需要授予您的气流URL的权限 单击“Airflow”图标浏览器工具栏,然后选择“可以阅读或更改网站数据” ** *滚动到日志页面底部时自动刷新 *手动触发DAG时自动重定向 *使用...
  • Airflow Lifunf-crx插件

    2021-04-02 03:27:37
    语言:English 这种意图帮助穷人使用气流! ###是什么? 这是一个Chrome扩展,可以在Airflow上实现某些功能: - Airflow DAG页面上的自动刷新。 当监视/测试任务执行时,这有助于很多。 现在你可以喝你的咖啡啤酒,...
  • 参考文档: airflow2.0.2分布式安装文档 ariflow官方文档 ... 安装Airflow(重要)2.1 配置 airflow sudo权限2.2 设置Airflow环境变量2.3 安装airflow2.4 配置ariflow2.5 启动airflow集群2.6 登录webui查看
  • Airflow 使用及原理分析

    千次阅读 2019-11-16 08:15:00
    Airflow 是一个使用 Python 语言编写的 Data Pipeline 调度和监控工作流的平台。Airflow 是通过 DAG(Directed acyclic graph 有向无环图)来管理任务流程的任务调度工具,不需要知道业务数据的具体内容,设置任务的...
  • Airflow安装详细介绍以及入门Demo

    千次阅读 2019-04-01 11:07:37
    Airflow 是什么 Airflow 是 Airbnb 开发的用于工作流管理的开源项目,自带 web UI 和调度。现在 Apache 下做孵化,地址是https://github.com/apache/airflow airflow Airflow 解决什么问题 Airflow 主要...
  • airflow 框架入门文档

    千次阅读 2018-04-11 20:53:08
    Apache Airflow 是一个用于编排复杂计算工作流和数据处理流水线的开源工具。 如果您发现自己运行的是执行时间超长的 cron 脚本任务,或者是大数据的批处理任务,Airflow 可能是能帮助您解决目前困境的神器。本文将...
  • Airflow的使用及注意事项

    千次阅读 2020-03-17 11:42:07
    airflow 是一个使用python语言编写的data pipeline调度和监控工作流的平台。Airflow可以用来创建、监控和调整数据管道。任何工作流都可以在这个使用Python来编写的平台上运行。 Airflow是一种允许工作流开发人员...
  • Apache Airflow 是一个用于编排复杂计算工作流和数据处理流水线的开源工具。 如果您发现自己运行的是执行时间超长的 cron 脚本任务,或者是大数据的批处理任务,Airflow 可能是能帮助您解决目前困境的神器。本文将为...
  • airflow数据平台Each time we deploy our new software, we will check the log file twice a day to see whether there is an issue or exception in the following one or two weeks. One colleague asked me is ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 955
精华内容 382
关键字:

airflow语言