Spark之SparkStreaming案例

Posted chbxw

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark之SparkStreaming案例相关的知识,希望对你有一定的参考价值。

一、Spark Streaming的介绍

  Spark Streaming是Spark 核心API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。 数据可以从诸如Kafka,Flume,Kinesis或TCP套接字的许多来源中获取,并且可以使用由高级功能(如map,reduce,join和window)表达的复杂算法进行处理。 最后,处理后的数据可以推送到文件系统,数据库和实时仪表板。 事实上,您可以在数据流上应用Spark的机器学习和图形处理算法。
这里写图片描述
  在内部,它的工作原理如下。 Spark Streaming接收实时输入数据流,并将数据分成批,然后由Spark引擎对其进行处理,以批量生成最终的结果流。
这里写图片描述

  Spark Streaming提供称为离散流或DStream的高级抽象,它表示连续的数据流。 可以从诸如Kafka,Flume和Kinesis等来源的输入数据流中创建DStream,或者通过对其他DStream应用高级操作来创建。 在内部,DStream表示为一系列RDD。
  本指南介绍如何开始使用DStreams编写Spark Streaming程序。 您可以在Scala,Java或Python(在Spark 1.2中引入)中编写Spark Streaming程序。

二、DStream

  DStream是一个抽象的概念, 表示一系列的RDD

三、简单案例

3.1、SparkStream从一个监听端口读取数据流

  在我们详细介绍如何编写自己的Spark Streaming程序之前,我们来看一下简单的Spark Streaming程序。 假设我们要计数从在TCP套接字上侦听的数据服务器接收的文本数据中的单词数。 所有你需要做的是如下。

3.1.1、首先,我们创建一个JavaStreamingContext对象,它是所有流功能的主要入口点。 我们创建一个带有两个执行线程的本地StreamingContext,并且间隔为1秒。

        //使用两个工作线程和1秒的批量间隔创建本地StreamingContext
        SparkConf conf = new SparkConf().setAppName("Spark Streaming WordCount").setMaster("local[2]");
        // 创建该对象就类似于Spark Core中的JavaSparkContext,类似于Spark SQL中的SQLContext
        // 该对象除了接受SparkConf对象,还要接受一个Batch Interval参数,就是说,每收集多长时间数据划分一个batch去进行处理
        // 这里我们看Durations里面可以设置分钟、毫秒、秒,这里设置10秒
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));

3.1.2、使用此JavaStreamingContext,创建一个DStream,它表示来自TCP源的流数据,指定为主机名(例如localhost)和端口(例如9999)。

        //首先创建输入的DStream, 代表一个数据源如:从Scoket或Kafka来持续不断的获取实时的数据流
        //此处创建一个监听端口的Scoket的数据流, 这里面就会每10秒生成一个RDD,RDD的元素类型为String,就是一行行的文本
        JavaDStream<String> lines = jsc.socketTextStream("192.168.1.224", 9999);

3.1.3、这个lines的DStream表示将从数据服务器接收的数据流。 此流中的每条记录都是一行文本。 然后,我们要将空格划分为单词。

        //使用Spark Core提供的算子直接作用于DStreams上, 算子底层会应用在里面的每个RDD上面,RDD转换后的新RDD会作为新DStream中RDD
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

  flatMap是一个DStream操作,通过从源DStream中的每个记录生成多个新记录来创建新的DStream(line>words的操作)。 在这种情况下,每一行将被分割成多个单词,并将单词流表示为单词DStream。

请注意: 我们使用FlatMapFunction对象定义了转换。 我们可以发现,Java API中有许多这样的便利类可以帮助定义DStream转换。

3.1.4、统计单词数

        //Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //将此DStream中生成的每个RDD的前十个元素打印到控制台
        wcs.print();


The words DStream is further mapped (one-to-one transformation) to a DStream of 
(word, 1) pairs, using a PairFunction object. Then, it is reduced to get the 
frequency of words in each batch of data, using a Function2 object. Finally, 
wcs.print() will print a few of the counts generated every second.

使用PairFunction对象将词wordsDStream进一步映射(one-to-one transformation)到(word,1)pairs的DStream。 然后,使用Function2对象减少每批数据中的单词的频率。 最后,wcs.print()将打印每秒产生的几个计数。

3.1.5、Note that when these lines are executed, SparkStreaming only sets up the computation it will perform after it is started,
and no real processing has started yet.
To start the processing after all the transformations have been setup,
we finally call start method.

jsc.start();              // Start the computation
jsc.awaitTermination();   // Wait for the computation to terminate

3.1.6、您可以运行此示例如下。 您将首先需要运行Netcat(大多数类Unix系统中的一个小型实用程序)作为数据服务器

yum install nc
nc -lk 9999

在控制台写入数据

运行结果

这里写图片描述

java完整代码

package com.chb.spark.streaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Spark Streaming WordCount").setMaster("local[2]");
        // 创建该对象就类似于Spark Core中的JavaSparkContext,类似于Spark SQL中的SQLContext
        // 该对象除了接受SparkConf对象,还要接受一个Batch Interval参数,就是说,每收集多长时间数据划分一个batch去进行处理
        // 这里我们看Durations里面可以设置分钟、毫秒、秒,这里设置10秒
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));

        //首先创建输入的DStream, 代表一个数据源如:从Scoket或Kafka来持续不断的获取实时的数据流
        //此处创建一个监听端口的Scoket的数据流, 这里面就会每10秒生成一个RDD,RDD的元素类型为String,就是一行行的文本
        JavaDStream<String> lines = jsc.socketTextStream("192.168.1.224", 9999);


        //使用Spark Core提供的算子直接作用于DStreams上, 算子底层会应用在里面的每个RDD上面,RDD转换后的新RDD会作为新DStream中RDD
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // 最后每次计算完,都打印一下这10秒钟的单词计数情况,并休眠5秒钟,以便于我们测试和观察
        wcs.print();


        jsc.start();
        jsc.awaitTermination();
        jsc.close();

    }
}

scala完整代码

package com.chb.scala

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Counts words in UTF8 encoded, '\\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
    def main(args: Array[String]) {
        if (args.length < 2) {
            System.err.println("Usage: NetworkWordCount <hostname> <port>")
            System.exit(1)
        }


        // Create the context with a 1 second batch size
        val sparkConf = new SparkConf().setAppName("NetworkWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(1))

        // Create a socket stream on target ip:port and count the
        // words in input stream of \\n delimited text (eg. generated by 'nc')
        // Note that no duplication in storage level only for running locally.
        // Replication necessary in distributed scenario for fault tolerance.
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}


四、从hdfs中读取数据

   JavaDStream<String> lines = jsc.textFileStream("hdfs://192.168.1.224:9000/user/root/");

以上是关于Spark之SparkStreaming案例的主要内容,如果未能解决你的问题,请参考以下文章

通过spark-submit,本地测试SparkStreaming

spark小案例——sparkstreaming消费Kafka

spark小案例——sparkstreaming消费Kafka

spark小案例——sparkstreaming消费Kafka

第16课:Spark Streaming源码解读之数据清理内幕彻底解密

第1课:通过案例对SparkStreaming 透彻理解三板斧之一