Spark Streaming与Spark SQL结合操作详解
Posted <一蓑烟雨任平生>
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streaming与Spark SQL结合操作详解相关的知识,希望对你有一定的参考价值。
Spark Streaming最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通 过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操 作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。
案例:每隔10秒,统计最近60秒的,每个种类的每个商品的点击次数,然后统计出每个种类 top3热门的商品。
数据输入模型:
--------------------------
姓名 商品分类 商品名称
alex bigdata hadoop
alex bigdata spark
jack bigdata flink
alex bigdata hadoop
alex language java
pony language python
jone language java
lili bigdata hadoop
--------------------------
第一步:mapToPair
第一步:mapToPair
<bigdata_hadoop,1>
<bigdata_spark,1>
<bigdata_flink,1>
...
第二步:reduceByKeyAndWindow
<bigdata_hadoop,5>
<bigdata_spark,2>
<bigdata_flink,4>
...
第三步:foreachRDD
foreachRDD -> DataFrame -> registerTempView -> sql
Java语言实现:
package com.kfk.spark.common;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
* @author : 蔡政洁
* @email :caizhengjie888@icloud.com
* @date : 2020/12/14
* @time : 8:23 下午
*/
public class CommStreamingContext
public static JavaStreamingContext getJssc()
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
return new JavaStreamingContext(conf, Durations.seconds(2));
package com.kfk.spark.top_hot_product_project;
import com.kfk.spark.common.CommSparkSession;
import com.kfk.spark.common.CommStreamingContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @author : 蔡政洁
* @email :caizhengjie888@icloud.com
* @date : 2020/12/19
* @time : 1:36 下午
*/
public class TopHotProduct
public static void main(String[] args) throws InterruptedException
JavaStreamingContext jssc = CommStreamingContext.getJssc();
/**
* 输入数据源模型
* --------------------------
* 姓名 商品分类 商品名称
* alex bigdata hadoop
* alex bigdata spark
* jack bigdata flink
* alex bigdata hadoop
* alex language java
* pony language python
* jone language java
* lili bigdata hadoop
* --------------------------
*/
JavaReceiverInputDStream<String> inputDstream = jssc.socketTextStream("bigdata-pro-m04",9999);
/**
* mapToPair
* <bigdata_hadoop,1>
* <bigdata_spark,1>
* <bigdata_flink,1>
*/
JavaPairDStream<String, Integer> pairDstream = inputDstream.mapToPair(new PairFunction<String, String, Integer>()
@Override
public Tuple2<String, Integer> call(String lines) throws Exception
String[] line = lines.split(" ");
return new Tuple2<>(line[1] + "_" + line[2], 1);
);
/**
* reduceByKeyAndWindow
* <bigdata_hadoop,5>
* <bigdata_spark,2>
* <bigdata_flink,4>
*/
JavaPairDStream<String, Integer> windowPairStream = pairDstream.reduceByKeyAndWindow(
new Function2<Integer, Integer, Integer>()
@Override
public Integer call(Integer v1, Integer v2) throws Exception
return v1+v2;
, Durations.seconds(60),Durations.seconds(10));
/**
* foreachRDD
*/
windowPairStream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>()
@Override
public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRdd) throws Exception
// 转换成RDD[Row]
JavaRDD<Row> countRowRdd = stringIntegerJavaPairRdd.map(new Function<Tuple2<String, Integer>, Row>()
@Override
public Row call(Tuple2<String, Integer> tuple) throws Exception
String category = tuple._1.split("_")[0];
String product = tuple._1.split("_")[1];
Integer productCount = tuple._2;
return RowFactory.create(category,product,productCount);
);
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("category",DataTypes.StringType,true));
structFields.add(DataTypes.createStructField("product",DataTypes.StringType,true));
structFields.add(DataTypes.createStructField("productCount",DataTypes.IntegerType,true));
// 构造Schema
StructType structType = DataTypes.createStructType(structFields);
// 创建SparkSession
SparkSession spark = CommSparkSession.getSparkSession();
// 通过rdd和scheme创建DataFrame
Dataset<Row> df = spark.createDataFrame(countRowRdd,structType);
// 创建临时表
df.createOrReplaceTempView("product_click");
/**
* 统计每个种类下点击次数排名前3名的商品
*/
spark.sql("select category,product,productCount from product_click " +
"order by productCount desc limit 3").show();
/**
* +--------+-------+------------+
* |category|product|productCount|
* +--------+-------+------------+
* | bigdata| hadoop| 4|
* | bigdata| kafka| 2|
* | bigdata| flink| 2|
* +--------+-------+------------+
*/
);
jssc.start();
jssc.awaitTermination();
Scala语言实现:
package com.kfk.spark.top_hot_product_project
import com.kfk.spark.common.CommSparkSessionScala, CommStreamingContextScala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType, StringType, StructField, StructType
import org.apache.spark.streaming.Seconds
/**
* @author : 蔡政洁
* @email :caizhengjie888@icloud.com
* @date : 2020/12/19
* @time : 3:59 下午
*/
object TopHotProductScala
def main(args: Array[String]): Unit =
val jssc = CommStreamingContextScala.getJssc
/**
* 输入数据源模型
* --------------------------
* 姓名 商品分类 商品名称
* alex bigdata hadoop
* alex bigdata spark
* jack bigdata flink
* alex bigdata hadoop
* alex language java
* pony language python
* jone language java
* lili bigdata hadoop
* --------------------------
*/
val inputDstream = jssc.socketTextStream("bigdata-pro-m04", 9999)
/**
* mapToPair
* <bigdata_hadoop,1>
* <bigdata_spark,1>
* <bigdata_flink,1>
*/
val pairDStream = inputDstream.map(x => (x.split(" ")(1) + "_" + x.split(" ")(2),1))
/**
* reduceByKeyAndWindow
* <bigdata_hadoop,5>
* <bigdata_spark,2>
* <bigdata_flink,4>
*/
val windowDStream = pairDStream.reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(60),Seconds(10))
windowDStream.foreachRDD(x =>
// 转换成RDD[Row]
val rowRDD = x.map(tuple =>
val category = tuple._1.split("_")(0)
val product = tuple._1.split("_")(1)
val productCount = tuple._2
Row(category,product,productCount)
)
// 构造Schema
val structType = StructType(Array(
StructField("category",StringType,true),
StructField("product",StringType,true),
StructField("productCount",IntegerType,true)
))
// 创建SparkSession
val spark = CommSparkSessionScala.getSparkSession()
// 通过rdd和scheme创建DataFrame
val df = spark.createDataFrame(rowRDD,structType)
// 创建临时表
df.createOrReplaceTempView("product_click")
/**
* 统计每个种类下点击次数排名前3名的商品
*/
spark.sql("select category,product,productCount from product_click "
+ "order by productCount desc limit 3").show()
/**
* +--------+-------+------------+
* |category|product|productCount|
* +--------+-------+------------+
* | bigdata| hadoop| 4|
* | bigdata| kafka| 2|
* | bigdata| flink| 2|
* +--------+-------+------------+
*/
)
jssc.start()
jssc.awaitTermination()
测试数据源:
alex bigdata hadoop
alex bigdata spark
jack bigdata flink
alex bigdata hadoop
alex language java
pony language python
jone language java
lili bigdata hadoop
lili bigdata hive
lili bigdata hbase
lili bigdata hadoop
lili bigdata spark
lili bigdata flink
lili bigdata kafka
lili bigdata kafka
运行结果:
+--------+-------+------------+
|category|product|productCount|
+--------+-------+------------+
| bigdata| hadoop| 4|
| bigdata| kafka| 2|
| bigdata| flink| 2|
+--------+-------+------------+
以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!
以上是关于Spark Streaming与Spark SQL结合操作详解的主要内容,如果未能解决你的问题,请参考以下文章
大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池
spark corespark sqlspark streaming 联系与区别
DataFlow编程模型与Spark Structured streaming