气流 - 试图循环操作员。执行不是等待实际操作完成

Posted

技术标签:

【中文标题】气流 - 试图循环操作员。执行不是等待实际操作完成【英文标题】:Airflow - trying to loop an operator. The execution is not waiting for actual operation to complete 【发布时间】:2018-12-19 22:48:03 【问题描述】:

在 Airflow 中 - 我正在尝试循环操作员。 (BigQuery 运算符)。 DAG 甚至在查询完成之前完成。

我的 DAG 本质上是:

    逐个读取一组插入查询。 使用 BigQueryOperator 触发每个查询。

当我尝试写入 2 条记录(带有 2 条插入语句)时 - 在作业之后我只能看到 1 条记录。

dag
bteqQueries = ReadFile() --Read GCP bucket file and get the list of SQL queries (as text) separated by new line

for currQuery in bteqQueries.split('\n'):
    #logging.info("currQuery : ".format(currQuery))
     parameter = 
    'cur_query': currQuery

    logging.info("START $$ : ".format(parameter.get('cur_query')))
    gcs2BQ = BigQueryOperator(
    task_id='gcs2bq_insert',
    bql=parameter.get('cur_query'),
    write_disposition="WRITE_APPEND",
    bigquery_conn_id='bigquery_default',
    use_legacy_sql='False',
    dag=dag,
    task_concurrency=1)
    logging.info("END $$ : ".format(parameter.get('cur_query')))


gcs2BQ

期望执行输入文件(在 GCS 存储桶中)中的所有查询。我有几个插入查询,并期望在最终的 bigquery 表中有 2 条记录。但我只看到 1 条记录。

********下面是日志******

 2018-12-19 03:57:16,194] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:16,190] gcs2BQ_bteq.py:59 INFO - START $$ : insert into `gproject.bucket.employee_test_stg.employee_test_stg` (emp_id,emp_name,edh_end_dttm) values (2,"srikanth","2099-01-01") ; 
[2018-12-19 03:57:16,205] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:16,201] models.py:2190 WARNING - schedule_interval is used for <Task(BigQueryOperator): gcs2bq_insert>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2018-12-19 03:57:16,210] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:16,209] gcs2BQ_bteq.py:68 INFO - END $$ : insert into `project.bucket.employee_test_stgemployee_test_stg` (emp_id,emp_name,edh_end_dttm) values (2,"srikanth","2099-01-01") ; 
[2018-12-19 03:57:16,213] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:16,213] gcs2BQ_bteq.py:59 INFO - START $$ : insert into `project.bucket.employee_test_stg` (emp_id,emp_name,edh_end_dttm) values (3,"srikanth","2099-01-01") ;
[2018-12-19 03:57:16,223] base_task_runner.py:98 INFO - Subtask: 
[2018-12-19 03:57:16,218] models.py:2190 WARNING - schedule_interval is used for <Task(BigQueryOperator): gcs2bq_insert>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead
[2018-12-19 03:57:16,230] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:16,230] gcs2BQ_bteq.py:68 INFO - END $$ : insert into `dataset1.adp_etl_stg.employee_test_stg` (emp_id,emp_name,edh_end_dttm) values (3,"srikanth","2099-01-01") ;
[2018-12-19 03:57:16,658] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:16,655] bigquery_operator.py:90 INFO - Executing: insert into `dataset1.adp_etl_stg.employee_test_stg` (emp_id,emp_name,edh_end_dttm) values (2,"srikanth","2099-01-01") ; 
[2018-12-19 03:57:16,703] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:16,702] gcp_api_base_hook.py:74 INFO - Getting connection using `gcloud auth` user, since no key file is defined for hook.
[2018-12-19 03:57:16,848] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:16,847] discovery.py:267 INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2018-12-19 03:57:16,849] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:16,849] client.py:595 INFO - Attempting refresh to obtain initial access_token
[2018-12-19 03:57:17,012] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:17,011] discovery.py:852 INFO - URL being requested: POST https://www.googleapis.com/bigquery/v2/projects/gcp-***Project***/jobs?alt=json
[2018-12-19 03:57:17,214] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:17,214] discovery.py:852 INFO - URL being requested: GET https://www.googleapis.com/bigquery/v2/projects/gcp-***Project***/jobs/job_jqrRn4lK8IHqTArYAVj6cXRfLgDd?alt=json
[2018-12-19 03:57:17,304] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:17,303] bigquery_hook.py:856 INFO - Waiting for job to complete : gcp-***Project***, job_jqrRn4lK8IHqTArYAVj6cXRfLgDd
[2018-12-19 03:57:22,311] base_task_runner.py:98 INFO - Subtask: [2018-12-19 03:57:22,310] discovery.py:852 INFO - URL being requested: GET https://www.googleapis.com/bigquery/v2/projects/gcp-***Project***/jobs/job_jqrRn4lK8IHqTArYAVj6cXRfLgDd?alt=json

【问题讨论】:

即使可以直接在 DAG 中编写 for 循环,也不会像在普通 Python 脚本中那样执行。您需要将循环放入一个函数中,并将该函数作为 Airflow 任务调用,并带有 PythonOperator 当我尝试这个时 - 实际的 BigQueryOperator (gcs2BQ) 在函数之外是不可见的。因此它没有被触发。 【参考方案1】:

试试下面的代码:

gcs2BQ = []
for index, currQuery in enumerate(bteqQueries.split('\n')):
    logging.info("currQuery : ".format(currQuery))
    parameter = 
        'cur_query': currQuery
    
    logging.info("START $$ : ".format(parameter.get('cur_query')))
    gcs2BQ.append(BigQueryOperator(
        task_id='gcs2bq_insert_'.format(index),
        bql=parameter.get('cur_query'),
        write_disposition="WRITE_APPEND",
        bigquery_conn_id='bigquery_default',
        use_legacy_sql='False',
        dag=dag,
        task_concurrency=1))
    logging.info("END $$ : ".format(parameter.get('cur_query')))

    if index == 0:
        gcs2BQ[0]
    else:
        gcs2BQ[index - 1] >> gcs2BQ[index]

基本上,task_id 应该是唯一的,您可以使用上面的代码指定对查询的显式依赖。

【讨论】:

非常感谢。这行得通。是否可以将其放在 python 函数中并触发?当我尝试它时 - 由于 gcs2BQ 运算符不在函数之外的范围内,它没有被触发。 很高兴它成功了。你能接受这个答案并投票吗? :) 。关于你的另一个问题:你为什么要把它放在一个函数中?可以放在一个函数里面,触发只需要调用那个函数。 循环部分有效。但在 DAG 中,我尝试使用 3 个 DML 执行。它正确循环并成功。相同的 DAG,当我尝试使用 2 个 DML 执行时 - 它为 2 个 DML 正确运行并显示第三个任务失败。你能告诉我为什么这样做,我们如何避免它?日志:[2019-01-03 02:39:02,157] base_task_runner.py:98 信息 - 子任务:异常:BigQuery 作业失败。最终错误是:u'reason': u'invalidQuery', u'message': u'Syntax error: Unexpected end of statement at [1:1]', u'location': u'query'。跨度>

以上是关于气流 - 试图循环操作员。执行不是等待实际操作完成的主要内容,如果未能解决你的问题,请参考以下文章

怎么让当前线程等待另一个线程完成之后再去执行

Dart - 在 for 循环中等待所有异步任务

等待函数完成,直到回调到来

执行脚本sh install.sh后:需要输入1等待5秒并回车,等待10秒输入y并回车 ,如何用脚本完成这两个操作。

气流 DAG EMR EmrCreateJobFlowOperator 不执行任何操作

Java多线程之三volatile与等待通知机制示例