如何获取分区中的元素数量? [复制]

Posted

技术标签:

【中文标题】如何获取分区中的元素数量? [复制]【英文标题】:How to get the number of elements in partition? [duplicate] 【发布时间】:2015-02-24 02:20:40 【问题描述】:

在给定分区 ID 的情况下,有什么方法可以获取 spark RDD 分区中的元素数量?无需扫描整个分区。

类似这样的:

Rdd.partitions().get(index).size()

除了我没有看到这样的火花 API。有任何想法吗?解决方法?

谢谢

【问题讨论】:

这个问题的DataFrame版本:***.com/q/46032320/877069 【参考方案1】:

以下为您提供了一个新的 RDD,其中的元素是每个分区的大小:

rdd.mapPartitions(iter => Array(iter.size).iterator, true) 

【讨论】:

谢谢!据我了解iter.size 遍历整个分区以获取其大小(如果我在这里错了,请纠正我)。有什么方法可以在不迭代的情况下获取分区大小? 这是正确的 - 在直接查询迭代之前无法知道大小,因为它在内存方面更有效,其中数据不是一次全部按需获取(无法放入可用内存) .【参考方案2】:

PySpark:

num_partitions = 20000
a = sc.parallelize(range(int(1e6)), num_partitions)
l = a.glom().map(len).collect()  # get length of each partition
print(min(l), max(l), sum(l)/len(l), len(l))  # check if skewed

火花/斯卡拉:

val numPartitions = 20000
val a = sc.parallelize(0 until 1e6.toInt, numPartitions )
val l = a.glom().map(_.length).collect()  # get length of each partition
print(l.min, l.max, l.sum/l.length, l.length)  # check if skewed

对于数据帧也是如此,而不仅仅是 RDD。 只需将 DF.rdd.glom... 添加到上面的代码中即可。

注意glom() 转换每个分区into a list 的元素,所以它是内存密集型的。内存密集度较低的版本(仅限 pyspark 版本):

import statistics 

def get_table_partition_distribution(table_name: str):

    def get_partition_len (iterator):
        yield sum(1 for _ in iterator)

    l = spark.table(table_name).rdd.mapPartitions(get_partition_len, True).collect()  # get length of each partition
    num_partitions = len(l)
    min_count = min(l)
    max_count = max(l)
    avg_count = sum(l)/num_partitions
    stddev = statistics.stdev(l)
    print(f"table_name each of num_partitions partition's counts: min=min_count:, avg±stddev=avg_count:,.1f ±stddev:,.1f max=max_count:,")


get_table_partition_distribution('someTable')

输出类似

someTable 每个 1445 个分区的计数: min=1,201,201 avg±stddev=1,202,811.6 ±21,783.4 max=2,030,137

【讨论】:

【参考方案3】:

我知道我在这里有点晚了,但我有另一种方法可以通过利用 spark 的内置函数来获取分区中的元素数量。它适用于 2.1 以上的 spark 版本。

说明: 我们将创建一个示例数据帧 (df),获取分区 id,对分区 id 进行分组,并对每条记录进行计数。

Pyspark:

>>> from pyspark.sql.functions import spark_partition_id, count as _count
>>> df = spark.sql("set -v").unionAll(spark.sql("set -v")).repartition(4)
>>> df.rdd.getNumPartitions()
4
>>> df.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").agg(_count("key")).orderBy("partition_id").show()
+------------+----------+
|partition_id|count(key)|
+------------+----------+
|           0|        48|
|           1|        44|
|           2|        32|
|           3|        48|
+------------+----------+

斯卡拉:

scala> val df = spark.sql("set -v").unionAll(spark.sql("set -v")).repartition(4)
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: string, value: string ... 1 more field]

scala> df.rdd.getNumPartitions
res0: Int = 4

scala> df.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").agg(count("key")).orderBy("partition_id").show()
+------------+----------+
|partition_id|count(key)|
+------------+----------+
|           0|        48|
|           1|        44|
|           2|        32|
|           3|        48|
+------------+----------+

【讨论】:

【参考方案4】:

pzecevic 的答案有效,但从概念上讲,不需要构造数组然后将其转换为迭代器。我会直接构造迭代器,然后通过对方付费调用获取计数。

rdd.mapPartitions(iter => Iterator(iter.size), true).collect()

附:不确定他的答案是否真的做了更多的工作,因为 Iterator.apply 可能会将其参数转换为数组。

【讨论】:

以上是关于如何获取分区中的元素数量? [复制]的主要内容,如果未能解决你的问题,请参考以下文章

如何获取指针数组中的元素数量?

如何获取文件夹中文件的数量? [复制]

如何获取无缓冲通道中的元素数量

如何获取firestore集合下的文档数量? [复制]

推力:使用device_ptr时如何获取copy_if函数复制的元素个数

如何获取数据堆栈上的元素数量?