A Detailed Look at Combining Spark Streaming with Spark SQL

Posted by 一蓑烟雨任平生

One of Spark Streaming's most powerful features is that it can be combined with Spark Core and Spark SQL. We have already seen, through operators such as transform and foreachRDD, how to run Spark Core batch operations on the RDDs inside a DStream. Now let's look at how to combine the RDDs in a DStream with Spark SQL.

Use case: every 10 seconds, count the clicks on each product in each category over the last 60 seconds, and then report the top 3 hottest products in each category.

Input data model:

--------------------------
name		category	product
alex 	bigdata		hadoop
alex 	bigdata		spark
jack 	bigdata		flink
alex 	bigdata		hadoop
alex 	language	java
pony 	language	python
jone 	language	java
lili 	bigdata		hadoop
--------------------------

Step 1: mapToPair

<bigdata_hadoop,1>
<bigdata_spark,1>
<bigdata_flink,1>
...

Step 2: reduceByKeyAndWindow

<bigdata_hadoop,5>
<bigdata_spark,2>
<bigdata_flink,4>
...

Step 3: foreachRDD

foreachRDD -> DataFrame -> createOrReplaceTempView -> sql
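
To make the flow concrete before the full listings, here is a compact sketch of that third step in Scala. It is only a sketch: the function name queryEachBatch and the column names key/clickCount are illustrative and do not appear in the original code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

// Sketch only: for each micro-batch, turn the windowed (key, count) RDD into a
// DataFrame, register it as a temporary view, and run a SQL query over it.
def queryEachBatch(windowed: DStream[(String, Int)], spark: SparkSession): Unit = {
    import spark.implicits._
    windowed.foreachRDD { rdd =>
        val df = rdd.toDF("key", "clickCount")         // RDD -> DataFrame
        df.createOrReplaceTempView("click_counts")     // DataFrame -> temp view
        spark.sql("select key, clickCount from click_counts "
            + "order by clickCount desc limit 3").show()   // SQL over the view
    }
}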

Java implementation:

package com.kfk.spark.common;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:23 PM
 */
public class CommStreamingContext {

    public static JavaStreamingContext getJssc() {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
        return new JavaStreamingContext(conf, Durations.seconds(2));
    }
}
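
The TopHotProduct class below also imports a CommSparkSession helper that the post does not show. A minimal sketch of what it might look like, mirroring CommStreamingContext above (an assumption, not the original class):

package com.kfk.spark.common;

import org.apache.spark.sql.SparkSession;

/**
 * Hypothetical sketch of the CommSparkSession helper used below;
 * the original class is not included in the post.
 */
public class CommSparkSession {

    public static SparkSession getSparkSession() {
        return SparkSession.builder()
                .master("local[2]")
                .appName("CommSparkSession")
                .getOrCreate();
    }
}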
    

package com.kfk.spark.top_hot_product_project;

import com.kfk.spark.common.CommSparkSession;
import com.kfk.spark.common.CommStreamingContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/19
 * @time : 1:36 PM
 */
public class TopHotProduct {
    public static void main(String[] args) throws InterruptedException {
        JavaStreamingContext jssc = CommStreamingContext.getJssc();

        /**
         * Input data model
         * --------------------------
         * name		category	product
         * alex 	bigdata		hadoop
         * alex 	bigdata		spark
         * jack 	bigdata		flink
         * alex 	bigdata		hadoop
         * alex 	language	java
         * pony 	language	python
         * jone 	language	java
         * lili 	bigdata		hadoop
         * --------------------------
         */
        JavaReceiverInputDStream<String> inputDstream = jssc.socketTextStream("bigdata-pro-m04",9999);

        /**
         * mapToPair
         * <bigdata_hadoop,1>
         * <bigdata_spark,1>
         * <bigdata_flink,1>
         */
        JavaPairDStream<String, Integer> pairDstream = inputDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception {

                String[] line = lines.split(" ");
                return new Tuple2<>(line[1] + "_" + line[2], 1);
            }
        });

        /**
         * reduceByKeyAndWindow
         * <bigdata_hadoop,5>
         * <bigdata_spark,2>
         * <bigdata_flink,4>
         */
        JavaPairDStream<String, Integer> windowPairStream = pairDstream.reduceByKeyAndWindow(
                new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, Durations.seconds(60), Durations.seconds(10));

        /**
         * foreachRDD
         */
        windowPairStream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            @Override
            public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRdd) throws Exception {

                // Convert to an RDD of Row
                JavaRDD<Row> countRowRdd = stringIntegerJavaPairRdd.map(new Function<Tuple2<String, Integer>, Row>() {
                    @Override
                    public Row call(Tuple2<String, Integer> tuple) throws Exception {

                        String category = tuple._1.split("_")[0];
                        String product = tuple._1.split("_")[1];
                        Integer productCount = tuple._2;
                        return RowFactory.create(category, product, productCount);
                    }
                });

                List<StructField> structFields = new ArrayList<StructField>();
                structFields.add(DataTypes.createStructField("category",DataTypes.StringType,true));
                structFields.add(DataTypes.createStructField("product",DataTypes.StringType,true));
                structFields.add(DataTypes.createStructField("productCount",DataTypes.IntegerType,true));

                // Build the schema
                StructType structType = DataTypes.createStructType(structFields);

                // Create the SparkSession
                SparkSession spark = CommSparkSession.getSparkSession();

                // Create a DataFrame from the RDD and the schema
                Dataset<Row> df = spark.createDataFrame(countRowRdd,structType);

                // Register a temporary view
                df.createOrReplaceTempView("product_click");

                /**
                 * Query the top 3 products by click count
                 */
                spark.sql("select category,product,productCount from product_click " +
                        "order by productCount desc limit 3").show();

                /**
                 * +--------+-------+------------+
                 * |category|product|productCount|
                 * +--------+-------+------------+
                 * | bigdata| hadoop|           4|
                 * | bigdata|  kafka|           2|
                 * | bigdata|  flink|           2|
                 * +--------+-------+------------+
                 */
            
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}

Scala implementation:
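
The listing below relies on two small helpers, CommStreamingContextScala and CommSparkSessionScala, that the post does not show. A minimal sketch of them, mirroring the Java CommStreamingContext above (an assumption, not the original code):

package com.kfk.spark.common

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical sketches; the original helper objects are not included in the post.
object CommStreamingContextScala {
    def getJssc: StreamingContext = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContextScala")
        new StreamingContext(conf, Seconds(2))
    }
}

object CommSparkSessionScala {
    def getSparkSession(): SparkSession = {
        SparkSession.builder()
            .master("local[2]")
            .appName("CommSparkSessionScala")
            .getOrCreate()
    }
}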

package com.kfk.spark.top_hot_product_project

import com.kfk.spark.common.{CommSparkSessionScala, CommStreamingContextScala}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.Seconds

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/19
 * @time : 3:59 PM
 */
object TopHotProductScala {
    def main(args: Array[String]): Unit = {
        val jssc = CommStreamingContextScala.getJssc

        /**
         * Input data model
         * --------------------------
         * name		category	product
         * alex 	bigdata		hadoop
         * alex 	bigdata		spark
         * jack 	bigdata		flink
         * alex 	bigdata		hadoop
         * alex 	language	java
         * pony 	language	python
         * jone 	language	java
         * lili 	bigdata		hadoop
         * --------------------------
         */
        val inputDstream = jssc.socketTextStream("bigdata-pro-m04", 9999)

        /**
         * mapToPair
         * <bigdata_hadoop,1>
         * <bigdata_spark,1>
         * <bigdata_flink,1>
         */
        val pairDStream = inputDstream.map(x => (x.split(" ")(1) + "_" + x.split(" ")(2),1))

        /**
         * reduceByKeyAndWindow
         * <bigdata_hadoop,5>
         * <bigdata_spark,2>
         * <bigdata_flink,4>
         */
        val windowDStream = pairDStream.reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(60),Seconds(10))

        windowDStream.foreachRDD(x => {

            // Convert to an RDD of Row
            val rowRDD = x.map(tuple => {
                val category = tuple._1.split("_")(0)
                val product = tuple._1.split("_")(1)
                val productCount = tuple._2
                Row(category,product,productCount)
            })

            // Build the schema
            val structType = StructType(Array(
                StructField("category",StringType,true),
                StructField("product",StringType,true),
                StructField("productCount",IntegerType,true)
            ))

            // Create the SparkSession
            val spark = CommSparkSessionScala.getSparkSession()

            // Create a DataFrame from the RDD and the schema
            val df = spark.createDataFrame(rowRDD,structType)

            // Register a temporary view
            df.createOrReplaceTempView("product_click")

            /**
             * Query the top 3 products by click count
             */
            spark.sql("select category,product,productCount from product_click "
                    + "order by productCount desc limit 3").show()

            /**
             * +--------+-------+------------+
             * |category|product|productCount|
             * +--------+-------+------------+
             * | bigdata| hadoop|           4|
             * | bigdata|  kafka|           2|
             * | bigdata|  flink|           2|
             * +--------+-------+------------+
             */
        })

        jssc.start()
        jssc.awaitTermination()
    }
}

Test data:

alex bigdata hadoop
alex bigdata spark
jack bigdata flink
alex bigdata hadoop
alex language java
pony language python
jone language java
lili bigdata hadoop
lili bigdata hive
lili bigdata hbase
lili bigdata hadoop
lili bigdata spark
lili bigdata flink
lili bigdata kafka
lili bigdata kafka
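
These lines can be fed to the socket the job reads from (port 9999 on bigdata-pro-m04 in the listings), for example by starting a netcat listener with nc -lk 9999 on that host and pasting the lines into it while the streaming job runs.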

Output:

+--------+-------+------------+
|category|product|productCount|
+--------+-------+------------+
| bigdata| hadoop|           4|
| bigdata|  kafka|           2|
| bigdata|  flink|           2|
+--------+-------+------------+
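
Note that the query in the listings returns a single overall top 3, which is why only bigdata products appear above. To rank within each category, as the use case describes, the same product_click view could be queried with a window function instead. A sketch of that variant (reusing the spark session and temp view from the listings, inside the same foreachRDD block):

// Rank products within each category by click count and keep the top 3 of each category.
spark.sql("select category, product, productCount from ("
    + "select category, product, productCount, "
    + "row_number() over (partition by category order by productCount desc) as rn "
    + "from product_click) t where rn <= 3").show()

row_number() is a built-in Spark SQL window function, so this variant needs no additional imports.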

