Spark DataFrame 方法 `toPandas` 实际上在做啥?

Posted

技术标签:

【中文标题】Spark DataFrame 方法 `toPandas` 实际上在做啥?【英文标题】:What is the Spark DataFrame method `toPandas` actually doing?Spark DataFrame 方法 `toPandas` 实际上在做什么? 【发布时间】:2015-03-24 06:22:11 【问题描述】:

我是 Spark-DataFrame API 的初学者。

我使用此代码将制表符分隔的 csv 加载到 Spark Dataframe 中

lines = sc.textFile('tail5.csv')
parts = lines.map(lambda l : l.strip().split('\t'))
fnames = *some name list*
schemaData = StructType([StructField(fname, StringType(), True) for fname in fnames])
ddf = sqlContext.createDataFrame(parts,schemaData)

假设我使用 Spark 从新文件创建 DataFrame,并使用内置方法 toPandas() 将其转换为 pandas,

它是否将 Pandas 对象存储到本地内存中? Pandas 低级计算是否全部由 Spark 处理? 它是否公开了所有 pandas 数据框功能?(我猜是的) 我可以将它转换为 Pandas 并完成它,而无需太多接触 DataFrame API 吗?

【问题讨论】:

【参考方案1】:

使用 spark 将 CSV 文件读入 pandas 是一种相当迂回的方法,以实现将 CSV 文件读入内存的最终目标。

您似乎误解了此处所使用技术的用例。

Spark 用于分布式计算(尽管它可以在本地使用)。它通常过于繁重,无法用于简单地读取 CSV 文件。

在您的示例中,sc.textFile 方法只会为您提供一个 spark RDD,它实际上是一个文本行列表。这可能不是你想要的。不会执行类型推断,因此如果您想对 CSV 文件中的一列数字求和,您将无法这样做,因为就 Spark 而言,它们仍然是字符串。

只需使用 pandas.read_csv 并将整个 CSV 读入内存即可。 Pandas 会自动推断每列的类型。 Spark 不这样做。

现在回答你的问题:

是否将 Pandas 对象存储到本地内存中

是的。 toPandas() 会将 Spark DataFrame 转换为 Pandas DataFrame,当然它在内存中。

Pandas 低级计算是否全部由 Spark 处理

没有。 Pandas 运行自己的计算,spark 和 pandas 之间没有相互作用,只是一些 API 兼容性。

它是否公开了所有 pandas 数据框功能?

没有。例如,Series 对象有一个 interpolate 方法,这在 PySpark Column 对象中不可用。 pandas API 中的许多方法和函数不在 PySpark API 中。

我可以将它转换为 Pandas 并完成它,而无需太多接触 DataFrame API 吗?

当然。事实上,在这种情况下,您甚至可能根本不应该使用 Spark。 pandas.read_csv 可能会处理您的用例,除非您处理的是大量数据。

尝试使用简单、技术含量低、易于理解的库来解决您的问题,并且在您需要时使用更复杂的东西。很多时候,您不需要更复杂的技术。

【讨论】:

感谢您回答我的问题。其实也许我说的不够清楚。我是 spark 的初学者。我只是在这里测试从 csv 加载。我需要读取太大而无法在内存中处理的数据并进行数据分析。所以这里的目标是在 Hadoop 中进行一些数据分析。那么当我从 Hadoop(hive) 加载数据时,转换为 pandas 会将其加载到本地内存中? 我没有在单机上使用 hadoop。我可能必须从 hdfs 使用 hive 加载数据。如果我把它转换成 pandas,我可以在分布式系统中做 pandas 吗? 啊。我懂了。 Spark DataFrames 和 Pandas DataFrames 不共享计算基础设施。 Spark DataFrames 在有意义的地方模拟了 pandas DataFrames 的 API。如果您正在寻找可以让您在 Hadoop 生态系统中以类似 pandas 的方式运行的东西,还可以让您使用 pandas DataFrame 进入内存,请查看blaze。 除了 blaze,sparklingpandas 还旨在在 Spark DataFrames 上提供类似于 pandas 的 API:github.com/sparklingpandas/sparklingpandas 我可以先用 Pandas DataFrame 读取 csv 然后将其转换为 Spark DataFrame 吗?【参考方案2】:

使用一些 spark 上下文或 hive 上下文方法(sc.textFile()hc.sql())将数据“读入内存”会返回一个 RDD,但 RDD 保留在分布式内存中(工作节点上的内存),而不是主节点。所有 RDD 方法(rdd.map()rdd.reduceByKey() 等)都设计为在工作节点上并行运行,但有一些例外。例如,如果您运行 rdd.collect() 方法,您最终会将 rdd 的内容从所有工作节点复制到主节点内存。因此,您失去了分布式计算优势(但仍然可以运行 rdd 方法)。

与 pandas 类似,当您运行 toPandas() 时,您会将数据帧从分布式(工作)内存复制到本地(主)内存,并失去大部分分布式计算能力。因此,一种可能的工作流程(我经常使用)可能是使用分布式计算方法将您的数据预先调整为合理的大小,然后转换为丰富的功能集的 Pandas 数据框。希望对您有所帮助。

【讨论】:

以上是关于Spark DataFrame 方法 `toPandas` 实际上在做啥?的主要内容,如果未能解决你的问题,请参考以下文章

Spark sql查询到熊猫问题

Spark中将RDD转换成DataFrame的两种方法

Spark SQL中 RDD 转换到 DataFrame (方法二)

Spark中创建DataFrame方法总结

Spark Dataframe 除了方法问题

Spark RDD转换成DataFrame的两种方式