气流:每个任务运行大量实例时需要建议

Posted

技术标签:

【中文标题】气流:每个任务运行大量实例时需要建议【英文标题】:Airflow : need advices when running a lot of instances per task 【发布时间】:2017-07-24 16:46:42 【问题描述】:

这是我在 Stack 上的第一篇文章,是关于 Airflow 的。我需要实现一个 DAG,它将:

1/ 从 API 下载文件

2/ 将它们上传到 Google Cloud Storage

3/ 将它们插入 BigQuery

问题是第 1 步涉及大约 170 个要调用的帐户。如果在下载过程中出现任何错误,我希望我的 DAG 从异常结束的步骤中自动重试。因此,我在我的任务之上实现了一个循环,例如:

dag = DAG('my_dag', default_args=DEFAULT_ARGS)

for account in accounts:

    t1 = PythonOperator(task_id='download_file_' + account['id'],
                 python_callable=download_files(account),
                 dag=my_dag)

    t2 = FileToGoogleCloudStorageOperator(task_id='upload_file_' + account['id'],
                google_cloud_storage_conn_id = 'gcs_my_conn',
                src = 'file_'  + account['id'] + '.json',
                bucket = 'my_bucket',
                dag=my_dag)

    t3 = GoogleCloudStorageToBigQueryOperator(task_id='insert_bq',
                bucket = 'my_bucket',
                google_cloud_storage_conn_id = 'gcs_my_conn',
                bigquery_conn_id = 'bq_my_conn',
                src = 'file_'  + account['id'],
                destination_project_dataset_table = 'my-project:my-dataset.my-table',
                source_format = 'NEWLINE_DELIMITED_JSON',
                dag=my_dag)

    t2.set_upstream(t1)
    t3.set_upstream(t2)

所以在 UI 级别,每个任务显示我有大约 170 个实例。当我手动运行 DAG 时,就我所见,Airflow 什么也没做。 DAG 不会初始化或排队任何任务实例。我想这是由于涉及的实例数量,但我不知道如何解决这个问题。

我应该如何管理这么多任务实例?

谢谢,

亚历克斯

【问题讨论】:

嗨,AlexLng,你的 DAG 并发设置是什么,听起来你希望一次只允许运行 1 个任务。 嗨@Chengzhi,是的,这正是我要找的。我不能对我的 API 调用使用并行性,因为我会被提供商拒绝。我只是默认保留了并发设置,因为它使用的是 SequentialExecutor(应该一次运行一个任务实例)。 您好,承志,您有什么建议吗? 嗨,Alex,你能发布一下你的 DAG default_arg 是什么样子的吗?您可以通过在 python 运算符中添加参数 retries 来处理重试部分。 承志,我的 DAG default_arg 长这样:`DEFAULT_ARGS = 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime.datetime(2017, 7, 25), 'email': ['myemail'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 2, 'retry_delay': datetime.timedelta(minutes=2) `你能解释一下你是什么吗'重新考虑这个重试参数?使用我当前的设置,重试将从头开始每个任务,这将导致我的 BigQuery 表中出现重复。 【参考方案1】:

您目前如何运行气流?你确定airflow scheduler 正在运行吗?

您也可以运行airflow list_dags 以确保可以编译dag。如果您使用 Celery 运行气流,则应注意在所有运行气流的节点上使用 list_dags 显示您的 dag。

【讨论】:

嗨 Matthijs,是的,调度程序正在运行,并且 list_dags 正确显示了我的 dag。我现在正在使用 SequentialExecutor。【参考方案2】:

Alex,在这里发帖会更容易,我看到你有 DEFAULT_ARGS 重试,在 DAG 级别,你也可以在任务级别设置重试。它在 BaseOperator 中,因为所有 Operator 都会继承 BaseOperator 然后你可以使用它,你可以在这里找到更多细节:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/python_operator.py 和 https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1864,如果你在模型中检查 BaseOperator,它有 retriesretry_delay,你可以这样做:

t1 = PythonOperator(task_id='download_file_' + account['id'],
                 python_callable=download_files(account),
                 retries=3,
                 retry_delay=timedelta(seconds=300),
                 dag=my_dag)

【讨论】:

以上是关于气流:每个任务运行大量实例时需要建议的主要内容,如果未能解决你的问题,请参考以下文章

Airflow 中文文档:使用Mesos扩展(社区贡献)

如何防止气流回填dag运行?

执行任务后气流调度程序似乎没有运行

如何将数据帧传递到气流任务的临时表中

测试气流:具有 DAG 的任务和任务上下文未在 pytest 中运行

气流操作员从外部Rest API提取数据