如何将数据从 PySpark 持久化到 Hive - 避免重复

Posted

技术标签:

【中文标题】如何将数据从 PySpark 持久化到 Hive - 避免重复【英文标题】:How to persist data to Hive from PySpark - Avoiding duplicates 【发布时间】:2020-05-16 06:10:39 【问题描述】:

我正在使用graphframespysparkhive 处理图形数据。在处理数据时,我将构建一个图表,并最终将这些数据保存到一个 Hive 表中,我将不再更新它。

后续运行可能与之前运行的节点有关系,所以我要确保我不会重复数据。

例如,运行 #1 可能会找到节点:ABC。运行#2 可能会重新找到节点A,并且还会找到新的节点XYZ。我不希望A 在我的表格中出现两次。

我正在寻找处理此问题的最佳方法,并希望解决以下问题:

    在处理与其关联的元数据时,我需要跟踪节点的状态。在完成此处理后,我将希望将节点的数据持久保存到 Hive。 我想确保在遇到同一个节点时不会创建重复数据(例如,当我重新找到上面的A 节点时,我不想在 Hive 中插入另一行)

我目前正在修补最好的方法来做到这一点。我知道hive 现在支持 ACID 事务,但似乎pyspark 目前不支持 CRUD 类型的操作。所以这就是我的计划:

    每次运行时,创建一个dataframe 来存储我找到的节点。 找到新节点时:检查 Hive 中是否已存在该节点(例如 sqlContext.sql("SELECT * FROM existingTable WHERE name="<NAME>")。如果不存在,则将 dataframe 更新为 x = vertices.withColumn("name", F.when(F.col("id")=="a", "<THE-NEW-NAME>").otherwise(F.col("name"))) 以将其添加到我们的 Dataframe 中。 一旦所有节点都完成处理,创建一个临时视图:x.createOrReplaceTempView("myTmpView") 最后,使用sqlContext.sql("INSERT INTO TABLE existingTable SELECT * FROM myTmpView") 将我的临时视图中的数据插入到现有表中

我认为这会起作用,但它似乎非常hacky。我不确定这是否是由于我对 Hive/Spark 缺乏了解,或者这只是技术堆栈的性质。有一个更好的方法吗?以这种方式处理它是否有性能成本?

【问题讨论】:

【参考方案1】:

在 deltalake api 中,使用 scala 和 python 支持 upserts(Merge)。这正是您想要实现的。

https://docs.delta.io/latest/delta-update.html#merge-examples

这是另一种解决方案

    在您的表中有一个列 updated_time 时间戳 联合 prev_run_results 和 current_run_results 按“节点”分组,选择最新的时间戳 保存结果

【讨论】:

使用建议的替代解决方案,这不会在写回之前读取所有数据吗?如果数据非常大,这似乎是不可取的? 是的,这是一种高效的读取方式,如果您想高效写入,请重复写入,然后在读取时只获取最新的(使用第 3 步)。

以上是关于如何将数据从 PySpark 持久化到 Hive - 避免重复的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark - 将数据保存到 Hive 表时出错“未解析的运算符'InsertIntoTable HiveTableRelation'”

带有 hive 的 pyspark - 无法正确创建分区并从数据框中保存表

如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?

从 PySpark 查询 Hive 表时出错

将 hive 表卸载到。使用 Spark 或 pyspark 或 python 的 dat 文件

如何使用 pyspark 并行插入 Hive