加入大型 Spark 数据帧
Posted
技术标签:
【中文标题】加入大型 Spark 数据帧【英文标题】:Joining Large Spark dataframes 【发布时间】:2021-01-29 01:52:27 【问题描述】:我有两个数据框
数据帧1:
|-- name: string (nullable = true)
|-- items: array (nullable = true)
| |-- element: string (containsNull = true)
数据帧2
|-- item: string (nullable = true)
|-- vector: array (nullable = true)
| |-- element: double (containsNull = true)
加入的数据帧
|-- name: string (nullable = true)
|-- combinedVector : array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: double (containsNull = true)
我希望有效地加入这两个数据框。数据集足够大(广播它们的大小为 100 GB)。 第一个数据帧中的项目最多可以有 1000 个值,第二个数据帧中的每个 Array[Double] 可以很大(压缩并保存为 parquet 时最多 5MB)。
有人可以帮我怎么做吗?
【问题讨论】:
您能澄清一下您的问题吗,您想根据什么条件加入他们?即使有什么问题,既然你在谈论尺寸和广播,你会提供更多关于你的火花集群道具的细节吗? 【参考方案1】:取决于您喜欢执行的操作和集群大小,但我建议进行优化,例如根据集群增加/减少分区大小/重新分区,只需选择所需的列而不是选择整个数据集等
【讨论】:
【参考方案2】:如果其中一个 DataFrame 非常小(正如您提到的 5MB 压缩作为镶木地板),您肯定可以加入它们并广播小型 DataFrame。通过这样做,小的 DataFrame 被传送到 Spark 集群中的每个 executor,从而在一定程度上提高了性能。
## Python example
result = df_1.join(broadcast(df_2), on="key", how="inner")
Spark 版本高于 3.0.0(包括)假设 df_2
是小型 DataFrame
-
您可以这样做,因为所有连接提示都受支持。
// Scala example
df_1.createOrReplaceTempView('df_1')
df_2.createOrReplaceTempView('df_2')
val result = spark.sql("""
SELECT /*+ BROADCAST(df_2) */ df_1.key, df_1.col2, df_2.col4
FROM df_1 JOIN df_2 ON df_1.key = df_2.key
""")
-
启用 AQE 功能AQE 提供的功能之一是优化查询的物理执行计划,例如将
SortMergeJoin
变为BroadcastHashJoin
。
spark.sql.adaptive.enabled=true
## Python example
df_1.createOrReplaceTempView('df_1')
df_2.createOrReplaceTempView('df_2')
result = spark.sql("""
SELECT df_1.key, df_1.col2, df_2.col4
FROM df_1 JOIN df_2 ON
df_1.key = df_2.key
""")
一些您可能会觉得有用的参考资料
-
Li, J. 2021。关于改进 Apache Spark SQL 中的广播连接。 [在线] 可在:https://www.youtube.com/watch?v=B9aY7KkTLTw&feature=emb_title。 [2021 年 1 月 29 日访问]。
docs.databricks.com。未注明日期自适应查询执行演示。 [在线] 可在:https://docs.databricks.com/_static/notebooks/aqe-demo.html。 [2021 年 1 月 29 日访问]。
【讨论】:
感谢您的回复。我会看看你提到的例子,看看有人为我工作。因此 DataFrame2 中的每个元素(列名 'vector' )可以达到 5MB。该数据框本身可以有 1M 行。大小为 100 os GB。所以在这种情况下广播加入并不好。我只是想看看这是否可以用 spark 本身来完成,或者需要像 Alluxio 这样的分布式缓存。以上是关于加入大型 Spark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章
spark - 在大型数据帧上执行 groupby 和聚合时,java 堆内存不足
为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?