PySpark 重新分区 RDD 元素

Posted

技术标签:

【中文标题】PySpark 重新分区 RDD 元素【英文标题】:PySpark repartitioning RDD elements 【发布时间】:2015-06-05 23:06:02 【问题描述】:

我有一个从 Kafka 流中读取数据并为流中的每个 RDD 执行操作的 spark 作业。如果 RDD 不为空,我想将 RDD 保存到 HDFS,但我想为 RDD 中的每个元素创建一个文件。我找到了

RDD.saveAsTextFile(file_location)

将为每个分区创建一个文件,因此我尝试更改 RDD,使每个分区仅包含一个元素。这是我正在尝试做的一个示例

data = sc.parallelize(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0'])
data.glom().collect() #Produces [['1', '2', '3', '4', '5'], ['6', '7', '8', '9', '0']]
data.saveAsTextFile(file_location) #Produces 2 files

我可以更接近我想要的,但我找不到确保每个分区只有一个元素的方法

data1 = data.coalesce(1, True).repartition(data.count())
data1.glom().collect() #Produces [[], ['1', '2', '3', '4', '5'], ['6', '7', '8', '9', '0'], [], [], [], [], [], [], []] 
data2 = data.map(lambda t : t).coalesce(1, True).repartition(data.count())
data2.glom().collect() #Produces [[], ['1'], ['2', '3'], ['4', '5'], ['6'], ['7', '8'], ['9', '0'], [], [], []] 
data2.saveAsTextFile(file_location) #Produces 10 files, but some are empty

我知道在这个例子中我可以将我想要的分区传递给 sc.parallelize() 但是当我从 kafka 流中读取时这是不可能的。关于如何以我想要的方式重新分区或如何更好地解决此问题的任何建议?

【问题讨论】:

【参考方案1】:

嗯,这是一个用于自定义分区的 python 解决方案。

(为了清楚起见,将每个元素放在单独的文件中可能不是最好的设计)。

data = sc.parallelize(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0']).map(lambda x: (x,x))
print data.collect()
c = data.count()
wp = data.partitionBy(c,lambda k: int(k))
print wp.map(lambda t: t[0]).glom().collect()
sc.stop()

结果:

[('1', '1'), ('2', '2'), ('3', '3'), ('4', '4'), ('5', '5'), ('6', '6'), ('7', '7'), ('8', '8'), ('9', '9'), ('0', '0')]
[['0'], ['1'], ['2'], ['3'], ['4'], ['5'], ['6'], ['7'], ['8'], ['9']]

希望这会有所帮助。

【讨论】:

感谢您的帮助。我正在尝试概括您对字符串 RDD 的回答,但 partitionBy 存在一些问题。如果我创建 data = sc.parallelize(['this', 'is', 'an', 'example', 'of', 'speex', 'an', 'audio']).map(lambda x: (x, x)) 并尝试 data.partitionBy(data.count(), lambda k: str(k)).glom().collect() 我得到 TypeError: not all arguments convert during string formatting。我似乎无法让 partitionBy 使用除了转换为 int 之外的任何东西(并且只能使用适当的数据)。有什么建议吗? 如果您尝试使用字符串的分区键,则使用哈希函数,该函数将返回 int。最终,spark 通过运行 key %numPartitions 来决定分区。因此它需要一个整数作为 key。【参考方案2】:

python 分区器 API 在下面使用哈希分区器,这就是为什么即使您有 K 个存储桶,您仍然会遇到一些“冲突”。如果您可以在 Scala 中执行此操作,则可以提供自定义分区器(基于范围 + 存储桶数 == num elems 可能会解决问题)。但是,每个分区都有一些开销(并且重新分区是一项昂贵的操作),执行保存逻辑而不是 foreach 而不是重新分区可能更合理。

【讨论】:

我也遇到了同样的问题。您能否提供一个更清晰的答案,为什么在 PySpark 中使用哈希分区器会引起一些分区“冲突”?非常感谢! 因此,Python API 发生这种情况有几个原因,但基本上hash(x) mod Khash(y) mod K 可以相等,即使x != y 也是如此。

以上是关于PySpark 重新分区 RDD 元素的主要内容,如果未能解决你的问题,请参考以下文章

pyspark:重新分区后出现“太多值”错误

计算每个 pyspark RDD 分区中的元素数

PySpark|RDD编程基础

PySpark 根据特定列重新分区

Spark重新分区不均匀分布记录

使用 pyspark 对 parquet 文件进行分区和重新分区