DStream中的列表处理

Posted

技术标签:

【中文标题】DStream中的列表处理【英文标题】:List processing in DStream 【发布时间】:2016-11-17 20:12:14 【问题描述】:

我有一个单词列表作为 DStream。例如:列表(汽车、速度、事故、速度、坏)。我想从这个列表中形成双克。我在 RDD 上遇到了这个问题,但在 DStreams 上遇到了问题。我正在使用 foreachRDD 函数。以下是我所拥有的-

我正在尝试在转换后打印 RDD 的内容。

 def printRDD(rddString: RDD[String]) =
      val z = rddString.map( y => y.toString.split(",").filter(_.nonEmpty).
        map( y => y.replaceAll("""\W""", "").toLowerCase)
        .filter(_.nonEmpty)
        .sliding(2).filter(_.size == 2).map case Array(a, b) => ((a, b), 1) )
        .flatMap(x => x)
        println(z)

 val x = lines.map(plainTextToLemmas(_, stopWords))
 val words = x.flatMap( y=> y.toString.split(","))
 words.foreachRDD( rdd => printRDD(rdd))

有什么办法可以显示转换函数printRDD后的内容。即使我在打印定义中使用 println(z),它也会在 flatMap 中返回 MapPartitionsRDD[18]。我正在使用 Kafka spark 流来读取输入,我在控制台上获得了单词值。我认为调用函数 printRDD 后单词不会改变。

【问题讨论】:

流处理后的二元组应该如何处理?该功能只是进行控制台打印。 【参考方案1】:

您可以在DStream 上进行所有这些操作,而不是在foreachRDD 内部,然后在DStream 上调用print

lines
  .map(plainTextToLemmas(_, stopWords))
  .flatMap(y => y.toString.split(","))
  .map(y => y.toString.split(",").filter(_.nonEmpty))
  .map(y => y.replaceAll("""\W""", "").toLowerCase)
  .filter(_.nonEmpty)
  .sliding(2)
  .filter(_.size == 2)
  .flatMap  case Array(a, b) => ((a, b), 1)  
  .print()

这应该将DStream 的内容打印到驱动程序的控制台。

需要注意的重要一点是,尽管您在 DStream 上进行操作,但它的方法在给定的批处理时间“钻入”底层 RDD 并在 RDD 中公开实际类型,因此您应该不需要使用foreachRDD 来访问内部的实际数据。

【讨论】:

以上是关于DStream中的列表处理的主要内容,如果未能解决你的问题,请参考以下文章

070 DStream中的transform和foreachRDD函数

深入理解Spark Streaming

SparkStreaming DStream转换

跨 dstream 的不同元素

53输入DStream之Kafka数据源实战

Spark:如何将 PartialFunction 传递给 DStream?