SparkStreaming
Posted wangshuang123
1. Word count
package dayo7

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NewworkWordCount {
  // Show only WARN and above from org.* loggers to keep the console readable
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // Build the Spark configuration; local[*] runs on all local cores
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    // Create the StreamingContext; the second argument is the batch interval
    val ssc = new StreamingContext(conf, Seconds(2))
    // Read lines of text from a TCP socket
    val result = ssc.socketTextStream("192.168.186.150", 1234)
    // Split lines into words, count each word within the batch, and print the counts
    result.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    // Start the streaming computation
    ssc.start()
    // Block the main thread until the job is stopped
    ssc.awaitTermination()
  }
}
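The transformation chain can be tried without a socket or a Spark context. The following is a minimal sketch, assuming an ordinary Scala collection stands in for the lines of one 2-second batch. `WordCountSketch` and `countWords` are hypothetical names that do not appear in the original post, `groupBy` plus a sum is used as a local stand-in for `reduceByKey`, and the empty-token filter is an addition the original job does not have.

```scala
// Hypothetical helper, not part of the original post: applies the same
// flatMap -> map -> reduce-by-key steps to an in-memory batch of lines.
object WordCountSketch {
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // split each line into words
      .filter(_.nonEmpty)      // assumption: drop empty tokens caused by repeated spaces
      .map((_, 1))             // pair each word with a count of 1
      .groupBy(_._1)           // group pairs by word (local stand-in for reduceByKey)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val batch = Seq("hello spark", "hello streaming")
    countWords(batch).foreach(println)
  }
}
```

In the streaming job each batch is counted on its own, so the printed counts start from zero again every 2 seconds.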
2. Writing data to Redis
Start Redis:
bin/redis-server etc/redis.conf
Check that it is running and which port it uses:
ps -ef | grep redis
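The job in this section imports `day08.Jpoods`, whose source is not included in the post. The following is one possible sketch of such a helper, assuming it wraps a Jedis connection pool. Only the names `Jpoods` and `getJedis()` come from the original; the host, port, and pool limits are assumptions and should be changed to match the Redis instance started above. It needs the Jedis client library on the classpath.

```scala
package day08

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Hypothetical reconstruction: the original post does not show this helper.
object Jpoods {
  private val config = new JedisPoolConfig()
  config.setMaxTotal(20) // assumed upper bound on pooled connections
  config.setMaxIdle(10)  // assumed upper bound on idle connections

  // Assumed host and port; the pool is created on first use.
  private lazy val pool = new JedisPool(config, "127.0.0.1", 6379)

  // Borrow a connection; calling close() on it returns it to the pool.
  def getJedis(): Jedis = pool.getResource
}
```

Because the pool lives in a Scala `object`, each executor JVM builds its own pool the first time `getJedis()` is called, so nothing connection-related has to be serialized from the driver.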
package dayo7

import day08.Jpoods
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountToRedis1 {
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // Build the Spark configuration
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    // Create the StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))
    // Read lines of text from a TCP socket
    val result: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.186.150", 1234)
    result.foreachRDD(rdd => {
      // Count the words in this batch
      val result2 = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      // Write to Redis, using one connection per partition
      result2.foreachPartition(filter => {
        // filter is the iterator over this partition's (word, count) pairs
        val jedis = Jpoods.getJedis()
        // Add each count to the word's field in the hash "zi"
        filter.foreach(tp => {
          jedis.hincrBy("zi", tp._1, tp._2)
        })
        // Close the Jedis connection
        jedis.close()
      })
    })
    // Start the streaming computation
    ssc.start()
    // Block the main thread until the job is stopped
    ssc.awaitTermination()
  }
}
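Unlike the printed per-batch counts in section 1, `hincrBy` adds to whatever value the hash field already holds, so the hash "zi" ends up with running totals across batches. The following is a minimal sketch of that behavior, assuming a mutable in-memory map stands in for the Redis hash. `HashCounterSketch`, its local `hincrBy`, and `snapshot` are hypothetical names, and no Redis connection is made.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for one Redis hash; it only models the arithmetic.
object HashCounterSketch {
  private val hash = mutable.Map.empty[String, Long]

  // Mimics HINCRBY: a missing field starts at 0; returns the value after the increment.
  def hincrBy(field: String, delta: Long): Long = {
    val updated = hash.getOrElse(field, 0L) + delta
    hash(field) = updated
    updated
  }

  // Immutable copy of the current field values.
  def snapshot: Map[String, Long] = hash.toMap

  def main(args: Array[String]): Unit = {
    // Two batches, each already reduced to per-batch counts as in the job above.
    val batch1 = Seq(("hello", 2), ("spark", 1))
    val batch2 = Seq(("hello", 1), ("redis", 3))
    (batch1 ++ batch2).foreach { case (word, n) => hincrBy(word, n) }
    println(snapshot)
  }
}
```

The same property means that restarting the job without clearing the hash keeps adding to the old totals.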