加入大型 Spark 数据帧

Posted

技术标签:

【中文标题】加入大型 Spark 数据帧【英文标题】:Joining Large Spark dataframes 【发布时间】:2021-01-29 01:52:27 【问题描述】:

我有两个数据框

数据帧1:

 |-- name: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: string (containsNull = true)

数据帧2

 |-- item: string (nullable = true)
 |-- vector: array (nullable = true)
 |    |-- element: double (containsNull = true)

加入的数据帧

 |-- name: string (nullable = true)
 |-- combinedVector : array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)

我希望有效地加入这两个数据框。数据集足够大(广播它们的大小为 100 GB)。 第一个数据帧中的项目最多可以有 1000 个值,第二个数据帧中的每个 Array[Double] 可以很大(压缩并保存为 parquet 时最多 5MB)。

有人可以帮我怎么做吗?

【问题讨论】:

您能澄清一下您的问题吗,您想根据什么条件加入他们?即使有什么问题,既然你在谈论尺寸和广播,你会提供更多关于你的火花集群道具的细节吗? 【参考方案1】:

取决于您喜欢执行的操作和集群大小,但我建议进行优化,例如根据集群增加/减少分区大小/重新分区,只需选择所需的列而不是选择整个数据集等

【讨论】:

【参考方案2】:

如果其中一个 DataFrame 非常小(正如您提到的 5MB 压缩作为镶木地板),您肯定可以加入它们并广播小型 DataFrame。通过这样做,小的 DataFrame 被传送到 Spark 集群中的每个 executor,从而在一定程度上提高了性能。

## Python example
result = df_1.join(broadcast(df_2), on="key", how="inner")

Spark 版本高于 3.0.0(包括)假设 df_2 是小型 DataFrame

    您可以这样做,因为所有连接提示都受支持。
// Scala example
df_1.createOrReplaceTempView('df_1')
df_2.createOrReplaceTempView('df_2')
val result = spark.sql("""
    SELECT /*+ BROADCAST(df_2) */ df_1.key, df_1.col2, df_2.col4
    FROM df_1 JOIN df_2 ON df_1.key = df_2.key
""")
    启用 AQE 功能AQE 提供的功能之一是优化查询的物理执行计划,例如将SortMergeJoin 变为BroadcastHashJoin
spark.sql.adaptive.enabled=true
## Python example
df_1.createOrReplaceTempView('df_1')
df_2.createOrReplaceTempView('df_2')
result = spark.sql("""
    SELECT df_1.key, df_1.col2, df_2.col4
    FROM df_1 JOIN df_2 ON 
       df_1.key = df_2.key
""")

一些您可能会觉得有用的参考资料

    Li, J. 2021。关于改进 Apache Spark SQL 中的广播连接。 [在线] 可在:https://www.youtube.com/watch?v=B9aY7KkTLTw&feature=emb_title。 [2021 年 1 月 29 日访问]。 docs.databricks.com。未注明日期自适应查询执行演示。 [在线] 可在:https://docs.databricks.com/_static/notebooks/aqe-demo.html。 [2021 年 1 月 29 日访问]。

【讨论】:

感谢您的回复。我会看看你提到的例子,看看有人为我工作。因此 DataFrame2 中的每个元素(列名 'vector' )可以达到 5MB。该数据框本身可以有 1M 行。大小为 100 os GB。所以在这种情况下广播加入并不好。我只是想看看这是否可以用 spark 本身来完成,或者需要像 Alluxio 这样的分布式缓存。

以上是关于加入大型 Spark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

将大型 Spark 数据帧从数据块写入 csv 失败

将大型 Spark 数据帧作为镶木地板写入 s3 存储桶

spark - 在大型数据帧上执行 groupby 和聚合时,java 堆内存不足

为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?

如何在 python 中计算大型 spark 数据框的 kendall tau?

您可以使用 SparkR 进行广播加入吗?