在 BigQueryOperator 中拉取 xcom
Posted
技术标签:
【中文标题】在 BigQueryOperator 中拉取 xcom【英文标题】:pull xcom in BigQueryOperator 【发布时间】:2019-04-01 19:26:34 【问题描述】:我正在尝试运行一个 BigQueryOperator,该参数基于先前使用 xcom 的任务(我设法使用带有 xcom_push=True 的 BashOperator 推送它)
我认为使用以下方法可以解决问题
def get_next_run_date(**context):
last_date = context['task_instance'].xcom_pull(task_ids=['get_autoplay_last_run_date'])[0].rstrip()
last_date = datetime.strptime(last_date, "%Y%m%d").date()
return last_date + timedelta(days=1)
t3 = BigQueryOperator(
task_id='autoplay_calc',
bql='autoplay_calc.sql',
params=
"env" : deployment
,"region" : region
,"partition_start_date" : get_next_run_date()
,
bigquery_conn_id='gcp_conn',
use_legacy_sql=False,
write_disposition='WRITE_APPEND',
allow_large_results=True,
#provide_context=True,
destination_dataset_table=reporting_project + '.pa_reporting_public_batch.autoplay_calc',
dag=dag
)`
但使用上述内容会为我提供带有“task_instance”错误的 Broken Dag 错误。
【问题讨论】:
【参考方案1】:您是否尝试过使用 context['ti'].xcom_pull()?
【讨论】:
【参考方案2】:你用错了。
您不能在params
中使用xcom
。您需要在bql/sql
参数中使用它。你的 sql 文件,autoplay_calc.sql
可以包含类似
select * from XYZ where date == "xcom_pull(task_ids=['get_autoplay_last_run_date'])[0].rstrip() "
【讨论】:
以上是关于在 BigQueryOperator 中拉取 xcom的主要内容,如果未能解决你的问题,请参考以下文章