spark-streaming scala:如何将字符串数组传递给过滤器?
Posted
技术标签:
【中文标题】spark-streaming scala:如何将字符串数组传递给过滤器?【英文标题】:spark-streaming scala: how can I pass an array of strings to a filter? 【发布时间】:2019-01-25 20:12:48 【问题描述】:我想用 .contains() 替换字符串数组中的字符串“a”来检查数组中的每个字符串。这可能吗?
val filtered = stream.flatMap(status => status.getText.split(" ").filter(_.contains("a")))
编辑:
也试过这个(sc是sparkContext):
val ssc = new StreamingContext(sc, Seconds(15))
val stream = TwitterUtils.createStream(ssc, None)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(a.contains(_)))
并得到以下错误:
java.io.NotSerializableException: org.apache.spark.streaming.twitter.TwitterInputDStream 的对象可能作为 RDD 操作闭包的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 中的 RDD 操作以避免这种情况。已强制执行此操作以避免 Spark 任务因不必要的对象而膨胀。
然后我尝试在使用之前广播数组:
val aBroadcast = sc.broadcast(a)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(aBroadcast.value.contains(_)))
得到了同样的错误。
谢谢
【问题讨论】:
【参考方案1】:据我了解,您想查看拆分后的状态文本是否包含作为a
子集的单词列表:
val a = Array("a1", "a2")
val filtered = stream.flatMap(status => status.getText.split(" ").filter(_.forall(a contains))
【讨论】:
感谢您回答@Ahmad。我得到与上面编辑后完全相同的错误:.filter(a.contains(_))
@FerGarD 所以,作为一种可能性,我认为广播是正确的做法,但您可能需要将广播变量注释为 @volatile
或 @transient
并在对象或班级级别,即:spark.apache.org/docs/2.3.2/…以上是关于spark-streaming scala:如何将字符串数组传递给过滤器?的主要内容,如果未能解决你的问题,请参考以下文章
java spark-streaming接收TCP/Kafka数据
如何在 Spark-Streaming 的 DStream 中使用“for”循环进行转换和输出?
如何找到 Spark-streaming 中价值最小的键值对?