在 BigQueryOperator 中拉取 xcom

Posted

技术标签:

【中文标题】在 BigQueryOperator 中拉取 xcom【英文标题】:pull xcom in BigQueryOperator 【发布时间】:2019-04-01 19:26:34 【问题描述】:

我正在尝试运行一个 BigQueryOperator,该参数基于先前使用 xcom 的任务(我设法使用带有 xcom_push=True 的 BashOperator 推送它)

我认为使用以下方法可以解决问题

def get_next_run_date(**context):
    last_date = context['task_instance'].xcom_pull(task_ids=['get_autoplay_last_run_date'])[0].rstrip()
    last_date = datetime.strptime(last_date, "%Y%m%d").date()
    return last_date + timedelta(days=1)

t3 = BigQueryOperator(
    task_id='autoplay_calc',
    bql='autoplay_calc.sql',
    params=
            "env" : deployment
            ,"region" : region
            ,"partition_start_date" : get_next_run_date()
            ,
    bigquery_conn_id='gcp_conn',
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    allow_large_results=True,
    #provide_context=True,
    destination_dataset_table=reporting_project + '.pa_reporting_public_batch.autoplay_calc',
    dag=dag
    )` 

但使用上述内容会为我提供带有“task_instance”错误的 Broken Dag 错误。

【问题讨论】:

【参考方案1】:

您是否尝试过使用 context['ti'].xcom_pull()?

【讨论】:

【参考方案2】:

你用错了。

您不能在params 中使用xcom。您需要在bql/sql 参数中使用它。你的 sql 文件,autoplay_calc.sql 可以包含类似

select * from XYZ where date == "xcom_pull(task_ids=['get_autoplay_last_run_date'])[0].rstrip() "

【讨论】:

以上是关于在 BigQueryOperator 中拉取 xcom的主要内容,如果未能解决你的问题,请参考以下文章

使用 Atlassian Sourcetree,是不是可以在执行拉取之前查看将从远程存储库中拉取哪些更改?

jQuery 从目录中拉取图片

无法在 CloudBuild 中拉取 golang 映像

在 Angular 中拉取和订阅 firebase 数据

如何从Gitee中拉取项目到HBuilder中?

从数据库中拉取跨域数据