如何从查询中捕获值并将其用作另一个查询中的值
Posted
技术标签:
【中文标题】如何从查询中捕获值并将其用作另一个查询中的值【英文标题】:How to capture value from query and use it as a value in another query 【发布时间】:2019-12-19 16:34:07 【问题描述】:我有一个在 Python 中运行的 Postgres(redshift) 查询,它输出一个字段及其值。我想运行另一个使用该查询中的值的查询。但是,当我运行以下代码时,它给了我一个错误:
taskinstance.py:1051 ERROR - no results to fetch
这是我的代码:
def get_etl_recordd():
pg_hook = PostgresHook(postgre_conn_id="postgres_default", schema='db1')
connection = pg_hook.get_conn()
nt_cur = connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
atest_update_query = "select max(updated_at) from schema1.table1 group by updated_at order by updated_at asc limit 1;"
nt_cur.execute(atest_update_query)
result = nt_cur.fetchone()
max_updated_at = result.max
cursor2= connection.cursor()
latest_update_query1 = "select * from schema1.table1 where updated_at <= ''; commit;".format(max_updated_at)
cursor2.execute(latest_update_query1)
d=cursor2.fetchone()
connection.close()
知道为什么会这样吗? atest_updated_query
的值是一个时间戳,latest_updated_query1
实际上有一个输出。任何帮助将不胜感激。
这是我在 Python/Airflow 中运行的 Postgres(Redshift) 查询。
【问题讨论】:
请清理错别字(例如,atest_updated_query,
latest_upadted_query1,
get_etl_recordd()`)
【参考方案1】:
我无法在您的代码中发现问题,但您可以尝试使用PostgresHook
的get_first
方法。它应该完全符合您的目标,而且它肯定适用于我们的气流/红移设置。
sql = "SELECT 1"
hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
first_col_of_first_row = self.hook.get_first(sql)[0]
【讨论】:
那么,第 2 行和第 3 行应该在自定义运算符中定义吗?你是如何实现的?或者你只是在 Python Operator 任务中运行它? 如果它只是一个想法,那么我不会费心将它包装在一个运算符中。但如果您更频繁地使用它,请创建一个。以上是关于如何从查询中捕获值并将其用作另一个查询中的值的主要内容,如果未能解决你的问题,请参考以下文章
如何将文本文件中的值分配给python函数中的数组并将其用作全局?
如何将 API 重放的 JSON 中的值放到另一个 API 发布方法的主体中?