如何在 Spark 中按键对 RDD 进行分区?

Posted

技术标签:

【中文标题】如何在 Spark 中按键对 RDD 进行分区?【英文标题】:How to partition RDD by key in Spark? 【发布时间】:2015-09-12 22:18:49 【问题描述】:

鉴于 HashPartitioner 文档说:

[HashPartitioner] 使用 Java 实现基于哈希的分区 Object.hashCode.

假设我想通过kindDeviceData 进行分区。

case class DeviceData(kind: String, time: Long, data: String)

通过覆盖deviceData.hashCode() 方法并仅使用kind 的哈希码来分区RDD[DeviceData] 是否正确?

但是考虑到HashPartitioner 采用了一些分区参数,我很困惑是否需要提前知道种类的数量以及如果种类多于分区会发生什么?

如果我将分区数据写入磁盘,读取时它会保持分区状态是否正确?

我的目标是打电话

  deviceDataRdd.foreachPartition(d: Iterator[DeviceData] => ...)

并且迭代器中只有具有相同kind 值的DeviceData

【问题讨论】:

【参考方案1】:

使用kind 做一个groupByKey 怎么样。或其他PairRDDFunctions 方法。

您让我觉得您并不真正关心分区,只是在一个处理流程中获得了所有特定类型?

pair 函数允许这样做:

rdd.keyBy(_.kind).partitionBy(new HashPartitioner(PARTITIONS))
   .foreachPartition(...)

但是,您可能会更安全一些,例如:

rdd.keyBy(_.kind).reduceByKey(....)

mapValues 或许多其他配对功能,可确保您获得整体效果

【讨论】:

【参考方案2】:

通过覆盖 deviceData.hashCode() 方法并仅使用同类哈希码对 RDD[DeviceData] 进行分区是否正确?

不会的。如果您阅读 Java Object.hashCode 文档,您会发现以下有关 hashCode 通用合同的信息:

如果两个对象根据 equals(Object) 方法相等,那么对两个对象中的每一个调用 hashCode 方法必须产生相同的整数结果。

因此,除非纯粹基于设备的kind 的平等概念适合您的用例,而且我严重怀疑它是否适合,否则修补HashCode 以获得所需的分区是一个坏主意。一般情况下你应该implement your own partitioner,但这里不是必需的。

因为,除了 SQL 和 GraphX 中的特殊场景,partitionBy 仅在 PairRDD 上有效,因此创建 RDD[(String, DeviceData)] 并使用普通的 HashPartitioner 是有意义的

deviceDataRdd.map(dev => (dev.kind, dev)).partitionBy(new HashPartitioner(n))

请记住,在kind 基数低或分布高度偏斜的情况下,使用它进行分区可能不是最佳解决方案。

【讨论】:

好点。对象不应该相等,因为它们具有相同的种类。 没有。这里有一个逻辑谬误->“除非纯粹基于一种设备的平等概念适合您的用例,而且我严重怀疑它确实如此,修改 HashCode 以获得所需的分区是一个坏主意”基于 Kind 的 HashCode 并不意味着平等纯粹基于种类。只是,if 2 条记录相等,它们的种类相同,这很容易满足。

以上是关于如何在 Spark 中按键对 RDD 进行分区?的主要内容,如果未能解决你的问题,请参考以下文章

RDD 中的分区数和 Spark 中的性能

spark算子:partitionBy对数据进行分区

Spark02

RDD take()方法如何在内部工作?

Spark入门02

Spark RDD - 分区总是在RAM中吗?