Spark:如何将 PartialFunction 传递给 DStream?
Posted
技术标签:
【中文标题】Spark:如何将 PartialFunction 传递给 DStream?【英文标题】:Spark: How do I pass a PartialFunction to a DStream? 【发布时间】:2014-11-03 06:40:42 【问题描述】:我正在尝试通过滑动窗口将部分函数传递给在 DStream 批处理中捕获的所有 RDD 的联合。假设我在离散为 1 秒批次的流上构建了一个超过 10 秒的窗口操作:
val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val stream = ssc.socketStream(...)
val window = stream.window(Seconds(10))
我的window
将有 K 个 RDD。我想在所有 K 个这些 RDD 的联合上使用 collect(f: PartialFunction[T, U])
。我可以使用foreachRDD
调用联合运算符++
,但我想返回RDD
而不是Unit
并避免副作用。
我要找的是像
这样的减速器def reduce(f: (RDD[T], RDD[T]) ⇒ RDD[T]): RDD[T]
在我可以这样使用的DStream
上:
window.reduce(_ ++ _).transform(_.collect(myPartialFunc))
但这在 Spark Streaming API 中不可用。
对于将流中捕获的 RDD 组合成单个 RDD 以便我可以传入部分函数,是否有人有任何好的想法?还是为了实现我自己的 RDD 减速器?也许此功能将在后续 Spark 版本中提供?
【问题讨论】:
计算函数将允许您在一段时间内获取 RDD。 @Anant 月经在哪里开始和结束? DStream 方法compute
只接受validTime
参数。这是我窗口的开始还是结束?另外,我将如何处理必须在与我的批次相同的时间间隔内重复调用 compute
的问题?我正在寻找不那么有状态的东西。
@nmurthy 你不能在 DStream 上做collect
。你能进一步解释你想要做什么吗?可能还有其他方法。
@maasg 正确,我正在尝试在一个 DSteram 间隔中捕获的所有 RDD 的联合上调用 collect
。我要做的有两个步骤:(1) 使用++
运算符将一个 DStream 间隔中的所有 RDD 减少为一个 RDD,然后 (2) 在我的 reduced 上调用collect
使用 DStream 转换的 RDD。
然后你会如何处理collect
的结果? collect
只不过是结合了filter
和map
,它们在DStream
API 上可用——但不知道为什么需要联合RDD。
【参考方案1】:
DStream 操作不直接支持部分功能,但实现相同功能并不难。
例如,让我们采用一个简单的偏函数,它接受一个字符串,如果它是一个数字,则产生一个字符串的 Int:
val pf:PartialFunction[String,Int] = case x if (Try(x.toInt).isSuccess) => x.toInt
我们有一个字符串的 dstream:
val stringDStream:DStream[String] = ??? // use your stream source here
然后我们可以像这样将偏函数应用到 DStream:
val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf)
【讨论】:
以上是关于Spark:如何将 PartialFunction 传递给 DStream?的主要内容,如果未能解决你的问题,请参考以下文章