Spark:如何将 PartialFunction 传递给 DStream?

Posted

技术标签:

【中文标题】Spark:如何将 PartialFunction 传递给 DStream?【英文标题】:Spark: How do I pass a PartialFunction to a DStream? 【发布时间】:2014-11-03 06:40:42 【问题描述】:

我正在尝试通过滑动窗口将部分函数传递给在 DStream 批处理中捕获的所有 RDD 的联合。假设我在离散为 1 秒批次的流上构建了一个超过 10 秒的窗口操作:

val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val stream = ssc.socketStream(...)
val window = stream.window(Seconds(10))

我的window 将有 K 个 RDD。我想在所有 K 个这些 RDD 的联合上使用 collect(f: PartialFunction[T, U])。我可以使用foreachRDD 调用联合运算符++,但我想返回RDD 而不是Unit 并避免副作用。

我要找的是像

这样的减速器
def reduce(f: (RDD[T], RDD[T]) ⇒ RDD[T]): RDD[T]

在我可以这样使用的DStream 上:

window.reduce(_ ++ _).transform(_.collect(myPartialFunc))

但这在 Spark Streaming API 中不可用。

对于将流中捕获的 RDD 组合成单个 RDD 以便我可以传入部分函数,​​是否有人有任何好的想法?还是为了实现我自己的 RDD 减速器?也许此功能将在后续 Spark 版本中提供?

【问题讨论】:

计算函数将允许您在一段时间内获取 RDD。 @Anant 月经在哪里开始和结束? DStream 方法compute 只接受validTime 参数。这是我窗口的开始还是结束?另外,我将如何处理必须在与我的批次相同的时间间隔内重复调用 compute 的问题?我正在寻找不那么有状态的东西。 @nmurthy 你不能在 DStream 上做collect。你能进一步解释你想要做什么吗?可能还有其他方法。 @maasg 正确,我正在尝试在一个 DSteram 间隔中捕获的所有 RDD 的联合上调用 collect。我要做的有两个步骤:(1) 使用++ 运算符将一个 DStream 间隔中的所有 RDD 减少为一个 RDD,然后 (2) 在我的 reduced 上调用collect 使用 DStream 转换的 RDD。 然后你会如何处理collect 的结果? collect 只不过是结合了filtermap,它们在DStream API 上可用——但不知道为什么需要联合RDD。 【参考方案1】:

DStream 操作不直接支持部分功能,但实现相同功能并不难。

例如,让我们采用一个简单的偏函数,它接受一个字符串,如果它是一个数字,则产生一个字符串的 Int:

val pf:PartialFunction[String,Int] = case x if (Try(x.toInt).isSuccess) => x.toInt

我们有一个字符串的 dstream:

val stringDStream:DStream[String] = ??? // use your stream source here

然后我们可以像这样将偏函数应用到 DStream:

val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf)

【讨论】:

以上是关于Spark:如何将 PartialFunction 传递给 DStream?的主要内容,如果未能解决你的问题,请参考以下文章

learning scala PartialFunction

Scala中的偏函数

scala 偏函数

Scala--偏函数

scala偏函数小栗子

Spark 中 bigint 的兼容数据类型是啥?我们如何将 bigint 转换为 spark 兼容的数据类型?