带有提交的 Airflow + pandas read_sql_query()

Posted

技术标签:

【中文标题】带有提交的 Airflow + pandas read_sql_query()【英文标题】:Airflow + pandas read_sql_query() with commit 【发布时间】:2019-05-14 02:49:18 【问题描述】:

问题

我可以使用 read_sql() 将 SQL 事务提交到数据库吗?

用例和背景

我有一个用例,我希望允许用户执行一些预定义的 SQL 并返回一个 pandas 数据框。在某些情况下,此 SQL 将需要查询预先填充的表,而在其他情况下,此 SQL 将执行一个写入表的函数,然后查询该表。 此逻辑当前包含在 Airflow DAG 中的方法内部,以便利用使用 PostgresHook 的 Airflow 可访问的数据库连接信息 - 该方法最终在 PythonOperator 任务中调用。我通过测试了解到 PostgresHook 创建了一个 psycopg2 连接对象。

代码

from airflow.hooks.postgres_hook import PostgresHook
import pandas as pd 

def create_df(job_id,other_unrelated_inputs):
    conn = job_type_to_connection(job_type) # method that helps choose a database
    sql = open('/sql_files/job_id_.sql'.format(job_id)) #chooses arbitrary SQL  
    sql_template = sql.read() 
    hook = PostgresHook(postgres_conn_id=conn) #connection information for alias is predefined elsewhere within Airflow


    try:
        hook_conn_obj = hook.get_conn()
        print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
        # Runs SQL template with variables, but does not commit. Alternatively, have used hook.get_pandas_df(sql_template)
        df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj) 
    except:
        #catches some errors#
    return df

问题

目前,在执行 SQL 函数时,此代码会生成一个数据帧,但不会提交在 SQL 函数中所做的任何数据库更改。例如,更准确地说,如果 SQL 函数 INSERT 将一行插入到表中,则该事务不会提交,并且该行不会出现在表中。

尝试

我尝试了一些修复,但被卡住了。我最近的努力是更改 read_sql 用于自动提交事务的 psycopg2 连接的自动提交属性。

我承认我无法弄清楚连接的属性何时会影响 SQL 的执行。

我认识到另一种方法是复制PostgresHook.run() 中的一些逻辑以提交,然后添加一些代码以将结果推送到数据帧中,但是对于未来的支持来说,使用已经创建的方法似乎更加简洁和容易,如果可能的话。

我能找到的最类似的 SO 问题是 this one,但我对独立于 Airflow 的解决方案感兴趣。

编辑

...
    try:
        hook_conn_obj = hook.get_conn()
        print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
        hook_conn_obj.autocommit = True
        df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj) # Runs SQL template with variables, but does not commit
    except:
        #catches some errors#
    return df

这似乎有效。如果有人对实现这一目标的更好方法有任何评论或想法,我仍然有兴趣从讨论中学习。

谢谢!

【问题讨论】:

【参考方案1】:

read_sql 不会提交,因为正如该方法名称所暗示的那样,目标是读取数据,而不是写入。 pandas 是不错的设计选择。这很重要,因为它可以防止意外写入并允许有趣的场景,例如运行过程、读取其效果但没有任何内容被持久化。 read_sql 的目的是阅读,而不是写作。直接表达意图是黄金标准。

表达您的意图的更明确的方式是在fetchall 之前明确地execute(带有提交)。但是因为pandas 没有提供从cursor 对象读取的简单方法,所以您将失去read_sql 提供的便捷性,并且必须自己创建DataFrame。

所以总而言之,您的解决方案很好,通过设置autocommit=True,您表明您的数据库交互将持续进行,因此应该不会发生意外。读起来有点奇怪,但是如果您将 sql_template 变量命名为 write_then_read_sql 之类的名称或在文档字符串中进行解释,则意图会更清晰。

【讨论】:

感谢您的上下文和建议!【参考方案2】:

我有一个类似的用例——使用 Pandas 将数据加载到 SQL Server 中,调用一个执行繁重工作并写入表的存储过程,然后将结果集捕获到一个新的 DataFrame 中。

我通过使用上下文管理器并显式提交事务来解决它:

# Connect to SQL Server
engine = sqlalchemy.create_engine('db_string')
with engine.connect() as connection:
    # Write dataframe to table with replace
    df.to_sql(name='myTable', con=connection, if_exists='replace')

    with connection.begin() as transaction:
        # Execute verification routine and capture results
        df_processed = pandas.read_sql(sql='exec sproc', con=connection)
        transaction.commit()

【讨论】:

以上是关于带有提交的 Airflow + pandas read_sql_query()的主要内容,如果未能解决你的问题,请参考以下文章

从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业

Airflow 中文文档:项目

使用来自 python 的 Airflow 触发 SQL 时出现模板错误?

原创大数据基础之Airflow生产环境部署airflow研究

Airflow 中文文档:设置配置选项

Airflow SSH 操作员错误:遇到 RSA 密钥,应为 OPENSSH 密钥