批量从Dataframe插入到DB,忽略Pyspark中的失败行
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了批量从Dataframe插入到DB,忽略Pyspark中的失败行相关的知识,希望对你有一定的参考价值。
我试图使用JDBC写入向Postgres插入spark DF。 postgres表对其中一列有唯一约束,当要插入的df违反约束时,整个批次被拒绝,火花会话关闭,给出错误重复键值违反唯一约束,这是正确的,因为数据是重复的(已经存在)在数据库中)org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148
需要插入不违反约束的数据行并忽略失败的行,而不会使整个批处理失败。
使用的代码是:
mode = "Append"
url = "jdbc:postgresql://IP/DB name"
properties = {"user": "username", "password": "password"}
DF.write
.option("numPartitions",partitions_for_parallelism)
.option("batchsize",batch_size)
.jdbc(url=url, table="table name", mode=mode, properties=properties)
我怎样才能做到这一点?
答案
不幸的是,Spark没有开箱即用的解决方案。我看到了许多可能的解决方案:
- 在PostgreSQL数据库中实现冲突解决的业务逻辑,作为forEachPartition函数的一部分。例如,捕获约束违例的异常,然后报告给日志。
- 删除PostgreSQL数据库上的约束,使用自动生成的PK意味着允许在数据库中存储重复的行。重复数据删除逻辑可以进一步实现为每个SQL查询的一部分或每天/每小时运行重复数据删除。你可以看到例子here。
- 如果除了您的Spark作业之外没有其他系统或进程写入PostgreSQL表,则可以使用join操作进行过滤,以便在spark.write类似于this之前从Spark Dataframe中删除所有现有行
我希望我的想法会有所帮助。
另一答案
如果您对目标有唯一约束,那么这是不可能的。目前没有这些技术的UPSert模式。你需要围绕这个方面进行设计。
以上是关于批量从Dataframe插入到DB,忽略Pyspark中的失败行的主要内容,如果未能解决你的问题,请参考以下文章