如何将数据从 PySpark 持久化到 Hive - 避免重复
Posted
技术标签:
【中文标题】如何将数据从 PySpark 持久化到 Hive - 避免重复【英文标题】:How to persist data to Hive from PySpark - Avoiding duplicates 【发布时间】:2020-05-16 06:10:39 【问题描述】:我正在使用graphframes
、pyspark
和hive
处理图形数据。在处理数据时,我将构建一个图表,并最终将这些数据保存到一个 Hive 表中,我将不再更新它。
后续运行可能与之前运行的节点有关系,所以我要确保我不会重复数据。
例如,运行 #1 可能会找到节点:A
、B
、C
。运行#2 可能会重新找到节点A
,并且还会找到新的节点X
、Y
、Z
。我不希望A
在我的表格中出现两次。
我正在寻找处理此问题的最佳方法,并希望解决以下问题:
-
在处理与其关联的元数据时,我需要跟踪节点的状态。在完成此处理后,我将仅希望将节点的数据持久保存到 Hive。
我想确保在遇到同一个节点时不会创建重复数据(例如,当我重新找到上面的
A
节点时,我不想在 Hive 中插入另一行)
我目前正在修补最好的方法来做到这一点。我知道hive
现在支持 ACID 事务,但似乎pyspark
目前不支持 CRUD 类型的操作。所以这就是我的计划:
-
每次运行时,创建一个
dataframe
来存储我找到的节点。
找到新节点时:检查 Hive 中是否已存在该节点(例如 sqlContext.sql("SELECT * FROM existingTable WHERE name="<NAME>")
。如果不存在,则将 dataframe
更新为 x = vertices.withColumn("name", F.when(F.col("id")=="a", "<THE-NEW-NAME>").otherwise(F.col("name")))
以将其添加到我们的 Dataframe 中。
一旦所有节点都完成处理,创建一个临时视图:x.createOrReplaceTempView("myTmpView")
最后,使用sqlContext.sql("INSERT INTO TABLE existingTable SELECT * FROM myTmpView")
将我的临时视图中的数据插入到现有表中
我认为这会起作用,但它似乎非常hacky。我不确定这是否是由于我对 Hive/Spark 缺乏了解,或者这只是技术堆栈的性质。有一个更好的方法吗?以这种方式处理它是否有性能成本?
【问题讨论】:
【参考方案1】:在 deltalake api 中,使用 scala 和 python 支持 upserts(Merge)。这正是您想要实现的。
https://docs.delta.io/latest/delta-update.html#merge-examples
这是另一种解决方案
-
在您的表中有一个列 updated_time 时间戳
联合 prev_run_results 和 current_run_results
按“节点”分组,选择最新的时间戳
保存结果
【讨论】:
使用建议的替代解决方案,这不会在写回之前读取所有数据吗?如果数据非常大,这似乎是不可取的? 是的,这是一种高效的读取方式,如果您想高效写入,请重复写入,然后在读取时只获取最新的(使用第 3 步)。以上是关于如何将数据从 PySpark 持久化到 Hive - 避免重复的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark - 将数据保存到 Hive 表时出错“未解析的运算符'InsertIntoTable HiveTableRelation'”
带有 hive 的 pyspark - 无法正确创建分区并从数据框中保存表
如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?