Spark - 如何使用有状态映射器对已排序的 RDD 进行平面映射?

Posted

技术标签:

【中文标题】Spark - 如何使用有状态映射器对已排序的 RDD 进行平面映射?【英文标题】:Spark - how to flatmap sorted RDD using a stateful mapper? 【发布时间】:2015-08-16 04:48:19 【问题描述】:

基本上,我有一个包含一系列事件(带有一些 categoryId)的 RDD/DataFrame。它们带有时间戳并按时间排序。 我想做的是扫描每个类别中的所有这些事件,同时保持/更新一些状态,该状态会记住是否看到了某些事件。一些例子:

用户登录亚马逊(记录新的会话 ID、时间戳) 用户将商品添加到 basked(将篮子大小增加到 1) 用户执行结账(增加花费的金额,输出 rdd add item: sessionId+start_timestamp+number if items + money) 用户将其他东西添加到购物篮中 执行结帐 -> 将下一项添加到输出 rdd

我非常想用一个有状态的映射器(它记住以前的项目)来做一个 flatMap。映射器可以有一个按 categoryId 的“状态”映射。但是有几百万个类别是什么?除了按类别+时间戳排序之外,还有什么更好的方法吗? 我还需要确保整个类别都在一个节点上。在这种情况下我应该按类别划分吗?我不确定数百万个分区是否是个好主意。

【问题讨论】:

【参考方案1】:

由于您的问题相当笼统,您将得到一个笼统的答案。除非您有充分的理由不这样做,否则您应该使用Data Frames 和Window Functions。

上述第一项将为您提供Catalyst Optimizer 的所有好处。第二个应该提供您可能会按照您的描述处理数据的操作:

PARTITION BY - 按类别划分数据 ORDER BY - 按时间戳排序 FRAME (ROWS / RANGE) - 窗口大小的可选限制 实际的functions 执行所需的操作

旁注

我不确定数百万个分区是否是个好主意。

不,这根本不是一个好主意,但是按某个键进行分区并不意味着您需要与唯一键的数量相同的分区数:

import org.apache.spark.HashPartitioner
val rdd = sc.parallelize(
   (1 to 10).flatMap(k => (1 to 100).map(_ => (k, scala.util.Random.nextInt)))
).partitionBy(new HashPartitioner(2))

在上面的示例中,您有 10 个不同的值,但只有 2 个分区。

【讨论】:

有道理。我想是时候升级到 1.4 了。

以上是关于Spark - 如何使用有状态映射器对已排序的 RDD 进行平面映射?的主要内容,如果未能解决你的问题,请参考以下文章

可以自动更新数据库模式的 O/R 映射器?

需要的建议:O/R 映射器 [重复]

如何在使用自动映射器时将属性名称的 jsonproperty 设置为 dto?

是否有任何 OR 映射器提供异步方法?

使用 MapReduce/Hadoop 对大数据进行排序

如果 RDD 变大,Spark 将如何反应?