计算每个 pyspark RDD 分区中的元素数
Posted
技术标签:
【中文标题】计算每个 pyspark RDD 分区中的元素数【英文标题】:Count number of elements in each pyspark RDD partition 【发布时间】:2016-08-12 19:13:22 【问题描述】:我正在寻找与此问题等效的 Pyspark:How to get the number of elements in partition?。
具体来说,我想以编程方式计算 pyspark RDD 或数据帧的每个分区中的元素数量(我知道此信息可在 Spark Web UI 中获得)。
这次尝试:
df.foreachPartition(lambda iter: sum(1 for _ in iter))
结果:
AttributeError: 'NoneType' 对象没有属性 '_jvm'
我不想将迭代器的内容收集到内存中。
【问题讨论】:
【参考方案1】:如果您问:我们能否在不迭代的情况下获得迭代器中的元素数量?答案是No。
但我们不必将它存储在内存中,就像您提到的帖子中那样:
def count_in_a_partition(idx, iterator):
count = 0
for _ in iterator:
count += 1
return idx, count
data = sc.parallelize([
1, 2, 3, 4
], 4)
data.mapPartitionsWithIndex(count_in_a_partition).collect()
编辑
请注意,您的代码非常接近解决方案,只是 mapPartitions
需要返回一个迭代器:
def count_in_a_partition(iterator):
yield sum(1 for _ in iterator)
data.mapPartitions(count_in_a_partition).collect()
【讨论】:
感谢@ShuaiYuan。不,我知道我必须遍历才能获得计数。您的第一个解决方案对我有用!但是,即使在您在示例中创建的“数据”rdd 上,第二个仍然会引发与我在 Spark 1.5.0(我的组织的集群)中的原始尝试相同的 AttributeError 。 AttributeError:“NoneType”对象没有属性“_jvm”。但是,在运行 1.6.0 或 1.5.2 的 Spark Community Edition 中,您的两个解决方案都可以工作。也许我的本地 CDH 发行版有些奇怪? 可能是。不幸的是,我没有可供测试的 Spark 1.5.0。以上是关于计算每个 pyspark RDD 分区中的元素数的主要内容,如果未能解决你的问题,请参考以下文章