计算每个 pyspark RDD 分区中的元素数

Posted

技术标签:

【中文标题】计算每个 pyspark RDD 分区中的元素数【英文标题】:Count number of elements in each pyspark RDD partition 【发布时间】:2016-08-12 19:13:22 【问题描述】:

我正在寻找与此问题等效的 Pyspark:How to get the number of elements in partition?。

具体来说,我想以编程方式计算 pyspark RDD 或数据帧的每个分区中的元素数量(我知道此信息可在 Spark Web UI 中获得)。

这次尝试:

df.foreachPartition(lambda iter: sum(1 for _ in iter))

结果:

AttributeError: 'NoneType' 对象没有属性 '_jvm'

我不想将迭代器的内容收集到内存中。

【问题讨论】:

【参考方案1】:

如果您问:我们能否在不迭代的情况下获得迭代器中的元素数量?答案是No。

但我们不必将它存储在内存中,就像您提到的帖子中那样:

def count_in_a_partition(idx, iterator):
  count = 0
  for _ in iterator:
    count += 1
  return idx, count

data = sc.parallelize([
    1, 2, 3, 4
], 4)

data.mapPartitionsWithIndex(count_in_a_partition).collect()

编辑

请注意,您的代码非常接近解决方案,只是 mapPartitions 需要返回一个迭代器:

def count_in_a_partition(iterator):
  yield sum(1 for _ in iterator)

data.mapPartitions(count_in_a_partition).collect()

【讨论】:

感谢@ShuaiYuan。不,我知道我必须遍历才能获得计数。您的第一个解决方案对我有用!但是,即使在您在示例中创建的“数据”rdd 上,第二个仍然会引发与我在 Spark 1.5.0(我的组织的集群)中的原始尝试相同的 AttributeError 。 AttributeError:“NoneType”对象没有属性“_jvm”。但是,在运行 1.6.0 或 1.5.2 的 Spark Community Edition 中,您的两个解决方案都可以工作。也许我的本地 CDH 发行版有些奇怪? 可能是。不幸的是,我没有可供测试的 Spark 1.5.0。

以上是关于计算每个 pyspark RDD 分区中的元素数的主要内容,如果未能解决你的问题,请参考以下文章

PySpark|RDD编程基础

RDD 中的分区数和 Spark 中的性能

如何在pyspark中查看RDD中每个分区的内容?

[Pyspark]RDD常用方法总结

[Pyspark]RDD常用方法总结

在 PySpark RDD 中,如何使用 foreachPartition() 打印出每个分区的第一条记录?