如何遍历大型 Pyspark Dataframe 中列的不同值? .distinct().collect() 引发大任务警告
Posted
技术标签:
【中文标题】如何遍历大型 Pyspark Dataframe 中列的不同值? .distinct().collect() 引发大任务警告【英文标题】:How do you iterate through distinct values of a column in a large Pyspark Dataframe? .distinct().collect() raises a large task warning 【发布时间】:2020-01-13 20:54:15 【问题描述】:我正在尝试遍历大型 Pyspark 数据框列中的所有不同值。当我尝试使用.distinct().collect()
执行此操作时,即使只有两个不同的值,它也会引发“任务太大”警告。
警告信息:
20/01/13 20:39:01 WARN TaskSetManager: Stage 0 contains a task of very large size (201 KB). The maximum recommended task size is 100 KB.
这里是一些示例代码:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Basics').getOrCreate()
length = 200000
data = spark.createDataFrame([[float(0) for x in range(3)] for x in range(length)], ['a', 'b', 'c'])
data.select("a").distinct().collect()
# This code produces this warning
如何在大型 Pyspark Dataframe 的列中迭代不同的值而不遇到内存问题?
【问题讨论】:
如果你尝试运行df.select("*").limit(1).collect()
,你会得到同样的错误吗?
是的。运行该行时我收到相同的警告。
【参考方案1】:
如您所知,.collect()
不是最佳做法。因为,这是一个将所有数据从执行程序传输到驱动程序的操作。问题是当您有一个大型数据集时,Spark 执行器会向驱动程序发送大量序列化数据,然后收集 2 行。您还可以查看产生警告的TaskSetManager。
总的来说,解决您的问题的方法可能是与磁盘交换内存。您可以在一个 csv 中写入具有不同值的数据帧,然后使用 Python 或 Pandas* 逐行再次读取:
data.select("a").distinct().coalesce(1).write.csv("temp.csv")
# Specifically, it's a directory with one csv.
使用此解决方案,您的内存不会有任何问题。
*关于如何使用 Python 或 Pandas 读取大型 CSV 的解决方案有很多。
【讨论】:
但是由于只有 2 个不同的值, .collect() 不应该工作吗?只有 2 个不同值的小型原始数据框不会引发此警告。但是,如果您有一个只有 2 个不同值的大型原始数据框,则会引发错误。为什么原始数据框的大小很重要? 如您所见,警告消息来自TaskSetManager。如果我们在 source Scala code 的类中搜索,我们会在第 448 行找到消息。正如您在 if 语句中看到的,您的任务超出了序列化内存限制。因为,当您有一个大型数据集时,Spark 执行器会向驱动程序发送大量数据,然后收集 2 行 @Jimmy 我根据您的精彩评论和我上面的评论编辑我的答案。现在清楚了吗? 所以我尝试了这个修复,但它没有帮助。当我运行此代码时,警告仍然存在。 "data.select("a").distinct().coalesce(1).write" 很好,但 ".csv("temp.csv") 会触发相同的警告。有什么想法吗? @ggeop 您提到的代码负责任务大小而不是内存管理。方法dequeueTask
负责任务的序列化和执行。以上是关于如何遍历大型 Pyspark Dataframe 中列的不同值? .distinct().collect() 引发大任务警告的主要内容,如果未能解决你的问题,请参考以下文章
在pyspark中以分布式方式有效地生成大型DataFrame(没有pyspark.sql.Row)
将大型 DataFrame 从 PySpark 写入 Kafka 遇到超时
pyspark/EMR 中大型 DataFrame 上的 collect() 或 toPandas()