SparkStreaming简单例子(oldAPI)

Posted diyo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkStreaming简单例子(oldAPI)相关的知识,希望对你有一定的参考价值。

SparkStreaming简单例子

◆ 构建第一个Streaming程序: (wordCount) 

  ◆ Spark Streaming 程序最好以使用Maven或者sbt编译出来的独立应用的形式运行。

  ◆ 准备工作:
  1.引入Spark Streaming的jar
  2.scala流计算import声明
  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.streaming.dstream.DStream
  import org.apache.spark.streaming.Duration
  import org.apache.spark.streaming.Seconds

 

1.初始化StreamingContext对象

   //创建一个本地StreamingContext两个工作线程和批间隔1秒。
   val conf = new SparkConf()
   conf.setMaster(“local[2]")
   conf.setAppName(“ NetworkWordCount")
   val ssc = new StreamingContext(conf, Seconds(1))

2.获取DStream对象 

  //创建一个连接到主机名的DStream,像localhost:9999

   val lines = ssc.socketTextStream("localhost", 9999)

3.操作DStream对象

  //将每一行接收到的数据通过空格分割成单词 

  val words = lines.flatMap(_.split(" “))
  //导入StreamingContext中的隐式转换
  import org.apache.spark.streaming.StreamingContext._

   // 对每一批次的单词进行转化求和

  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)
  // 每个批次中默认打印前十个元素到控制台
  wordCounts.print()

4.启动流处理程序

  ssc.start// 开始计算

  ssc.awaitTermination() // 等待计算终止

  ssc.stop() //结束应用

 

启动网络端口,模拟发送数据

  1.借助于nc命令,手动输入数据

    Linux/Mac :nc

    Windows:cat

      nc -lk 9999

  2.借助于代码,编写一个模拟数据发生器  

技术图片
package com.briup.streaming

import java.io.PrintWriter
import java.net.ServerSocket

import scala.io.Source

object MassageServer 

  // 定义随机获取整数的方法
  def index(length: Int) = 
    import java.util.Random
    val rdm = new Random
    rdm.nextInt(length)
  

  def main(args: Array[String]) 
    println("模拟数据器启动!!!")
    // 获取指定文件总的行数
    val filename ="Spark/ihaveadream.txt";
    val lines = Source.fromFile(filename).getLines.toList
    val filerow = lines.length

    // 指定监听某端口,当外部程序请求时建立连接
    val serversocket = new ServerSocket(9999);

    while (true) 
      //监听9999端口,获取socket对象
      val socket = serversocket.accept()
      //      println(socket)
      new Thread() 
        override def run = 
          println("Got client connected from: " + socket.getInetAddress)

          val out = new PrintWriter(socket.getOutputStream(), true)

          while (true) 
            Thread.sleep(1000)
            // 当该端口接受请求时,随机获取某行数据发送给对方
            val content = lines(index(filerow))

            println (content)

            out.write(content + ‘\n‘)

            out.flush()
          
          socket.close()
        
      .start()
    
  
模拟发送数据

 

 

注意事项:

◆ 1.启动 Spark Streaming 之前所作的所有步骤只是创建了执行流程, 程序没有真正
连接上数据源,也没有对数据进行任何操作,只是设定好了所有的执行计划
◆ 2.当 ssc.start()启动后程序才真正进行所有预期的操作
◆ 3.执行会在另一个线程中进行,所以需要调用awaitTermination来等待流计算完成
◆ 4.一个Streaming context只能启动一次
◆ 5.如果模式是本地模式,那么请务必设置local[n] ,n>=2   1个用于接收,1个用于处理


package com.briup.streaming

import org.apache.log4j.Level, Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration, StreamingContext

object MyTestOldAPI 
  def main(args: Array[String]): Unit = 
    //设置日志级别
    Logger.getLogger("org").setLevel(Level.WARN)

    //1 获取DS
    val conf = new SparkConf().setAppName("MyTestOldAPI").setMaster("local[*]")
    val dss = new StreamingContext(conf, Duration(1000))
    val ds = dss.socketTextStream("localhost", 9999)

    //2 逻辑处理  //统计
    val res = ds.filter(_ != "").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    res.print()

    //3 开启实时处理任务
    dss.start()
    dss.awaitTermination()
    dss.stop()
  

 

以上是关于SparkStreaming简单例子(oldAPI)的主要内容,如果未能解决你的问题,请参考以下文章

SparkStreaming消费kafka数据

Spark训练营-- SparkStreaming流计算组件wordCount实战

spark streaming怎么创建文档

spark streaming可以用于批处理吗

spark streaming中怎么对数据进行排序

SparkStreaming---简单demo(NetCat)