DStream 转换操作------有状态转换操作

Posted soyosuyang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DStream 转换操作------有状态转换操作相关的知识,希望对你有一定的参考价值。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object DStream_转换操作 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("转换操作").setMaster("local[2]")
    val sc=new StreamingContext(conf,Seconds(4))
    val lines=sc.socketTextStream("localhost",8899)
    sc.checkpoint("file:///usr/local2/spark/mycode/kafa3/checkpoint")

    val words=lines.flatMap(x=>x.split(" "))
    val wordsStream=words.map(x=>(x,1))
    3.val stateStream=wordsStream.updateStateByKey[Int](update)
      sc.checkpoint("file:///usr/local2/spark/mycode/kafa2/checkpoint")
    1. //val wordCount=words.map(x=>(x,1)).reduceByKeyAndWindow(_+_,_-_,Seconds(16),Seconds(4),2)//DStream有状态转换操作
    2. val wordCount=words.map(x=>(x,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(16),Seconds(4),2)
     wordCount.print(100)
    stateStream.print()
    sc.start()
    sc.awaitTermination()
  }
   val update=(values:Seq[Int],state:Option[Int])=>{
     val currentCount=values.foldLeft(0)(_+_)
     val previousCount= state.getOrElse(0)
     Some(currentCount+previousCount)
   }
}

注意:

reduceByKeyAndWindow中的Seconds(16)是滑动窗口长度,Seconds(4)是滑动窗口时间间隔(每隔多长时间滑动一次窗口)这两个值必须是 new StreamingContext(conf,Seconds(4)) 中Seconds(4)的倍数(>=1)
如果第二个4<滑动窗口时间间隔 程序结果的时间线就变成了以滑动窗口时间间隔为准
1,2,3区别:
1.会保留历史对象的名字列表
2.不会保留
3.在历史值的基础上累加,但(1,2)会随着窗口滑动,所有对象的值会变为0
4.(1和2适合统计实时时间段内词频)

 

以上是关于DStream 转换操作------有状态转换操作的主要内容,如果未能解决你的问题,请参考以下文章

Spark DStream 转换

Spark Streaming的核心DStream之转换操作实例

Spark Streaming的核心DStream之转换操作实例

深入理解Spark Streaming

SparkStreaming

070 DStream中的transform和foreachRDD函数