Spark 数据框 saveAsTable 正在使用单个任务

Posted

技术标签:

【中文标题】Spark 数据框 saveAsTable 正在使用单个任务【英文标题】:Spark dataframe saveAsTable is using a single task 【发布时间】:2016-01-12 00:40:11 【问题描述】:

我们有一个初始阶段可适当扩展的管道 - 每个使用几十个工作人员。

最后一个阶段是

dataFrame.write.format(outFormat).mode(saveMode).
partitionBy(partColVals.map(_._1): _*).saveAsTable(tname)

在这个阶段,我们最终得到了一个单个工人。这显然对我们不起作用——事实上,worker 的磁盘空间已经用完了——而且速度非常慢。

为什么该命令最终只能在单个工作人员/单个任务上运行?

更新输出格式为parquet。分区列数不影响结果(尝试一列和几列)。

另一个更新 没有以下条件(由下面的答案提出):

coalescepartitionBy 声明 window / 解析函数 Dataset.limit sql.shuffle.partitions

【问题讨论】:

类似的问题,你只问过***.com/questions/51050272/… 【参考方案1】:

这个问题不太可能与saveAsTable 有任何关系。

一个阶段中的单个任务表示输入数据(DatasetRDD)只有一个分区。这与存在多个任务但一个或多个任务的执行时间明显更长的情况形成对比,这通常对应于包含正倾斜键的分区。此外,您还应该混淆 CPU 利用率低的单个任务场景。前者通常是由于 IO 吞吐量不足(高 CPU 等待时间是最明显的迹象),但在极少数情况下可以追溯到使用低级同步原语的共享对象。

由于标准数据源不会在写入时对数据进行混洗(包括使用partitionBybucketBy 选项的情况),因此可以安全地假设数据已在上游代码的某处重新分区。通常这意味着发生了以下情况之一:

已使用coalesce(1)repartition(1) 将数据显式移至单个分区。

数据已被隐式移动到单个分区,例如:

Dataset.limit

窗口定义缺少PARTITION BY子句的窗口函数应用程序。

df.withColumn(
  "row_number", 
  row_number().over(Window.orderBy("some_column"))
)

sql.shuffle.partitions 选项设置为 1,上游代码包括对 Dataset 的非本地操作。

Dataset 是应用全局聚合函数的结果(没有 GROUP BY caluse)。这通常不是问题,除非函数是非缩减的(collect_list 或类似的)。

虽然没有证据表明这是这里的问题,但在一般情况下,您也应该有可能,数据只包含一个分区,一直到源。这通常是在获取输入using JDBC source 时,但第 3 方格式可以表现出相同的行为。

要确定问题的根源,您应该检查输入 Dataset (explain(true)) 的执行计划或检查 Spark Web UI 的 SQL 选项卡。

【讨论】:

请您扩展您的最后一个项目符号:GROUP_BYcollect_list?我遇到了一个问题,我添加了许多 collect_list 列,但没有执行 group_by,当我尝试写入磁盘时,它会导致 1-6 个任务永远运行。

以上是关于Spark 数据框 saveAsTable 正在使用单个任务的主要内容,如果未能解决你的问题,请参考以下文章

Spark 2.x saveAsTable

apache spark sql表覆盖问题

在 Spark 的 saveAsTable 上

带有 partitionBy 的 Spark DataFrame saveAsTable 在 HDFS 中不创建 ORC 文件

Spark saveAsTable append 将数据保存到 hive 但抛出错误:org.apache.hadoop.hive.ql.metadata.Hive.alterTable

如何从 pandas 数据框创建数据块表?