SparkStreaming

Posted wangshuang123

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkStreaming相关的知识,希望对你有一定的参考价值。

1.workcount

技术图片

package dayo7

import org.apache.log4j.Level, Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds, StreamingContext

object NewworkWordCount 
Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = 
    //new Conf
    val conf = new SparkConf ().setAppName ( this.getClass.getSimpleName ).setMaster ( "local[*]" )

    //创建ssc  第二个参数是时间间隔
    val ssc = new StreamingContext ( conf, Seconds ( 2 ) )

    //获取数据
    val result = ssc.socketTextStream ( "192.168.186.150", 1234 )

    //处理数据,输出打印
    val result2 = result.flatMap ( _.split ( " " ) ).map ( (_, 1) ).reduceByKey ( _ + _ ).print ()

    //开启sparkStreaming
    ssc.start ()

    //创建阻塞线程
    ssc.awaitTermination ()
  
  

2.将数据写到redis

开启redis bin/redis-server  etc/redis.conf  查看端口 ps -ef|grep redis

package dayo7

import day08.Jpoods
import org.apache.log4j.Level, Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.Seconds, StreamingContext

object WordCountToRedis1 
Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = 
    //new xonf
    val conf=new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")

    //创建SparkStreaming
    val ssc=new StreamingContext(conf,Seconds(2))

    //读取数据
    val result: ReceiverInputDStream[String] =ssc.socketTextStream("192.168.186.150",1234)


    result.foreachRDD(rdd=>
      //处理数据
      val result2=rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

   //写入到redis
      result2.foreachPartition(filter=>
        val jedis=Jpoods.getJedis()

        //创建表
        filter.foreach(tp=>
          jedis.hincrBy("zi",tp._1,tp._2)
        )

        //关闭jedis
        jedis.close()

      )

    )

    //开启SparkStraming
    ssc.start()

    //创建阻塞线程
    ssc.awaitTermination()
  

 

以上是关于SparkStreaming的主要内容,如果未能解决你的问题,请参考以下文章