Integrating Spark Streaming with Flume

Posted by xjqi


This post walks through a Spark Streaming job that pulls events from a Flume agent using the poll-based receiver (FlumeUtils.createPollingStream) and maintains a running word count across batches with updateStateByKey.

package com.bawei.stream

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}


object StreamFlume {
  // Merge this batch's counts for a word with the running total carried
  // over from previous batches (None the first time the word is seen)
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }


  def main(args: Array[String]): Unit = {
    // Configure SparkConf; local[2] leaves one thread for the receiver and one for processing
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
    // Build the SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    // Build the StreamingContext with a 5-second batch interval
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Set a checkpoint directory (required by updateStateByKey);
    // note the escaped backslashes in the Windows path
    scc.checkpoint("C:\\Users\\Desktop\\checkpoint2")
    // Address(es) of the Flume agent(s); more than one can be listed
    val address = Seq(new InetSocketAddress("192.168.182.147", 8888))
    // Pull data from Flume
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc, address, StorageLevel.MEMORY_AND_DISK)

    // The payload lives in the event body; convert it to a String
    val lineStream: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()))
    // Word count, keeping a running total across batches
    val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)

    result.print()
    scc.start()
    scc.awaitTermination()
  }
}
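
The example depends on the external spark-streaming-flume module, which is not on Spark's default classpath. A minimal sbt sketch, assuming a Spark 2.x build on Scala 2.11 (the version number is illustrative; match it to your cluster):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-streaming-flume" % "2.4.8"
)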

 
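On the Flume side, createPollingStream requires the agent to terminate in Spark's custom sink, org.apache.spark.streaming.flume.sink.SparkSink, which buffers events until the streaming job pulls and acknowledges them; the spark-streaming-flume-sink jar (and a matching scala-library jar) must be on the Flume agent's classpath. A sketch of such an agent configuration, assuming a netcat source for test input — the agent/component names and the source settings are placeholders, and only the sink's type, hostname, and port need to match the address used in the code above:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Test source: lines typed into a TCP socket become Flume events
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# Spark's polling sink: the streaming job connects here and pulls events
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.182.147
a1.sinks.k1.port = 8888
a1.sinks.k1.channel = c1

With the agent running, lines sent to the netcat source (for example via telnet localhost 44444) show up in the job's output as running word counts, printed every five seconds.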

That covers the basics of integrating Spark Streaming with Flume. If it did not solve your problem, the following related articles may help:

Big data series: log collection with Flume + Kafka + Spark Streaming + HBase

Spark Streaming integration with Flume

A high-reliability tutorial on integrating Flume with Kafka

Integrating Flume with Kafka