airflow Operators

Posted damahuhu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了airflow Operators相关的知识,希望对你有一定的参考价值。

airflow Operators

20190927


一、 Dag 编写步骤

  1. import DAG类和若干operater类以及必要的Python模块
  2. 设定默认参数,创建DAG对象
  3. 提供必要的参数(比如task_id和dag),创建Task(即Operator对象)
  4. 设定Task的上下游依赖关系

1. import DAG类

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta

2. 设置一些默认参数

  • 所有的 Operator 都是从BaseOperator 派生而来,并通过继承获得更多功能
  • 参考【airflow operators-CSDN
  • default_args设置的是DAG的通用参数,这些通用参数会直接传递给DAG下属的所有Task,这些参数也可以在创建Task时传入
default_args = {
    # 常用
    'owner': 'airflow', # 这个DAG的所有者,会在Web UI上显示,主要用于方便管理
    'depends_on_past': False,  # 是否依赖于过去。如果为True,那么必须要昨天的DAG执行成功了,今天的DAG才能执行
    'start_date': datetime(2015, 6, 1), 
    # DAG的开始时间,比如这里就是从2015年6月1日开始执行第一个DAG。这个参数会影响到部署上线时回填DAG的数
    #量。一般建议写成上线时间的前一天(因为这里的start_date指的是execute_date,而Airflow执行的逻辑是,
    #今天的同一时间执行昨天的任务,比如execute_date=2018-03-01,每天凌晨3点执行,则会在2018-03-02 
    #03:00:00启动这个DAG。特别地,这个参数必须一个datetime对象,不可以用字符串
    'email': ['airflow@example.com'],# 出问题时,发送报警Email的地址,可以填多个,用逗号隔开
    'email_on_failure': False, # 任务失败且重试次数用完时是否发送Email,推荐填True
    'email_on_retry': False, # 任务重试时是否发送Email
    'retries': 1, # 任务失败后的重试次数
    'retry_delay': timedelta(minutes=5), # 重试间隔,必须是timedelta对象
    # 不常用
    'queue': 'bash_queue', # 队列,默认是default,决定实际执行任务会发送到哪个worker
    'pool': 'backfill',  # pool是一个分类限制并发量的设计,目前来说可以忽略,默认所有的Task都在一个pool里。
    'priority_weight': 10, # 优先级权重,在任务需要排队时而你需要优先执行某些任务时会有用
    'end_date': datetime(2016, 1, 1), # 结束时间,一般线上任务都会一直跑下去,所以没必要设置
}

3. 创建DAG对象

dag = DAG('tutorial', default_args=default_args,schedule_interval="0 3 * * *") 
# 第一个参数固定为dag的名字(即这个.py脚本的名称)
# schedule_interval为执行时间间隔,同crontab的语法
# 在这个例子中表示每天凌晨3点执行

4. 创建Task

# 这是一个SSHOperator的task示例
task_1 = SSHOperator(
    ssh_conn_id='ssh_24', # 指定conn_id
    task_id='task_shopping',
    command='/bin/bash path/my.sh $(date -d "yesterday" +%Y%m%d) ', # 远程机器上的脚本文件
    dag=dag
)

5. 设定Task依赖关系

  • 设定依赖有两种方式,一种是使用对象的方法
  • set_upstreamset_downstream来设置上下游依赖
  • 另一种是使用运算符,比如下面的 task_1 << task_2,表示task_1task_2的下游对象
 task_1 << task_2

二、完整的例子

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import datetime, timedelta
import pendulum

local_tz = pendulum.timezone("Asia/Shanghai")
dt = local_tz.convert(datetime(2019, 9, 27))

default_args = {
    'owner': 'lion',
    'depend_on_past': False,
    'email': ['my@email','other@email'],
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date':dt,
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(dag_id="ssh_myssh", default_args=default_args, schedule_interval='0 6 * * *')

task_1 = SSHOperator(
    ssh_conn_id='ssh_24',,
    task_id='task_shopping',
     command='/bin/bash path/my_1.sh $(date -d "yesterday" +%Y%m%d) ',
    dag=dag
)

task_2 = SSHOperator(
    ssh_conn_id='ssh_24',
    task_id='task_dimming',
     command='/bin/bash path/my_2.sh $(date -d "yesterday" +%Y%m%d) ',
    dag=dag
)

task_1 >> task_2

参考

airflow operators-CSDN
airflow DAG

以上是关于airflow Operators的主要内容,如果未能解决你的问题,请参考以下文章

airflow Operators

Airflow 将 jinja 模板作为字符串

使自定义 Airflow 宏扩展其他宏

Airflow 中文文档:保护连接

气流 - Python 文件不在同一个 DAG 文件夹中

DAG 在 Web-UI 中不可见