限制火花上下文中的记录数量

Posted

技术标签:

【中文标题】限制火花上下文中的记录数量【英文标题】:Limit the amount of records in a spark context 【发布时间】:2016-03-08 15:14:06 【问题描述】:

我想减少每个reducer的记录数,并将结果变量保留为rdd

使用takeSample 似乎是显而易见的选择,但是,它返回一个collection 而不是SparkContext 对象。

我想出了这个方法:

rdd = rdd.zipWithIndex().filter(lambda x:x[1]<limit).map(lambda x:x[0])

但是,这种方法速度很慢,效率也不高。

有没有更聪明的方法来抽取一个小样本并将数据结构保持为rdd

【问题讨论】:

如果可以随机抽取样本,可以使用randomSplit或sample 【参考方案1】:

如果您想要一个示例子集并且不能对数据做出任何额外假设,那么take 结合parallelize 可能是最佳解决方案:

sc.parallelize(rdd.take(n))

它将涉及相对较少的分区(在最佳情况下只有一个),并且小n的网络流量成本应该可以忽略不计。

采样(randomSplitsample)将需要与zipWithIndexfilter 相同的完整数据扫描。

假设没有数据倾斜,您可以尝试这样的方法来解决这个问题:

from __future__ import division  # Python 2 only

def limitApprox(rdd, n, timeout):
    count = rdd.countApprox(timeout)
    if count <= n:
        return rdd
    else:
        rec_per_part = count // rdd.getNumPartitions()
        required_parts = n / rec_per_part if rec_per_part else 1
        return rdd.mapPartitionsWithIndex(
            lambda i, iter: iter if i < required_parts else []
        )
这仍然会访问每个分区,但如果没有必要,会尽量避免计算内容 如果存在较大的数据偏差将无法工作 如果分布是均匀的,但 n 如果分布偏向高指数,则可能会欠采样。

如果数据可以表示为Row,您可以尝试另一个技巧:

rdd.toDF().limit(n).rdd

【讨论】:

以上是关于限制火花上下文中的记录数量的主要内容,如果未能解决你的问题,请参考以下文章

成功创建火花上下文后,Livy 会话卡在启动

如何确定活动WebGL上下文的数量?

清理火花缓存数据

Pyspark 无法初始化火花上下文

是否可以在单火花上下文中收听两个 dtsreams?

如何在独立的火花中将处理限制为指定数量的核心