Spark数据框加入问题

Posted

技术标签:

【中文标题】Spark数据框加入问题【英文标题】:Spark dataframe Join issue 【发布时间】:2019-03-10 22:46:36 【问题描述】:

下面的代码 sn-p 工作正常。(读取 CSV,读取 Parquet 并相互加入)

//Reading csv file -- getting three columns: Number of records: 1
 df1=spark.read.format("csv").load(filePath) 

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 30 Million, total 
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1")  "right")

奇怪的是下面的代码 sn-p 不起作用。 (读Hbase,读Parquet,互相join)(区别是从Hbase读)

//Reading from Hbase (It read from hbase properly -- getting three columns: Number of records: 1
 df1=read from Hbase code
 // It read from Hbase properly and able to show one record.
 df1.show

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 50 Million, total 
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1")  "right")

错误:原因:org.apache.spark.SparkException:作业因阶段故障而中止:56 个任务的序列化结果的总大小(1024.4 MB)大于 spark.driver.maxResultSize (1024.0 MB)

然后我添加了 spark.driver.maxResultSize=5g,然后另一个错误开始发生,Java Heap space error (run at ThreadPoolExecutor.java)。如果我在 Manager 中观察内存使用情况,我会发现使用量一直在上升,直到达到 ~ 50GB,此时会发生 OOM 错误。因此,无论出于何种原因,用于执行此操作的 RAM 量大约是我尝试使用的 RDD 大小的 10 倍。

如果我将 df1 保存在内存和磁盘中并执行 count()。程序运行良好。下面是代码sn-p

//Reading from Hbase -- getting three columns: Number of records: 1
 df1=read from Hbase code

**df1.persist(StorageLevel.MEMORY_AND_DISK)
val cnt = df1.count()**

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 50 Million, total 
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1")  "right")

它适用于文件,即使它具有相同的数据但不适用于 Hbase。在 100 个工作节点集群上运行,每个集群有 125 GB 内存。所以内存不是问题。

我的问题是文件和 Hbase 都具有相同的数据,并且都可以读取并能够显示()数据。但是为什么只有 Hbase 失败了。我很难理解这段代码可能出了什么问题。任何建议将不胜感激。

【问题讨论】:

【参考方案1】:

在提取数据时,spark 不知道从 HBase 检索到的行数,因此选择的策略是排序合并连接。

因此它会尝试对执行器中的数据进行排序和打乱。

为了避免这个问题,我们可以使用广播连接,同时我们不会使用 key 列对来自 df2 的数据进行排序和打乱,这显示了代码 sn-p 中的最后一条语句。

但是绕过这个(因为它只有一行),我们可以对要填充的列使用 Case 表达式。

示例:

df.withColumn(
"newCol"
,when(col("df2col1").eq(lit(hbaseKey))
    ,lit(hbaseValueCol1))
 .otherwise(lit(null))

【讨论】:

感谢您的回复。只是一个信息。 hbase 表将来可能会增长,但不会超过 100 行。我无法关注 hbaseKey。您能否再解释一下您提到的案例表达式。【参考方案2】:

我有时也会遇到这个错误。这通常发生在 spark 尝试在连接期间广播一个大表时(当 spark 的优化器低估表的大小或统计信息不正确时会发生这种情况)。由于没有提示强制排序合并连接 (How to hint for sort merge join or shuffled hash join (and skip broadcast hash join)?),唯一的选择是通过设置 spark.sql.autoBroadcastJoinThreshold= -1 来禁用广播连接

【讨论】:

【参考方案3】:

当我在加入过程中出现内存问题时,通常意味着以下两个原因之一:

    数据帧中的分区太少(分区太大) 在您加入的键上的两个数据帧中有许多重复项,这种连接会爆炸您的记忆。

广告 1. 我认为您应该在加入之前查看每个表中的分区数。当 Spark 读取一个文件时,它不一定会保留与原始表(parquet、csv 或其他)相同数量的分区。从 csv 读取与从 HBase 读取可能会创建不同数量的分区,这就是您看到性能差异的原因。太大的分区在加入后会变得更大,这会产生内存问题。查看 Spark UI 中每个任务的峰值执行内存。这将使您对每个任务的内存使用情况有所了解。我发现最好将其保持在 1 Gb 以下。

解决方案:在加入之前重新分区表。

广告。 2 也许不是这里的情况,但值得检查。

【讨论】:

为什么分区数在广播连接中很重要?这与驱动程序内存有什么关系?

以上是关于Spark数据框加入问题的主要内容,如果未能解决你的问题,请参考以下文章

Scala Spark 循环加入数据框

加入大型 Spark 数据帧

vue的确认弹框加格式

纯CSS实现tooltip提示框,CSS箭头及形状之续篇--给整个tooltip提示框加个边框

加入多个数据框火花

在 Spark 中合并数据框