将rdd转换为数据框时,pyspark对mapPartitions使用一项任务

Posted

技术标签:

【中文标题】将rdd转换为数据框时,pyspark对mapPartitions使用一项任务【英文标题】:pyspark using one task for mapPartitions when converting rdd to dataframe 【发布时间】:2016-11-22 16:51:15 【问题描述】:

我很困惑为什么在将生成的 RDD 转换为 DataFrame 时,Spark 似乎为rdd.mapPartitions 使用了 1 个任务。

这对我来说是个问题,因为我想从:

DataFrame --> RDD --> rdd.mapPartitions --> DataFrame

这样我就可以读取数据(DataFrame),将非 SQL 函数应用于数据块(RDD 上的 mapPartitions),然后转换回 DataFrame,以便我可以使用DataFrame.write 进程。

我可以从 DataFrame --> mapPartitions 开始,然后使用像 saveAsTextFile 这样的 RDD 编写器,但这并不理想,因为 DataFrame.write 进程可以执行诸如覆盖和保存 Orc 格式的数据之类的操作。所以我想了解为什么会发生这种情况,但从实用的角度来看,我主要关心的是能够从 DataFrame --> mapParitions --> 到使用 DataFrame .write 进程。

这是一个可重现的示例。以下工作按预期工作,mapPartitions 工作有 100 个任务:

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession \
    .builder \
    .master("yarn-client") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext

df = pd.DataFrame('var1':range(100000),'var2': [x-1000 for x in range(100000)])
spark_df = spark.createDataFrame(df).repartition(100)

def f(part):
    return [(1,2)]

spark_df.rdd.mapPartitions(f).collect()

但是,如果最后一行更改为 spark_df.rdd.mapPartitions(f).toDF().show() 之类的内容,那么 mapPartitions 工作将只有一项任务。

下面的一些截图说明了这一点:

【问题讨论】:

【参考方案1】:

DataFrame.show() 仅显示数据帧的前20 行,默认情况下仅显示前20 行。如果该数字小于每个分区的行数,则Spark 是惰性的并且仅评估单个分区,这相当于一个任务。

您还可以在数据帧上执行collect,以计算和收集所有分区并再次查看 100 个任务。

您仍然会像以前一样首先看到runJob 任务,这是由于toDF 调用能够确定结果数据帧的架构:它需要处理单个分区才能确定输出类型您的映射功能。在这个初始阶段之后,诸如collect 之类的实际操作将发生在所有分区上。例如,对我来说,运行您的 sn-p 并将最后一行替换为 spark_df.rdd.mapPartitions(f).toDF().collect() 会导致以下阶段:

【讨论】:

在结果上调用DataFrame.write时也会发生同样的情况。 您是否在等待您的工作完全完成?当我执行toDF().collect() 时,我也看到一个带有一个任务的runJob 阶段,由toDF 启动以检查结果数据帧的架构,然后是一个带有预期100 个任务的collect 阶段。 collect() 对我来说在现实生活中是不可行的,因为最终结果是几百 GB 的数据。作业在运行 DataFrame.write 时只有 1 个任务时失败,但在运行 saveAsText 时成功。我将编辑从收集和显示到保存数据的示例,因为它们之间可能存在差异。 呃,也许你是对的。我需要弄清楚为什么这在 1 任务步骤的真实数据上失败了。 我以collect 为例,另一个动作如write 也是如此,例如与toDF().write.csv('/tmp/test.csv') 我得到一个csv 第三阶段再次有100 个任务。看来您确实有其他问题,祝您好运!

以上是关于将rdd转换为数据框时,pyspark对mapPartitions使用一项任务的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 将 rdd 转换为具有空值的数据帧

将 numpy 数组的 rdd 转换为 pyspark 数据帧

使用 pyspark 将 RDD 行转换为数据帧时出错

如何在pyspark中将rdd行转换为带有json结构的数据框?

Pyspark:由于数据类型 str 而不是 StringType,无法将 RDD 转换为 DataFrame

PySpark:将 RDD 转换为数据框中的列