在 Spark 中加入多个表的有效方法 - 设备上没有剩余空间

Posted

技术标签:

【中文标题】在 Spark 中加入多个表的有效方法 - 设备上没有剩余空间【英文标题】:Efficient way of joining multiple tables in Spark - No space left on device 【发布时间】:2019-03-14 13:51:50 【问题描述】:

here 提出了类似的问题,但它没有正确解决我的问题。我有近 100 个 DataFrame,每个至少有 200,000 行,我需要加入它们,方法是基于列 ID 进行 full 连接,从而创建一个带有列的 DataFrame - ID, Col1, Col2,Col3,Col4, Col5..., Col102

只是为了说明,我的 DataFrames 的结构 -

df1 =                          df2 =            df3 =          .....  df100 = 
+----+------+------+------+    +----+------+    +----+------+         +----+------+ 
|  ID|  Col1|  Col2|  Col3|    |  ID|  Col4|    |  ID|  Col5|         |  ID|Col102|
+----+------+-------------+    +----+------+    +----+------+         +----+------+
| 501|  25.1|  34.9| 436.9|    | 501| 22.33|    | 503| 22.33|         | 501|  78,1|
| 502|  12.2|3225.9|  46.2|    | 502| 645.1|    | 505| 645.1|         | 502|  54.9|
| 504| 754.5| 131.0| 667.3|    | 504| 547.2|    | 504| 547.2|         | 507|     0|
| 505|324.12| 48.93|  -1.3|    | 506|     2|    | 506|     2|         | 509| 71.57|
| 506| 27.51| 88.99|  67.7|    | 507| 463.7|    | 507| 463.7|         | 510|  82.1|
.
.
+----+------+------|------|    |----|------|    |----|------|         |----|------|

我开始加入这些 DataFrame,方法是在所有数据帧上依次加入 full。自然,这是一个计算密集型过程,必须努力减少不同工作节点之间的shuffles 数量。因此,我首先使用repartition() 将基于ID 的DataFrame df1 划分为30 个分区-

df1 = df1.repartition(30,'ID')

现在,我在df1df2 之间进行full 连接。

df = df1.join(df2,['ID'],how='full')
df.persist()

由于df1 已经是hash-partitioned,所以我预计上面的join 会跳过洗牌并保持df1partitioner,但我注意到shuffle 确实发生了并且它将df 上的分区数量增加到200。现在,如果我通过如下所示的函数调用它们继续加入后续的 DataFrame,我会收到错误 java.io.IOException: No space left on device -

def rev(df,num):
     df_temp = spark.read.load(filename+str(num)+'.csv')
     df_temp.persist()
     df = df.join(df_temp,['ID'],how='full')
     df_temp.unpersist()
     return df

df = rev(df,3)
df = rev(df,4)
.
.
df = rev(df,100)
# I get the ERROR here below, when I call the first action count() - 
print("Total number of rows: "+str(df.count()))
df.unpersist()  # Never reached this stage.

更新:错误信息 -

Py4JJavaError: An error occurred while calling o3487.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 255.0 failed 1 times, most recent failure: Lost task 42.0 in stage 255.0 (TID 8755, localhost, executor driver): java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)

问题: 1、为什么我们在做第一个join的时候没有维护df1的partitioner?

2.如何有效地加入这些多个表并避免No space left on device 问题?用户@silvio here 建议使用.bucketBy(),但他也提到了分区器将被维护的事实,但这并没有发生。因此,我不确定加入这些多个 DataFrame 的有效方法是什么。

任何建议/提示将不胜感激。

【问题讨论】:

尝试在每个加入的数据帧上使用coalesce()方法,以保持较少数量的分区,coalesce_repartition 不是原始问题的答案。但是只有 200,000 行,您可以在一秒钟内在 pandas 中完成此操作。 df = df1; df = df.set_index('ID'); df2 = df2.set_index('ID'); df['col4'] = df2['col4'], ... 等等。希望有人可以将其添加到 pyspark 中。 嗯,这只是一个例子......我们的想法是了解 Spark 如何在集群上进行分发以及如何有效地完成负载平衡。 【参考方案1】:

第一次尝试使用 for 循环(您可能已经有)在每 N 次迭代中保持您的大 df

第二次尝试通过设置sqlContext.sql("set spark.sql.shuffle.partitions=100")而不是默认的200来控制默认分区号。

您的代码应如下所示:

num_partitions = 10
big_df = spark.createDataFrame(...) #empty df
for i in range(num_partitions):
   big_df = big_df.join(df, ....)

   if i % num_partitions == 0:
     big_df = big_df.persist()

在这里,我将每 10 次迭代称为持久化,您当然可以根据您的工作行为调整该数字。

编辑: 在您的情况下,您将本地 df_temp 保留在 rev 函数中,而不是包含所有先前连接的整个数据帧(在您的情况下为 df )。这对最终的执行计划没有影响,因为它是本地持久化的。至于我的建议,让我们假设您总共需要 100 个连接,然后使用上面的代码,您应该遍历循环 [1..100] 并每 10 次迭代保持累积的结果。在持久化大数据帧后,DAG 将包含更少的内存计算,因为中间步骤将被存储,并且 Spark 知道如何从存储中恢复它们,而不是从头开始重新计算所有内容。

【讨论】:

非常感谢您的回答。我刚刚更新了我的问题,以便准确反映我是如何解决这个问题的。我正在通过函数将df2 加载到df100,然后在函数本身内部的主数据帧上执行join 并返回它。我一直是 persisting 主数据框,但在主数据框上的 unpersist() 被调用之前,我得到了一个 error。我对每 10 个数据帧执行一次 persist 有点困惑。考虑到我在问题中所做的更新,您能否详细说明一下?非常感谢。 您好@cph_sto,第一个注意事项是您有一个关于(TID 8755, localhost, executor driver): java.io.IOException: No space left on device 的错误,告诉您驱动程序节点上没有剩余空间!第二,我更新了我的答案,详细说明了这种方法应该如何工作以及有什么好处。 嗨,对不起,我没有。我认为这样做需要管理权限,而我没有。我在 Jupyter+Spark 上,我读到 here 我需要在 SPARK_HOME/conf/spark_defaults.conf 中设置 SPARK_LOCAL_DIRS。让我尝试获得更多空间。之后我会尝试你的技术并通知你。非常感谢亚历山德罗斯的帮助:) @Alexandros :参考***.com/questions/55656759/… 有没有办法通过将排好序的列附加在一起来避免加入。我所有的表 df_x 都将具有相同的行数,ID 列。与其加入和损害性能,我们可以只排序,然后附加它们保持排序顺序吗? 您好@cph_sto 最终解决了您的问题吗?【参考方案2】:

我过去也遇到过类似的问题,只是没有那么多 RDD。我能找到的最有效的解决方案是使用低级 RDD API。首先存储所有 RDD,以便它们按连接列在分区内进行(散列)分区和排序:https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/OrderedRDDFunctions.html#repartitionAndSortWithinPartitions-org.apache.spark.Partitioner-

在此之后,可以使用 zip 分区实现连接,而无需洗牌或使用大量内存:https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/RDD.html#zipPartitions-org.apache.spark.rdd.RDD-boolean-scala.Function2-scala.reflect.ClassTag-scala.reflect.ClassTag-

【讨论】:

实际上从未见过这样的用例。 感谢您的 cmets。我实际上已经使用ID 对我的所有数据框进行了repartition() 分区,然后加入它们。但是,当我加入他们时,分区数量增加到默认的 200。你提到的第一部分,我正在这样做,虽然我没有在分区内进行排序。顺便说一句,您是否也建议co-location 避免洗牌?如果是这样的话,那么在 Spark

以上是关于在 Spark 中加入多个表的有效方法 - 设备上没有剩余空间的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 中加入数据集

在 Spark Scala 中加入后创建嵌套数据

使用 Spark SQL 在 cassandra 中加入两个表 - 错误:缺少 EOF

在 Spark 中加入数据框

如何在 spark scala 中加入 2 rdd

在 Apache Spark 中加入文件