sparkstreaming结合sparksql读取socket实时数据流
Posted wx5d37d5fd4aa62
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sparkstreaming结合sparksql读取socket实时数据流相关的知识,希望对你有一定的参考价值。
Spark Streaming是构建在Spark Core的RDD基础之上的,与此同时Spark Streaming引入了一个新的概念:DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。DStream抽象是Spark Streaming的流处理模型,在内部实现上,Spark Streaming会对输入数据按照时间间隔(如1秒)分段,每一段数据转换为Spark中的RDD,这些分段就是Dstream,并且对DStream的操作都最终转变为对相应的RDD的操作。
Spark SQL 是 Spark 用于结构化数据(structured data)处理的 Spark 模块。Spark SQL 的前身是Shark,Shark是基于 Hive 所开发的工具,它修改了下图所示的右下角的内存管理、物理计划、执行三个模块,并使之能运行在 Spark 引擎上。
(1)pom依赖:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_$scala.version</artifactId>
<version>$spark.version</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_$scala.version</artifactId>
<version>$spark.version</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_$scala.version</artifactId>
<version>$spark.version</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.11</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.66</version>
</dependency>
</dependencies>
(2)定义消息对象
package com.pojo;
import java.io.Serializable;
import java.util.Date;
/**
* Created by lj on 2022-07-13.
*/
public class WaterSensor implements Serializable
public String id;
public long ts;
public int vc;
public WaterSensor()
public WaterSensor(String id,long ts,int vc)
this.id = id;
this.ts = ts;
this.vc = vc;
public int getVc()
return vc;
public void setVc(int vc)
this.vc = vc;
public String getId()
return id;
public void setId(String id)
this.id = id;
public long getTs()
return ts;
public void setTs(long ts)
this.ts = ts;
(3)构建数据生产者
package com.producers;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;
/**
* Created by lj on 2022-07-12.
*/
public class Socket_Producer
public static void main(String[] args) throws IOException
try
ServerSocket ss = new ServerSocket(9999);
System.out.println("启动 server ....");
Socket s = ss.accept();
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
String response = "java,1,2";
//每 2s 发送一次消息
int i = 0;
Random r=new Random(); //不传入种子
String[] lang = "flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi";
while(true)
response= lang[r.nextInt(lang.length)]+ i + "," + i + "," + i+"\\n";
System.out.println(response);
try
bw.write(response);
bw.flush();
i++;
catch (Exception ex)
System.out.println(ex.getMessage());
Thread.sleep(1000 * 30);
catch (IOException | InterruptedException e)
e.printStackTrace();
(4)通过sparkstreaming接入socket数据源,sparksql计算结果打印输出:
package com.examples;
import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
* Created by lj on 2022-07-16.
*/
public class SparkSql_Socket1
private static String appName = "spark.streaming.demo";
private static String master = "local[*]";
private static String host = "localhost";
private static int port = 9999;
public static void main(String[] args)
//初始化sparkConf
SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
//获得JavaStreamingContext
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));
//从socket源获取数据
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
//将 DStream 转换成 DataFrame 并且运行sql查询
lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>()
@Override
public void call(JavaRDD<String> rdd, Time time)
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
//通过反射将RDD转换为DataFrame
JavaRDD<WaterSensor> rowRDD = rdd.map(new Function<String, WaterSensor>()
@Override
public WaterSensor call(String line)
String[] cols = line.split(",");
WaterSensor waterSensor = new WaterSensor(cols[0],Long.parseLong(cols[1]),Integer.parseInt(cols[2]));
return waterSensor;
);
Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, WaterSensor.class);
// 创建临时表
dataFrame.createOrReplaceTempView("log");
Dataset<Row> result = spark.sql("select * from log");
System.out.println("========= " + time + "=========");
//输出前20条数据
result.show();
);
//开始作业
ssc.start();
try
ssc.awaitTermination();
catch (Exception e)
e.printStackTrace();
finally
ssc.close();
(5)效果演示:
代码中定义的是1分钟的批处理间隔,所以每1分钟会触发一次计算:
以上是关于sparkstreaming结合sparksql读取socket实时数据流的主要内容,如果未能解决你的问题,请参考以下文章
sparkstreaming结合sparksql读取socket实时数据流