将rdd转换为数据框时,pyspark对mapPartitions使用一项任务
Posted
技术标签:
【中文标题】将rdd转换为数据框时,pyspark对mapPartitions使用一项任务【英文标题】:pyspark using one task for mapPartitions when converting rdd to dataframe 【发布时间】:2016-11-22 16:51:15 【问题描述】:我很困惑为什么在将生成的 RDD 转换为 DataFrame 时,Spark 似乎为rdd.mapPartitions
使用了 1 个任务。
这对我来说是个问题,因为我想从:
DataFrame
--> RDD
--> rdd.mapPartitions
--> DataFrame
这样我就可以读取数据(DataFrame),将非 SQL 函数应用于数据块(RDD 上的 mapPartitions),然后转换回 DataFrame,以便我可以使用DataFrame.write
进程。
我可以从 DataFrame --> mapPartitions 开始,然后使用像 saveAsTextFile 这样的 RDD 编写器,但这并不理想,因为 DataFrame.write
进程可以执行诸如覆盖和保存 Orc 格式的数据之类的操作。所以我想了解为什么会发生这种情况,但从实用的角度来看,我主要关心的是能够从 DataFrame --> mapParitions --> 到使用 DataFrame .write 进程。
这是一个可重现的示例。以下工作按预期工作,mapPartitions
工作有 100 个任务:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession \
.builder \
.master("yarn-client") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
df = pd.DataFrame('var1':range(100000),'var2': [x-1000 for x in range(100000)])
spark_df = spark.createDataFrame(df).repartition(100)
def f(part):
return [(1,2)]
spark_df.rdd.mapPartitions(f).collect()
但是,如果最后一行更改为 spark_df.rdd.mapPartitions(f).toDF().show()
之类的内容,那么 mapPartitions
工作将只有一项任务。
下面的一些截图说明了这一点:
【问题讨论】:
【参考方案1】:DataFrame.show()
仅显示数据帧的前20 行,默认情况下仅显示前20 行。如果该数字小于每个分区的行数,则Spark 是惰性的并且仅评估单个分区,这相当于一个任务。
您还可以在数据帧上执行collect
,以计算和收集所有分区并再次查看 100 个任务。
您仍然会像以前一样首先看到runJob
任务,这是由于toDF
调用能够确定结果数据帧的架构:它需要处理单个分区才能确定输出类型您的映射功能。在这个初始阶段之后,诸如collect
之类的实际操作将发生在所有分区上。例如,对我来说,运行您的 sn-p 并将最后一行替换为 spark_df.rdd.mapPartitions(f).toDF().collect()
会导致以下阶段:
【讨论】:
在结果上调用DataFrame.write
时也会发生同样的情况。
您是否在等待您的工作完全完成?当我执行toDF().collect()
时,我也看到一个带有一个任务的runJob
阶段,由toDF
启动以检查结果数据帧的架构,然后是一个带有预期100 个任务的collect
阶段。
collect()
对我来说在现实生活中是不可行的,因为最终结果是几百 GB 的数据。作业在运行 DataFrame.write
时只有 1 个任务时失败,但在运行 saveAsText
时成功。我将编辑从收集和显示到保存数据的示例,因为它们之间可能存在差异。
呃,也许你是对的。我需要弄清楚为什么这在 1 任务步骤的真实数据上失败了。
我以collect
为例,另一个动作如write
也是如此,例如与toDF().write.csv('/tmp/test.csv')
我得到一个csv
第三阶段再次有100 个任务。看来您确实有其他问题,祝您好运!以上是关于将rdd转换为数据框时,pyspark对mapPartitions使用一项任务的主要内容,如果未能解决你的问题,请参考以下文章
将 numpy 数组的 rdd 转换为 pyspark 数据帧
如何在pyspark中将rdd行转换为带有json结构的数据框?