如何遍历大型 Pyspark Dataframe 中列的不同值? .distinct().collect() 引发大任务警告

Posted

技术标签:

【中文标题】如何遍历大型 Pyspark Dataframe 中列的不同值? .distinct().collect() 引发大任务警告【英文标题】:How do you iterate through distinct values of a column in a large Pyspark Dataframe? .distinct().collect() raises a large task warning 【发布时间】:2020-01-13 20:54:15 【问题描述】:

我正在尝试遍历大型 Pyspark 数据框列中的所有不同值。当我尝试使用.distinct().collect() 执行此操作时,即使只有两个不同的值,它也会引发“任务太大”警告。 警告信息:

20/01/13 20:39:01 WARN TaskSetManager: Stage 0 contains a task of very large size (201 KB). The maximum recommended task size is 100 KB.

这里是一些示例代码:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Basics').getOrCreate()
length = 200000

data = spark.createDataFrame([[float(0) for x in range(3)] for x in range(length)], ['a', 'b', 'c'])

data.select("a").distinct().collect()
# This code produces this warning

如何在大型 Pyspark Dataframe 的列中迭代不同的值而不遇到内存问题?

【问题讨论】:

如果你尝试运行df.select("*").limit(1).collect(),你会得到同样的错误吗? 是的。运行该行时我收到相同的警告。 【参考方案1】:

如您所知,.collect() 不是最佳做法。因为,这是一个将所有数据从执行程序传输到驱动程序的操作。问题是当您有一个大型数据集时,Spark 执行器会向驱动程序发送大量序列化数据,然后收集 2 行。您还可以查看产生警告的TaskSetManager。

总的来说,解决您的问题的方法可能是与磁盘交换内存。您可以在一个 csv 中写入具有不同值的数据帧,然后使用 Python 或 Pandas* 逐行再次读取:

data.select("a").distinct().coalesce(1).write.csv("temp.csv")
# Specifically, it's a directory with one csv.

使用此解决方案,您的内存不会有任何问题。

*关于如何使用 Python 或 Pandas 读取大型 CSV 的解决方案有很多。

【讨论】:

但是由于只有 2 个不同的值, .collect() 不应该工作吗?只有 2 个不同值的小型原始数据框不会引发此警告。但是,如果您有一个只有 2 个不同值的大型原始数据框,则会引发错误。为什么原始数据框的大小很重要? 如您所见,警告消息来自TaskSetManager。如果我们在 source Scala code 的类中搜索,我们会在第 448 行找到消息。正如您在 if 语句中看到的,您的任务超出了序列化内存限制。因为,当您有一个大型数据集时,Spark 执行器会向驱动程序发送大量数据,然后收集 2 行 @Jimmy 我根据您的精彩评论和我上面的评论编辑我的答案。现在清楚了吗? 所以我尝试了这个修复,但它没有帮助。当我运行此代码时,警告仍然存在。 "data.select("a").distinct().coalesce(1).write" 很好,但 ".csv("temp.csv") 会触发相同的警告。有什么想法吗? @ggeop 您提到的代码负责任务大小而不是内存管理。方法dequeueTask 负责任务的序列化和执行。

以上是关于如何遍历大型 Pyspark Dataframe 中列的不同值? .distinct().collect() 引发大任务警告的主要内容,如果未能解决你的问题,请参考以下文章

在pyspark中以分布式方式有效地生成大型DataFrame(没有pyspark.sql.Row)

将大型 DataFrame 从 PySpark 写入 Kafka 遇到超时

pyspark/EMR 中大型 DataFrame 上的 collect() 或 toPandas()

将 pyspark groupedData 转换为 pandas DataFrame

Pyspark DataFrame - 转义 &

遍历 pyspark 数据框列