如何为 Spark RDD 中的元素分配唯一的连续编号

Posted

技术标签:

【中文标题】如何为 Spark RDD 中的元素分配唯一的连续编号【英文标题】:How to assign unique contiguous numbers to elements in a Spark RDD 【发布时间】:2014-07-19 07:14:48 【问题描述】:

我有一个(user, product, review) 的数据集,想将它输入到 mllib 的 ALS 算法中。

算法需要用户和产品是数字,而我的是字符串用户名和字符串 SKU。

现在,我获取不同的用户和 SKU,然后在 Spark 之外为它们分配数字 ID。

我想知道是否有更好的方法来做到这一点。我想到的一种方法是编写一个自定义 RDD,它基本上枚举 1 到 n,然后在两个 RDD 上调用 zip。

【问题讨论】:

如果有人想知道“SKU”代表什么,它是一个用于唯一标识产品的字母数字字符串。 【参考方案1】:

Spark 1.0 开始,您可以使用两种方法轻松解决此问题:

RDD.zipWithIndex 就像 Seq.zipWithIndex 一样,它添加了连续的 (Long) 数字。这需要首先计算每个分区中的元素,因此您的输入将被评估两次。如果你想使用它,缓存你的输入 RDD。 RDD.zipWithUniqueId 还为您提供唯一的 Long ID,但不能保证它们是连续的。 (仅当每个分区具有相同数量的元素时,它们才会是连续的。)好处是它不需要知道有关输入的任何信息,因此不会导致重复评估。

【讨论】:

谢谢。所以 RDD.zipWithUniqueId 不会扫描数据集两次? 正确。见github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/…。 如果它在RDD中两次,这是否会为同一个字符串分配不同的ID? 是的。每个元素都有不同的编号。【参考方案2】:

对于类似的示例用例,我只是对字符串值进行了哈希处理。见http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/

def nnHash(tag: String) = tag.hashCode & 0x7FFFFF
var tagHashes = postIDTags.map(_._2).distinct.map(tag =>(nnHash(tag),tag))

听起来你已经在做这样的事情了,尽管散列可以更容易管理。

Matei 在这里建议了一种在 RDD 上模拟 zipWithIndex 的方法,这相当于在每个分区内分配全局唯一的 ID:https://groups.google.com/forum/#!topic/spark-users/WxXvcn2gl1E

【讨论】:

这是个好主意,但必须小心number of collisions。对于正在编码的集合的数量(例如,标签、用户名等)接近 100k 的应用程序,冲突的数量可能会很大。 我已经在几千条记录之后发生了冲突,所以我通常不会推荐它。 在 ALS 用例中,冲突并不重要,但这只是在一定程度上是正确的。在碰撞很重要的情况下,是的,这不是一个好方法。【参考方案3】:

另一个简单的选择,如果使用 DataFrames 并且只关心唯一性是使用函数MonotonicallyIncreasingID

import org.apache.spark.sql.functions.monotonicallyIncreasingId 
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId)

编辑:MonotonicallyIncreasingID 已弃用并删除 since Spark 2.0;它现在被称为 monotonically_increasing_id

【讨论】:

这种方法实际上不适用于 ALS 中的用户/项目标识符,因为 monotonically_increasing_id() 产生 64 位数字(即 long 不是 int),而“基于 DataFrame 的 API for ALS 目前仅支持用户和项目 ID 的整数”(来自 spark.apache.org/docs/2.0.0/ml-collaborative-filtering.html 我同意,对于 ALS 而言,最安全的是使用 zipWithIndex()【参考方案4】:

monotonically_increasing_id() 似乎是答案,但不幸的是它不适用于 ALS,因为它产生 64 位数字而 ALS 需要 32 位数字(请参阅我在 radek1st 对 deets 的回答下方的评论)。

我找到的解决方案是使用 zipWithIndex(),正如 Darabos 的回答中提到的那样。以下是如何实现它:

如果您已经有一个名为 userids 的不同用户的单列 DataFrame,您可以创建一个查找表 (LUT),如下所示:

# PySpark code
user_als_id_LUT = sqlContext.createDataFrame(userids.rdd.map(lambda x: x[0]).zipWithIndex(), StructType([StructField("userid", StringType(), True),StructField("user_als_id", IntegerType(), True)]))

现在你可以:

使用此 LUT 获取对 ALS 友好的整数 ID,以提供给 ALS 当您需要从 ALS ID 回到原始 ID 时,使用此 LUT 进行反向查找

显然,对项目做同样的事情。

【讨论】:

【参考方案5】:

人们已经推荐了monotonically_increasing_id(),并提到了它创建Longs而不是Ints的问题。

但是,根据我的经验(警告 - Spark 1.6) - 如果您在单个执行程序上使用它(之前重新分区为 1),则没有使用执行程序前缀,并且可以安全地将数字转换为 Int.显然,您需要少于 Integer.MAX_VALUE 行。

【讨论】:

执行者是否有可能死亡并且 spark 最终将相同的 id 分配给两个不同的条目?

以上是关于如何为 Spark RDD 中的元素分配唯一的连续编号的主要内容,如果未能解决你的问题,请参考以下文章

如何为 pandas 数据框中的不同组分配唯一 ID?

如何为R中的重复值分配唯一的等级编号

为每个元素流水线化的 RDD 分配唯一的键值

RDD take()方法如何在内部工作?

Excel:如何为唯一组合分配值/检查大型数据集中的唯一组合

如何为一行中的元素分配相同的高度?