Spark增量加载覆盖旧记录

Posted

技术标签:

【中文标题】Spark增量加载覆盖旧记录【英文标题】:Spark incremental loading overwrite old record 【发布时间】:2019-05-04 08:36:40 【问题描述】:

我需要使用 Spark (PySpark) 对表进行增量加载

示例如下:

第 1 天

id | value
-----------
1  | abc
2  | def

第 2 天

id | value
-----------
2  | cde
3  | xyz

预期结果

id | value
-----------
1  | abc
2  | cde
3  | xyz

这可以在关系数据库中轻松完成, 想知道这是否可以在 Spark 或其他转换工具中完成,例如急转直下?

【问题讨论】:

只需union 两个数据框并删除重复项+排序,您将获得所需的输出。 【参考方案1】:

数据帧附加由 pyspark 中的union 函数完成。我将通过一个示例进行演示,并按照您在问题中提到的那样创建 2 个数据框。

from pyspark.sql.types import Row
df1 = sqlContext.createDataFrame([Row(id=1,value="abc"),Row(id=2,value="def")])

df1.show()
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  def|
+---+-----+

df2 = sqlContext.createDataFrame([Row(id=2,value="cde"),Row(id=3,value="xyz")])
df2.show()
+---+-----+
| id|value|
+---+-----+
|  2|  cde|
|  3|  xyz|
+---+-----+

让我们在两个数据帧之间做一个union,你会得到想要的结果。

df2.union(df1).dropDuplicates(["id"]).show()
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  3|  xyz|
|  2|  cde|
+---+-----+

您可以使用来自pyspark.sql.functionsasc 对输出进行排序

from pyspark.sql.functions import asc


df2.union(df1).dropDuplicates(["id"]).sort(asc("id")).show()
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  2|  cde|
|  3|  xyz|
+---+-----+

【讨论】:

我认为您在 dropDuplicates 调用之前需要 sort(asc("id")) 。此外,dropDuplicates 调用现在需要传递命名的可选数组参数subset,而不是字符串。【参考方案2】:

解决方法,在数据框中添加一个日期列,然后根据 id 排名并按日期降序排列,然后排名 == 1。它将始终根据 id 为您提供最新记录。

df.("rank", rank().over(Window.partitionBy($"id").orderBy($"date".desc)))
  .filter($"rank" === 1)
  .drop($"rank")
  .orderBy($"id")
  .show

【讨论】:

【参考方案3】:

给你! 第一个数据框:

 >>> list1 = [(1, 'abc'),(2,'def')]
 >>> olddf = spark.createDataFrame(list1, ['id', 'value'])
 >>> olddf.show();
 +---+-----+
 | id|value|
 +---+-----+
 |  1|  abc|
 |  2|  def|
 +---+-----+

第二个数据框:

>>> list2 = [(2, 'cde'),(3,'xyz')]
>>> newdf = spark.createDataFrame(list2, ['id', 'value'])
>>> newdf.show();
+---+-----+
| id|value|
+---+-----+
|  2|  cde|
|  3|  xyz|
+---+-----+

现在使用完全外连接连接和合并这两个数据名称,并在选择时使用合并功能,并且可以用用户定义的值替换空值。

from pyspark.sql.functions import *

>>> df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value"))
>>> df.show();
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  3|  xyz|
|  2|  cde|
+---+-----+

我希望这能解决您的问题。 :-)

【讨论】:

感谢您的分步指导,想知道是否可以通过 dropDuplicates(["id"]) 更轻松地完成此操作?

以上是关于Spark增量加载覆盖旧记录的主要内容,如果未能解决你的问题,请参考以下文章

从 blob 存储到 azure 表存储的增量负载

“增量负载”是啥意思?

gitlab增量代码覆盖率

在数据块中加载增量表特定分区的最佳实践是啥?

对 Google 数据存储的增量备份

jacoco增量代码覆盖率