大数据学习——spark-steaming学习

Posted feifeicui

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据学习——spark-steaming学习相关的知识,希望对你有一定的参考价值。

 

官网http://spark.apache.org/docs/latest/streaming-programming-guide.html

1.1.  用Spark Streaming实现实时WordCount

技术图片

1.安装并启动生成者

首先在一台Linux(ip:192.168.10.101)上用YUM安装nc工具

yum install -y nc

 

启动一个服务端并监听9999端口

nc -lk 9999

 

2.编写Spark Streaming程序

package cn.itcast.spark.streaming

import cn.itcast.spark.util.LoggerLevel
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds, StreamingContext

object NetworkWordCount 
  def main(args: Array[String]) 
    //设置日志级别
    LoggerLevel.setStreamingLogLevels()
    //创建SparkConf并设置为本地模式运行
    //注意local[2]代表开两个线程
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    //设置DStream批次时间间隔为2秒
    val ssc = new StreamingContext(conf, Seconds(2))
    //通过网络读取数据
    val lines = ssc.socketTextStream("192.168.10.101", 9999)
    //将读到的数据用空格切成单词
    val words = lines.flatMap(_.split(" "))
    //将单词和1组成一个pair
    val pairs = words.map(word => (word, 1))
    //按单词进行分组求相同单词出现的次数
    val wordCounts = pairs.reduceByKey(_ + _)
    //打印结果到控制台
    wordCounts.print()
    //开始计算
    ssc.start()
    //等待停止
    ssc.awaitTermination()
  

3.启动Spark Streaming程序:由于使用的是本地模式"local[2]"所以可以直接在本地运行该程序

注意:要指定并行度,如在本地运行设置setMaster("local[2]"),相当于启动两个线程,一个给receiver,一个给computer。如果是在集群中运行,必须要求集群中可用core数大于1

技术图片

4.在Linux端命令行中输入单词

技术图片

5.在IDEA控制台中查看结果

技术图片

问题:结果每次在Linux段输入的单词次数都被正确的统计出来,但是结果不能累加!如果需要累加需要使用updateStateByKey(func)来更新状态,下面给出一个例子:

package cn.itcast.spark.streaming

import cn.itcast.spark.util.LoggerLevel
import org.apache.spark.HashPartitioner, SparkConf
import org.apache.spark.streaming.StreamingContext, Seconds

object NetworkUpdateStateWordCount 
  /**
    * String : 单词 hello
    * Seq[Int] :单词在当前批次出现的次数
    * Option[Int] : 历史结果
    */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => 
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    iter.flatMapcase(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))
  

  def main(args: Array[String]) 
    LoggerLevel.setStreamingLogLevels()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    //做checkpoint 写入共享存储中
    ssc.checkpoint("c://aaa")
    val lines = ssc.socketTextStream("192.168.10.100", 9999)
    //reduceByKey 结果不累加
    //val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    //updateStateByKey结果可以累加但是需要传入一个自定义的累加函数:updateFunc
    val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  

1.1.  使用reduceByKeyAndWindow计算每分钟数据

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream, ReceiverInputDStream
import org.apache.spark.streaming.Seconds, StreamingContext

/**
  * Created by wangsenfeng on 2016/10/27.
  */
object SparkSqlTest 
  def main(args: Array[String]) 
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("sparksql").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("./")
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.56.151",9999)
    val result: DStream[(String, Int)] = textStream.flatMap(_.split(" ")).map((_,1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(5),Seconds(5))
    result.print()
    ssc.start()
    ssc.awaitTermination()
  

1.1.  Spark Streaming整合Kafka完成网站点击流实时统计

技术图片

 

 

1.安装并配置zk

2.安装并配置Kafka

3.启动zk

4.启动Kafka

5.创建topic

bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181 \\

--replication-factor 3 --partitions 3 --topic urlcount

6.编写Spark Streaming应用程序

package cn.itcast.spark.streaming

package cn.itcast.spark

import org.apache.spark.HashPartitioner, SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.Seconds, StreamingContext

object UrlCount 
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => 
    iterator.flatMapcase(x,y,z)=> Some(y.sum + z.getOrElse(0)).map(n=>(x, n))
  

  def main(args: Array[String]) 
    //接收命令行中的参数
   // val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
val Array(zkQuorum, groupId, topics, numThreads) = Array[String]("master1ha:2181,master2:2181,master2ha:2181","g1","wangsf-test","2")
    //创建SparkConf并设置AppName
    val conf = new SparkConf().setAppName("UrlCount")
    //创建StreamingContext
    val ssc = new StreamingContext(conf, Seconds(2))
    //设置检查点
    ssc.checkpoint(hdfs)
    //设置topic信息
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //重Kafka中拉取数据创建DStream
    val lines = KafkaUtils.createStream(ssc, zkQuorum ,groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    //切分数据,截取用户点击的url
    val urls = lines.map(x=>(x.split(" ")(6), 1))
    //统计URL点击量
    val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    //将结果打印到控制台
    result.print()
    ssc.start()
    ssc.awaitTermination()
  

生产数据测试:

kafka-console-producer.sh --broker-list h2slave1:9092 --topic wangsf-test

 

以上是关于大数据学习——spark-steaming学习的主要内容,如果未能解决你的问题,请参考以下文章

大数据学习:Scala面向对象和Spark一些代码读和问

大数据学习——java代码实现对HDFS文件的readappendwrite操作

大数据学习——hdfs客户端流式操作代码的实现

大数据学习:Scala隐式转换和并发编程(DT大数据梦工厂)

51CTO微职位-大数据高级工程师2018年学习总结及2019年学习规划

大数据学习笔记:通过Java代码创建HBASE数据库