使用 Psycopg2 将 Spark DataFrame 写入 Redshift 时出错:无法腌制 psycopg2.extensions.cursor 对象
Posted
技术标签:
【中文标题】使用 Psycopg2 将 Spark DataFrame 写入 Redshift 时出错:无法腌制 psycopg2.extensions.cursor 对象【英文标题】:Error writing Spark DataFrame to Redshift with Psycopg2: Can't pickle psycopg2.extensions.cursor objects 【发布时间】:2018-05-06 20:33:07 【问题描述】:我可以通过psycopg2
连接到 Redshift:
import psycopg2
conn = psycopg2.connect(host=__credential__.host_redshift,
dbname=__credential__.dbname_redshift,
user=__credential__.user_redshift,
password=__credential__.password_redshift,
port=__credential__.port_redshift)
cur = conn.cursor()
另外,我可以更新数据库中的现有表:
cur.execute("""
UPDATE tb
SET col2='updated_target_row'
WHERE col1='target_row';
""")
conn.commit()
现在,我想用 Rows
从 Spark DataFrame
更新 Redshift 中的表。我抬头发现a pretty recent question about it(我想证明这一点,根本没有与另一个问题重复)。
解决方案似乎很简单。但是,我什至无法将 Row
对象传递给涉及光标的方法。
我现在正在尝试什么:
def update_info(row):
cur.execute("""
UPDATE tb
SET col2='updated_target_row'
WHERE col1='target_row';
""")
df.rdd.foreach(update_info)
conn.commit()
我得到了错误:
TypeError: can't pickle psycopg2.extensions.cursor objects
有趣的是,这似乎不是一个常见问题。任何帮助表示赞赏。
附注:
版本:
python=3.6
pyspark=2.2.0
psycopg2=2.7.4
完整的错误信息可以在pastebin找到。
我尝试了rdd.map
而不是rdd.foreach
,但没有成功。
【问题讨论】:
【参考方案1】:连接对象和游标不可序列化,不能发送给工作人员。你应该使用foreachPartition
:
def update_info(rows):
conn = psycopg2.connect(...)
cur = conn.cursor()
for row in rows:
cur.execute(...)
df.rdd.foreachPartition(update_info)
【讨论】:
非常感谢!你能提醒我如何将 psycopg2 导入每个分区吗?尝试您的解决方案(或方法中的import psycopg2
)时出现“未找到模块”错误。我在所有工作节点上都安装了 psycopg2 pkg。
全局导入就足够了。您可以导入内部函数,但这不是必需的。如果您收到模块未找到错误,请仔细检查工作人员是否配置为使用期望环境/PYTHONPATH
。以上是关于使用 Psycopg2 将 Spark DataFrame 写入 Redshift 时出错:无法腌制 psycopg2.extensions.cursor 对象的主要内容,如果未能解决你的问题,请参考以下文章
我们什么时候应该使用Spark-sql,什么时候应该使用Spark RDD
使用 Psycopg2 将数据从 S3 复制到 AWS Redshift 时出错
psycopg2/python 将数据从 postgresql 复制到 Amazon RedShift(postgresql)