从数据框批量插入到数据库,忽略 Pyspark 中的失败行

Posted

技术标签:

【中文标题】从数据框批量插入到数据库,忽略 Pyspark 中的失败行【英文标题】:Batch Insert from Dataframe to DB ignoring failed row in Pyspark 【发布时间】:2018-07-31 11:38:26 【问题描述】:

我正在尝试使用 JDBC 写入将 spark DF 插入 Postgres。 postgres 表对其中一列有唯一约束,当要插入的 df 违反约束时,整个批次被拒绝并且 spark 会话关闭,给出错误 duplicate key value 违反唯一约束,这是正确的数据重复(已存在于数据库中) org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148

需要插入不违反约束的数据行并忽略失败的行,而不会使整个批处理失败。

使用的代码是:

mode = "Append"
url = "jdbc:postgresql://IP/DB name"
properties = "user": "username", "password": "password" 
DF.write
.option("numPartitions",partitions_for_parallelism)
.option("batchsize",batch_size)
.jdbc(url=url, table="table name", mode=mode, properties=properties)

我该怎么做?

【问题讨论】:

如果不将自定义写入数据库作为 forEachPartition 的一部分,则无法跳过失败的批次。如果可以更改表约束,最好删除约束,然后将重复数据删除逻辑作为 SQL 查询的一部分运行。 @DavidGreenshtein 您能否说明您将在何处运行重复数据删除?谢谢。我有自己的看法,但对你的很感兴趣。 @David Greenshtein:感谢您的建议。在使用 forEachPartition 时出现错误:行类型不可迭代。虽然我可以找到一些使用 scala 的示例,但似乎没有 pyspark 等效代码。 【参考方案1】:

很遗憾,Spark 没有开箱即用的解决方案。我看到了许多可能的解决方案:

    在 PostgreSQL 数据库中实现冲突解决的业务逻辑,作为 forEachPartition 函数的一部分。例如,捕获约束违反的异常,然后报告到日志中。

    删除 PostgreSQL 数据库上的约束,使用自动生成的 PK 表示启用在数据库中存储重复的行。重复数据删除逻辑可以进一步实现为每个 SQL 查询的一部分,或者每天/每小时运行重复数据删除。您可以查看示例here。

    1234563

我希望我的想法会有所帮助。

【讨论】:

那么forEachPartiton 逻辑是做什么的呢?无法从您的第二点进行测量。 @David Greenshtein 如果 PostgreSQL 中定义的约束字段没有很大的基数,想法是根据在 forEachPartition 之前在 PostgreSQL 中定义的约束重新分区数据 -> 准备一个包含相同行的批量约束值 -> 批量写入数据库 -> 如果失败日志并继续下一个批量 我想我需要看看逻辑。我想我明白了,但不是我想我会想出的方法。很有趣。【参考方案2】:

如果您对目标有唯一的约束,这是不可能的。目前没有使用这些技术的 UPSert 模式。您需要围绕这方面进行设计。

【讨论】:

感谢您的帮助,但基本上我不是在寻找像 Upsert 这样的解决方案,如果记录重复,则无需更新记录。寻找类似 SSIS 所做的事情,将失败的行标记为错误并在批处理中插入所有其他行 我明白了,但它不会飞,除非你像其他人所说的那样做,这与你原来的方法完全不同。所以我看到你认为你应该在写之前检查是否存在。有趣,热衷于记录您的最终解决方案 如果目标很大怎么办? 我不会检查数据行是否存在,因为这将成为性能瓶颈。但仍在寻找平衡的解决方案..希望尽快找到一个 我的观点完全一致。所以我不确定大卫的提议是什么。我的赌注是没有唯一的约束,并且会定期对目标进行重复数据删除。请让我知道你是如何解决的。

以上是关于从数据框批量插入到数据库,忽略 Pyspark 中的失败行的主要内容,如果未能解决你的问题,请参考以下文章

如何将数据框中的连接值插入到 Pyspark 中的另一个数据框中?

从pyspark到cosmosdb插入多行

如何从 R 在 MongoDB 中批量插入文档?

pySpark - 在插入数据库之前将整个数据框列转换为 JSON 对象

Android 批量插入或忽略 JSONArray

Pyspark 忽略 pyspark-sql-functions 中数据帧的过滤