Must-Know Spark Streaming Window Operations, with Hands-On Examples
Posted by 勾叔谈大数据
A window operation is controlled by two parameters:

- Window length (windowDuration): controls how many of the most recent batches each computation covers.
- Slide interval (slideDuration): controls how often the new (windowed) DStream is computed.
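As a minimal sketch of how the two parameters fit together (a hypothetical standalone app; the host, port, and durations here are assumptions mirroring the demos below), note that both durations must be integer multiples of the batch interval:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowSketch")
    // batch interval: a new mini-batch every 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    // windowDuration = 20s, slideDuration = 10s: every 10 seconds,
    // compute over the last 4 batches; both are multiples of the 5s batch interval
    lines.window(Seconds(20), Seconds(10)).print()
    ssc.start()
    ssc.awaitTermination()
  }
}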
First, a small socket server that mimics nc: it accepts a single client and writes an incrementing integer to it once per second, feeding the demos that follow.

package cn.lagou.streaming

import java.io.PrintWriter
import java.net.{ServerSocket, Socket}

object SocketLikeNCWithWindow {
  def main(args: Array[String]): Unit = {
    val port = 1521
    val ss = new ServerSocket(port)
    // block until a client (the streaming job) connects
    val socket: Socket = ss.accept()
    println("connect to host : " + socket.getInetAddress)
    val out = new PrintWriter(socket.getOutputStream)
    var i = 0
    // send one number per second
    while (true) {
      i += 1
      out.println(i)
      out.flush()
      Thread.sleep(1000)
    }
  }
}
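To run the demos, start SocketLikeNCWithWindow first (it listens on port 1521), then start WindowDemo below. As an alternative to the custom server, nc -lk 1521 should also work, though you would have to type the numbers by hand.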
Next, the window demo itself. Each mini-batch covers 5 seconds; every windowed stream below looks back over the last 20 seconds and slides forward every 10 seconds.

package cn.lagou.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]")
      .setAppName(this.getClass.getCanonicalName)
    // generate one RDD (mini-batch) every 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    val lines: ReceiverInputDStream[String] =
      ssc.socketTextStream("localhost", 1521)
    // print each batch's RDD id, batch time and elements
    lines.foreachRDD { (rdd, time) =>
      println(s"rdd = ${rdd.id}; time = $time")
      rdd.foreach(value => println(value))
    }
    // concatenate the window's elements into one string
    val res1: DStream[String] = lines.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
    res1.print()
    // the raw 20-second window, sliding every 10 seconds
    val res2: DStream[String] = lines.window(Seconds(20), Seconds(10))
    res2.print()
    // sum of the window's elements
    val res3: DStream[Int] = lines.map(_.toInt).reduceByWindow(_ + _, Seconds(20), Seconds(10))
    res3.print()
    // the same sum, computed as window + reduce
    val res4 = res2.map(_.toInt).reduce(_ + _)
    res4.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
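Note that res3 and res4 print the same windowed sum: reduceByWindow(f, windowDuration, slideDuration) is effectively window(windowDuration, slideDuration) followed by reduce(f).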
Finally, a hot-word counter built on reduceByKeyAndWindow. It reads lines from port 9999 (for example from nc -lk 9999) and, every 10 seconds, counts word occurrences over the last 20 seconds.

package cn.lagou.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HotWordStats {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]")
      .setAppName(this.getClass.getCanonicalName)
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("ERROR")
    // Set a checkpoint directory; checkpointing provides fault tolerance.
    // In production this should point to HDFS.
    ssc.checkpoint("data/checkpoint/")
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val pairs: DStream[(String, Int)] = words.map(x => (x, 1))
    // reduceByKeyAndWindow: every 10 seconds, count word occurrences over the last 20 seconds.
    // The last 3 parameters: window duration, slide duration, number of partitions.
    val wordCounts1: DStream[(String, Int)] = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b, Seconds(20), Seconds(10), 2)
    wordCounts1.print()
    // The incremental variant with an inverse function: as the window slides, Spark adds
    // the counts of batches entering the window and subtracts those of batches leaving it.
    // This variant requires checkpointing.
    val wordCounts2: DStream[(String, Int)] = pairs.reduceByKeyAndWindow(
      _ + _, _ - _, Seconds(20), Seconds(10), 2)
    wordCounts2.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
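One caveat with the inverse-function variant: keys whose windowed count drops to zero linger in the state. The overload of reduceByKeyAndWindow that takes a filter function can prune them. A minimal sketch that would slot into HotWordStats above; the predicate is an illustrative assumption:

// hypothetical variant of wordCounts2: prune keys whose windowed count fell to 0
val wordCounts3: DStream[(String, Int)] = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,         // fold in counts entering the window
  (a: Int, b: Int) => a - b,         // subtract counts leaving the window
  Seconds(20), Seconds(10), 2,
  (kv: (String, Int)) => kv._2 > 0   // keep only keys still present in the window
)
wordCounts3.print()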