spark-streaming scala:如何将字符串数组传递给过滤器?

Posted

技术标签:

【中文标题】spark-streaming scala:如何将字符串数组传递给过滤器?【英文标题】:spark-streaming scala: how can I pass an array of strings to a filter? 【发布时间】:2019-01-25 20:12:48 【问题描述】:

我想用 .contains() 替换字符串数组中的字符串“a”来检查数组中的每个字符串。这可能吗?

val filtered = stream.flatMap(status => status.getText.split(" ").filter(_.contains("a")))

编辑:

也试过这个(sc是sparkContext):

val ssc = new StreamingContext(sc, Seconds(15))
val stream = TwitterUtils.createStream(ssc, None)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(a.contains(_)))

并得到以下错误:

java.io.NotSerializableException: org.apache.spark.streaming.twitter.TwitterInputDStream 的对象可能作为 RDD 操作闭包的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 中的 RDD 操作以避免这种情况。已强制执行此操作以避免 Spark 任务因不必要的对象而膨胀。

然后我尝试在使用之前广播数组:

val aBroadcast = sc.broadcast(a)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(aBroadcast.value.contains(_)))

得到了同样的错误。

谢谢

【问题讨论】:

【参考方案1】:

据我了解,您想查看拆分后的状态文本是否包含作为a 子集的单词列表:

val a = Array("a1", "a2")
val filtered = stream.flatMap(status => status.getText.split(" ").filter(_.forall(a contains))

【讨论】:

感谢您回答@Ahmad。我得到与上面编辑后完全相同的错误:.filter(a.contains(_)) @FerGarD 所以,作为一种可能性,我认为广播是正确的做法,但您可能需要将广播变量注释为 @volatile@transient 并在对象或班级级别,即:spark.apache.org/docs/2.3.2/…

以上是关于spark-streaming scala:如何将字符串数组传递给过滤器?的主要内容,如果未能解决你的问题,请参考以下文章

java spark-streaming接收TCP/Kafka数据

Spark-Streaming 记录比较

如何在 Spark-Streaming 的 DStream 中使用“for”循环进行转换和输出?

如何找到 Spark-streaming 中价值最小的键值对?

scala spark(2.10)读取kafka(2.10)示例

spark-streaming任务提交遇到的坑