批量从Dataframe插入到DB,忽略Pyspark中的失败行

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了批量从Dataframe插入到DB,忽略Pyspark中的失败行相关的知识,希望对你有一定的参考价值。

我试图使用JDBC写入向Postgres插入spark DF。 postgres表对其中一列有唯一约束,当要插入的df违反约束时,整个批次被拒绝,火花会话关闭,给出错误重复键值违反唯一约束,这是正确的,因为数据是重复的(已经存在)在数据库中)org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148

需要插入不违反约束的数据行并忽略失败的行,而不会使整个批处理失败。

使用的代码是:

mode = "Append"
url = "jdbc:postgresql://IP/DB name"
properties = {"user": "username", "password": "password"} 
DF.write
.option("numPartitions",partitions_for_parallelism)
.option("batchsize",batch_size)
.jdbc(url=url, table="table name", mode=mode, properties=properties)

我怎样才能做到这一点?

答案

不幸的是,Spark没有开箱即用的解决方案。我看到了许多可能的解决方案:

  1. 在PostgreSQL数据库中实现冲突解决的业务逻辑,作为forEachPartition函数的一部分。例如,捕获约束违例的异常,然后报告给日志。
  2. 删除PostgreSQL数据库上的约束,使用自动生成的PK意味着允许在数据库中存储重复的行。重复数据删除逻辑可以进一步实现为每个SQL查询的一部分或每天/每小时运行重复数据删除。你可以看到例子here
  3. 如果除了您的Spark作业之外没有其他系统或进程写入PostgreSQL表,则可以使用join操作进行过滤,以便在spark.write类似于this之前从Spark Dataframe中删除所有现有行

我希望我的想法会有所帮助。

另一答案

如果您对目标有唯一约束,那么这是不可能的。目前没有这些技术的UPSert模式。你需要围绕这个方面进行设计。

以上是关于批量从Dataframe插入到DB,忽略Pyspark中的失败行的主要内容,如果未能解决你的问题,请参考以下文章

从固定格式的文本文件批量插入忽略行终止符

如何使用 Apache Apex 将数据从 DB2 批量摄取到 Vertica

Android 批量插入或忽略 JSONArray

MS SQL 批量更新\插入

Sequelize - 从数组批量创建

mysql数据库批量高速插入