在 Spark 中加入多个表的有效方法 - 设备上没有剩余空间
Posted
技术标签:
【中文标题】在 Spark 中加入多个表的有效方法 - 设备上没有剩余空间【英文标题】:Efficient way of joining multiple tables in Spark - No space left on device 【发布时间】:2019-03-14 13:51:50 【问题描述】:here 提出了类似的问题,但它没有正确解决我的问题。我有近 100 个 DataFrame,每个至少有 200,000
行,我需要加入它们,方法是基于列 ID
进行 full
连接,从而创建一个带有列的 DataFrame - ID, Col1, Col2,Col3,Col4, Col5..., Col102
。
只是为了说明,我的 DataFrames 的结构 -
df1 = df2 = df3 = ..... df100 =
+----+------+------+------+ +----+------+ +----+------+ +----+------+
| ID| Col1| Col2| Col3| | ID| Col4| | ID| Col5| | ID|Col102|
+----+------+-------------+ +----+------+ +----+------+ +----+------+
| 501| 25.1| 34.9| 436.9| | 501| 22.33| | 503| 22.33| | 501| 78,1|
| 502| 12.2|3225.9| 46.2| | 502| 645.1| | 505| 645.1| | 502| 54.9|
| 504| 754.5| 131.0| 667.3| | 504| 547.2| | 504| 547.2| | 507| 0|
| 505|324.12| 48.93| -1.3| | 506| 2| | 506| 2| | 509| 71.57|
| 506| 27.51| 88.99| 67.7| | 507| 463.7| | 507| 463.7| | 510| 82.1|
.
.
+----+------+------|------| |----|------| |----|------| |----|------|
我开始加入这些 DataFrame,方法是在所有数据帧上依次加入 full
。自然,这是一个计算密集型过程,必须努力减少不同工作节点之间的shuffles
数量。因此,我首先使用repartition() 将基于ID
的DataFrame df1
划分为30 个分区-
df1 = df1.repartition(30,'ID')
现在,我在df1
和df2
之间进行full
连接。
df = df1.join(df2,['ID'],how='full')
df.persist()
由于df1
已经是hash-partitioned
,所以我预计上面的join
会跳过洗牌并保持df1
的partitioner
,但我注意到shuffle
确实发生了并且它将df
上的分区数量增加到200
。现在,如果我通过如下所示的函数调用它们继续加入后续的 DataFrame,我会收到错误 java.io.IOException: No space left on device
-
def rev(df,num):
df_temp = spark.read.load(filename+str(num)+'.csv')
df_temp.persist()
df = df.join(df_temp,['ID'],how='full')
df_temp.unpersist()
return df
df = rev(df,3)
df = rev(df,4)
.
.
df = rev(df,100)
# I get the ERROR here below, when I call the first action count() -
print("Total number of rows: "+str(df.count()))
df.unpersist() # Never reached this stage.
更新:错误信息 -
Py4JJavaError: An error occurred while calling o3487.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 255.0 failed 1 times, most recent failure: Lost task 42.0 in stage 255.0 (TID 8755, localhost, executor driver): java.io.IOException: No space left on device
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
问题: 1、为什么我们在做第一个join
的时候没有维护df1
的partitioner?
2.如何有效地加入这些多个表并避免No space left on device
问题?用户@silvio here 建议使用.bucketBy(),但他也提到了分区器将被维护的事实,但这并没有发生。因此,我不确定加入这些多个 DataFrame 的有效方法是什么。
任何建议/提示将不胜感激。
【问题讨论】:
尝试在每个加入的数据帧上使用coalesce()
方法,以保持较少数量的分区,coalesce_repartition
不是原始问题的答案。但是只有 200,000 行,您可以在一秒钟内在 pandas 中完成此操作。 df = df1; df = df.set_index('ID'); df2 = df2.set_index('ID'); df['col4'] = df2['col4']
, ... 等等。希望有人可以将其添加到 pyspark 中。
嗯,这只是一个例子......我们的想法是了解 Spark 如何在集群上进行分发以及如何有效地完成负载平衡。
【参考方案1】:
第一次尝试使用 for 循环(您可能已经有)在每 N 次迭代中保持您的大 df
第二次尝试通过设置sqlContext.sql("set spark.sql.shuffle.partitions=100")
而不是默认的200来控制默认分区号。
您的代码应如下所示:
num_partitions = 10
big_df = spark.createDataFrame(...) #empty df
for i in range(num_partitions):
big_df = big_df.join(df, ....)
if i % num_partitions == 0:
big_df = big_df.persist()
在这里,我将每 10 次迭代称为持久化,您当然可以根据您的工作行为调整该数字。
编辑: 在您的情况下,您将本地 df_temp 保留在 rev 函数中,而不是包含所有先前连接的整个数据帧(在您的情况下为 df )。这对最终的执行计划没有影响,因为它是本地持久化的。至于我的建议,让我们假设您总共需要 100 个连接,然后使用上面的代码,您应该遍历循环 [1..100] 并每 10 次迭代保持累积的结果。在持久化大数据帧后,DAG 将包含更少的内存计算,因为中间步骤将被存储,并且 Spark 知道如何从存储中恢复它们,而不是从头开始重新计算所有内容。
【讨论】:
非常感谢您的回答。我刚刚更新了我的问题,以便准确反映我是如何解决这个问题的。我正在通过函数将df2
加载到df100
,然后在函数本身内部的主数据帧上执行join
并返回它。我一直是 persisting
主数据框,但在主数据框上的 unpersist()
被调用之前,我得到了一个 error
。我对每 10 个数据帧执行一次 persist
有点困惑。考虑到我在问题中所做的更新,您能否详细说明一下?非常感谢。
您好@cph_sto,第一个注意事项是您有一个关于(TID 8755, localhost, executor driver): java.io.IOException: No space left on device
的错误,告诉您驱动程序节点上没有剩余空间!第二,我更新了我的答案,详细说明了这种方法应该如何工作以及有什么好处。
嗨,对不起,我没有。我认为这样做需要管理权限,而我没有。我在 Jupyter+Spark 上,我读到 here 我需要在 SPARK_HOME/conf/spark_defaults.conf 中设置 SPARK_LOCAL_DIRS。让我尝试获得更多空间。之后我会尝试你的技术并通知你。非常感谢亚历山德罗斯的帮助:)
@Alexandros :参考***.com/questions/55656759/… 有没有办法通过将排好序的列附加在一起来避免加入。我所有的表 df_x 都将具有相同的行数,ID 列。与其加入和损害性能,我们可以只排序,然后附加它们保持排序顺序吗?
您好@cph_sto 最终解决了您的问题吗?【参考方案2】:
我过去也遇到过类似的问题,只是没有那么多 RDD。我能找到的最有效的解决方案是使用低级 RDD API。首先存储所有 RDD,以便它们按连接列在分区内进行(散列)分区和排序:https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/OrderedRDDFunctions.html#repartitionAndSortWithinPartitions-org.apache.spark.Partitioner-
在此之后,可以使用 zip 分区实现连接,而无需洗牌或使用大量内存:https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/RDD.html#zipPartitions-org.apache.spark.rdd.RDD-boolean-scala.Function2-scala.reflect.ClassTag-scala.reflect.ClassTag-
【讨论】:
实际上从未见过这样的用例。 感谢您的 cmets。我实际上已经使用ID
对我的所有数据框进行了repartition()
分区,然后加入它们。但是,当我加入他们时,分区数量增加到默认的 200。你提到的第一部分,我正在这样做,虽然我没有在分区内进行排序。顺便说一句,您是否也建议co-location
避免洗牌?如果是这样的话,那么在 Spark
以上是关于在 Spark 中加入多个表的有效方法 - 设备上没有剩余空间的主要内容,如果未能解决你的问题,请参考以下文章