Apache Spark:SparkStream创建Receiver来实现模拟无边界流操作
Posted 你是小KS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Spark:SparkStream创建Receiver来实现模拟无边界流操作相关的知识,希望对你有一定的参考价值。
当前版本:spark 2.4.6
1. 声明
当前内容主要为学习和使用SparkStream来实现流的操作,主要为使用自定的Receiver来模拟无限流的处理,当前内容参考官方文档
- 由于
socket编写复杂
,且file读取需要hadoop的文件
,本着简单的目的所以找到了自定义的接受流 queueStream队列方式的,只能使用一次,且不可以使用线程方式一致加数据,queue数据修改并不会让流继续计算!!!
Receiver流,主要通过onStart来启动,且存储数据使用store方式存放数据到RDD中,onStop表示流停止
2. 基本demo
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("spark stream test").setMaster("local[*]");
// Durations.seconds(1)设置时间为1秒每次的执行
// new Duration(5000);表示5秒间隔
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
JavaSparkContext sparkContext = jsc.sparkContext();
sparkContext.setLogLevel("ERROR");
SparkSession sparkSession = new SparkSession(sparkContext.sc());
// operation
receiverStream(jsc);
jsc.start();
try {
jsc.awaitTermination();
sparkSession.close();
jsc.close();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private static void receiverStream(JavaStreamingContext jsc) {
Receiver<String> receiver = new Receiver<String>(StorageLevel.MEMORY_ONLY()) {
@Override
public void onStart() {
// TODO Auto-generated method stub
System.out.println("start....");
String[] names= {"admin","guest","user"};
while(true) {
int len = (int)(Math.random()*10+1);
List<String> dataList=new ArrayList<String>(len);
for (int i = 0; i < len; i++) {
int index = (int)(Math.random()*names.length);
String name = names[index];
dataList.add(name);
}
//
System.out.println("生产数据:"+len);
store(dataList.iterator());
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@Override
public void onStop() {
System.out.println("shutdown....");
}
};
// 这里就可以一直输出数据了实现了无限流,其他操作参看JavaRDD的操作
jsc.receiverStream(receiver).countByValue().print();
}
接受流通过定义3秒钟产生一次数据并通过store方式存放数据到策略中(这里指内存),然后当前流统计print是按照每5秒输出一次
3.执行结果
测试成功!
以上是关于Apache Spark:SparkStream创建Receiver来实现模拟无边界流操作的主要内容,如果未能解决你的问题,请参考以下文章
MemSQL 取代 HDFS 与 Spark 结合,性能大幅提升