通过pyspark更新hive中的插入数据

Posted

技术标签:

【中文标题】通过pyspark更新hive中的插入数据【英文标题】:Update insert data in hive through pyspark 【发布时间】:2020-06-20 14:09:30 【问题描述】:

我的数据源源不断变化。我正在通过 sqoop 提取该数据,但由于容量很大,我无法将其保留为每日截断负载。我想追加数据,但逻辑应该是更新和插入。如果通过删除先前的相同记录在源中更新记录,则应在配置单元中执行相同操作,即应删除旧记录并插入/更新新记录。 下面是一个这样的例子。

30 分钟后,数据更新如下:

现在,我的 hive 表选择了原始记录,一段时间后选择了更新的记录,但将其插入为不同的行。

我希望在不覆盖我的表的情况下,反映的数据与源中的数据相同。 (推荐使用 Pyspark 代码)

请帮忙。谢谢。

【问题讨论】:

您只提取新的和更新的记录,您如何识别增量记录?更新记录的数量是多少? 我在源中有一个时间戳列,当记录更改/更新时会更新,并且通过 sqoop 增量逻辑我将它们拉到配置单元中,因为 sqoop 总是将最后一个增量值存储在元数据中。更新记录的数量几乎是每天 8-100 万。它包括更新的 + 新条目。 您可能想看看其他存储格式(HBase、Kudu),因为普通 HDFS 没有更新的概念。 【参考方案1】:

不提供查询,但提供一个想法: 在源和您的实际配置单元表之间创建一个临时表,该表将包含所有记录(插入和更新)。

要获取实际的 hive 表,请使用 rank 函数,例如:

rank() over (partition by id order by ingested_ts desc) as rnk
---
---
where rnk = 1

注意:根据您的数据量,hive_staging 表可能会增长,因此您需要相应地添加分区/桶。

【讨论】:

正是我现在正在做的事情,但问题在于大量数据。当我在排名后创建我的最终配置单元表时,总数据约为 70-80 cr。在蜂巢中加载这些数据需要将近 1-2 个小时,这是我不想要的。我希望这在 5-10 分钟内完成。就像 upsert 的工作原理一样。另外,我想每 1 小时安排一次。因此,如果数据加载本身需要 1 小时,我就无法进行排名。 您的集群有多大?另外,为什么不在 Hive 中使用酸属性,这正是这些用例的目的。 你能详细说明一下吗?【参考方案2】:

您可以在源中添加更多列作为last_modified,也可以在hive中添加更多列作为last_load,下次上传时,您可以在(源中的id列和hive中的id列)和(源的last_modified)上指定两个条件表和 hive 列的 last_update)。

【讨论】:

以上是关于通过pyspark更新hive中的插入数据的主要内容,如果未能解决你的问题,请参考以下文章

将值插入行类型的 Pyspark 中的 Hive 表

如何使用 pyspark 并行插入 Hive

从 excel 中读取数据并插入 HIVE

如何在 Spark 2.4.0 中使用 PySpark API 将表插入 Hive

PySpark 无法通过 sparkContext/hiveContext 读取 Hive ORC 事务表?我们可以使用 Pyspark 更新/删除配置单元表数据吗?

如何制作一个自动更新 Hive 的表