DStream中的列表处理
Posted
技术标签:
【中文标题】DStream中的列表处理【英文标题】:List processing in DStream 【发布时间】:2016-11-17 20:12:14 【问题描述】:我有一个单词列表作为 DStream。例如:列表(汽车、速度、事故、速度、坏)。我想从这个列表中形成双克。我在 RDD 上遇到了这个问题,但在 DStreams 上遇到了问题。我正在使用 foreachRDD 函数。以下是我所拥有的-
我正在尝试在转换后打印 RDD 的内容。
def printRDD(rddString: RDD[String]) =
val z = rddString.map( y => y.toString.split(",").filter(_.nonEmpty).
map( y => y.replaceAll("""\W""", "").toLowerCase)
.filter(_.nonEmpty)
.sliding(2).filter(_.size == 2).map case Array(a, b) => ((a, b), 1) )
.flatMap(x => x)
println(z)
val x = lines.map(plainTextToLemmas(_, stopWords))
val words = x.flatMap( y=> y.toString.split(","))
words.foreachRDD( rdd => printRDD(rdd))
有什么办法可以显示转换函数printRDD后的内容。即使我在打印定义中使用 println(z),它也会在 flatMap 中返回 MapPartitionsRDD[18]。我正在使用 Kafka spark 流来读取输入,我在控制台上获得了单词值。我认为调用函数 printRDD 后单词不会改变。
【问题讨论】:
流处理后的二元组应该如何处理?该功能只是进行控制台打印。 【参考方案1】:您可以在DStream
上进行所有这些操作,而不是在foreachRDD
内部,然后在DStream
上调用print
:
lines
.map(plainTextToLemmas(_, stopWords))
.flatMap(y => y.toString.split(","))
.map(y => y.toString.split(",").filter(_.nonEmpty))
.map(y => y.replaceAll("""\W""", "").toLowerCase)
.filter(_.nonEmpty)
.sliding(2)
.filter(_.size == 2)
.flatMap case Array(a, b) => ((a, b), 1)
.print()
这应该将DStream
的内容打印到驱动程序的控制台。
需要注意的重要一点是,尽管您在 DStream
上进行操作,但它的方法在给定的批处理时间“钻入”底层 RDD
并在 RDD
中公开实际类型,因此您应该不需要使用foreachRDD
来访问内部的实际数据。
【讨论】:
以上是关于DStream中的列表处理的主要内容,如果未能解决你的问题,请参考以下文章