Airflow:如何从 Postgres Operator 推送 xcom 价值?
Posted
技术标签:
【中文标题】Airflow:如何从 Postgres Operator 推送 xcom 价值?【英文标题】:Airflow: How to push xcom value from PostgreOperator? 【发布时间】:2017-08-11 08:55:13 【问题描述】:我正在使用 Airflow 1.8.1,我想从 PostgreOperator 推送 sql 请求的结果。
这是我的任务:
check_task = PostgresOperator(
task_id='check_task',
postgres_conn_id='conx',
sql="check_task.sql",
xcom_push=True,
dag=dag)
def py_is_first_execution(**kwargs):
value = kwargs['ti'].xcom_pull(task_ids='check_task')
print 'count ----> ', value
if value == 0:
return 'next_task'
else:
return 'end-flow'
check_branch = BranchPythonOperator(
task_id='is-first-execution',
python_callable=py_is_first_execution,
provide_context=True,
dag=dag)
这是我的 sql 脚本:
select count(1) from table
当我从 check_task
检查 xcom 值时,它检索到 none
值。
【问题讨论】:
【参考方案1】:如果我是正确的,当查询返回一个值时,airflow 会自动推送到 xcom。但是,当您查看postgresoperator 的代码时,您会发现它有一个调用 PostgresHook 的 run 方法(dbapi_hook 的扩展)的执行方法。这两种方法都不会返回任何东西,因此它不会向 xcom 推送任何内容。 我们为解决这个问题所做的是创建一个 CustomPostgresSelectOperator,它是 PostgresOperator 的副本,但不是 'hook.run(..)' 而是执行 'return hook.get_records(..)'。
希望对你有所帮助。
【讨论】:
它是否适用于气流集群中的不同进程?我试过这个:gist.github.com/fpopic/… 但如果 dag 中的下一个运算符使用 SelectPostgresOperator.execute 返回的结果,我总是会遇到一些异常,那么首选的方法是什么?我看到很多像 A_to_B 这样的运算符将所有逻辑(可能是输入/输出挂钩)放在一个运算符中,以避免在多个进程之间共享状态(可能很大)。【参考方案2】:最后,我在$AIRFLOW_HOME/plugins
下的插件管理器中创建了一个新的传感器ExecuteSqlOperator
。
我以CheckOperator
为例,修改了返回值:这个算子的基本运行和我需要的正好相反。
这是默认的ExecuteSqlOperator
:
CheckOperator
这是我自定义的SqlSensor
:ReverseSqlSensor
class SqlExecuteOperator(BaseOperator):
"""
Performs checks against a db. The ``CheckOperator`` expects
a sql query that will return a single row.
Note that this is an abstract class and get_db_hook
needs to be defined. Whereas a get_db_hook is hook that gets a
single record from an external source.
:param sql: the sql to be executed
:type sql: string
"""
template_fields = ('sql',)
template_ext = ('.hql', '.sql',)
ui_color = '#fff7e6'
@apply_defaults
def __init__(
self, sql,
conn_id=None,
*args, **kwargs):
super(SqlExecuteOperator, self).__init__(*args, **kwargs)
self.conn_id = conn_id
self.sql = sql
def execute(self, context=None):
logging.info('Executing SQL statement: ' + self.sql)
records = self.get_db_hook().get_first(self.sql)
logging.info("Record: " + str(records))
records_int = int(records[0])
print (records_int)
return records_int
def get_db_hook(self):
return BaseHook.get_hook(conn_id=self.conn_id)
【讨论】:
以上是关于Airflow:如何从 Postgres Operator 推送 xcom 价值?的主要内容,如果未能解决你的问题,请参考以下文章
Airflow dag 中的 postgres_operator 问题
Apache Airflow - 在 AWS MWAA 上解析 SQL 查询很慢