使用 Psycopg2 将 Spark DataFrame 写入 Redshift 时出错:无法腌制 psycopg2.extensions.cursor 对象

Posted

技术标签:

【中文标题】使用 Psycopg2 将 Spark DataFrame 写入 Redshift 时出错:无法腌制 psycopg2.extensions.cursor 对象【英文标题】:Error writing Spark DataFrame to Redshift with Psycopg2: Can't pickle psycopg2.extensions.cursor objects 【发布时间】:2018-05-06 20:33:07 【问题描述】:

我可以通过psycopg2 连接到 Redshift:

import psycopg2
conn = psycopg2.connect(host=__credential__.host_redshift, 
                        dbname=__credential__.dbname_redshift,
                        user=__credential__.user_redshift, 
                        password=__credential__.password_redshift,
                        port=__credential__.port_redshift)
cur = conn.cursor()

另外,我可以更新数据库中的现有表:

cur.execute("""
    UPDATE tb
    SET col2='updated_target_row'
    WHERE col1='target_row';
""")
conn.commit()

现在,我想用 RowsSpark DataFrame 更新 Redshift 中的表。我抬头发现a pretty recent question about it(我想证明这一点,根本没有与另一个问题重复)。

解决方案似乎很简单。但是,我什至无法将 Row 对象传递给涉及光标的方法。

我现在正在尝试什么:

def update_info(row):
    cur.execute("""
        UPDATE tb
        SET col2='updated_target_row'
        WHERE col1='target_row';
    """)

df.rdd.foreach(update_info)
conn.commit()

我得到了错误:

TypeError: can't pickle psycopg2.extensions.cursor objects

有趣的是,这似乎不是一个常见问题。任何帮助表示赞赏。

附注:

    版本:

    python=3.6
    pyspark=2.2.0
    psycopg2=2.7.4
    

    完整的错误信息可以在pastebin找到。

    我尝试了rdd.map 而不是rdd.foreach,但没有成功。

【问题讨论】:

【参考方案1】:

连接对象和游标不可序列化,不能发送给工作人员。你应该使用foreachPartition:

def update_info(rows):
    conn = psycopg2.connect(...)
    cur = conn.cursor()

    for row in rows:
        cur.execute(...)

df.rdd.foreachPartition(update_info)

【讨论】:

非常感谢!你能提醒我如何将 psycopg2 导入每个分区吗?尝试您的解决方案(或方法中的import psycopg2)时出现“未找到模块”错误。我在所有工作节点上都安装了 psycopg2 pkg。 全局导入就足够了。您可以导入内部函数,但这不是必需的。如果您收到模块未找到错误,请仔细检查工作人员是否配置为使用期望环境/PYTHONPATH

以上是关于使用 Psycopg2 将 Spark DataFrame 写入 Redshift 时出错:无法腌制 psycopg2.extensions.cursor 对象的主要内容,如果未能解决你的问题,请参考以下文章

sparksql 自定义用户函数(UDF)

我们什么时候应该使用Spark-sql,什么时候应该使用Spark RDD

使用 Psycopg2 将数据从 S3 复制到 AWS Redshift 时出错

psycopg2“选择更新”

psycopg2/python 将数据从 postgresql 复制到 Amazon RedShift(postgresql)

无法使用 Postgres、Docker Compose 和 Psycopg2 将主机名“db”转换为地址