1-flink理论-批处理与流处理+简单示例
Posted PacosonSWJTU
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1-flink理论-批处理与流处理+简单示例相关的知识,希望对你有一定的参考价值。
【README】
1.本文包含了 批处理与流处理的代码示例;
- 批处理:把数据 攒在一起(或攒一段时间或攒一定内存大小),然后再处理,这叫批处理;
- 流处理:数据每来一个就处理一个;
2.特点:
数据处理方式 | 特点 |
批处理 | 1.高延时; |
流处理 | 1.低延时; |
3.引入flink的maven依赖:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.4</version>
</dependency>
</dependencies>
【1】flink批处理离线数据(数据有限)
【1.1】代码
1)数据源,我们保存在本地文本文件中,命名为 hello.txt
hello world
hello flink
how are you
thank you
hello zhangsan
hello lisi
2)批处理代码:
/**
* @Description 批处理,word count程序(离线数据)
* @author xiao tang
* @version 1.0.0
* @createTime 2022年04月09日
*/
public class WordCount
public static void main(String[] args) throws Exception
// 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 从文件中读取数据
String inputPath = "D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\hello.txt";
DataSource<String> dataSource = env.readTextFile(inputPath);
// 对数据集处理,按照空格分词展开,转为 (word,1) 二元组统计
DataSet<Tuple2<String, Integer>> resultSet = dataSource.flatMap(new MyFlatMapper())
.groupBy(0) // 按照第1个位置的word分组
.sum(1); // 将第2个位置上的数据求和
resultSet.print();
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>>
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception
// 按照空格分词
String[] words = value.split(" ");
// 遍历所有word,包装成word 输出
Arrays.stream(words).forEach(x->
collector.collect(new Tuple2<>(x, 1));
);
批处理打印结果:
(you,2)
(flink,1)
(world,1)
(hello,4)
(lisi,1)
(zhangsan,1)
(are,1)
(thank,1)
(how,1)
批处理的结果是最终结果;
【2】flink流处理离线数据(数据有限)
/**
* @Description 流数据(无限数据)
* @author xiao tang
* @version 1.0.0
* @createTime 2022年04月09日
*/
public class StreamWordCount
public static void main(String[] args) throws Exception
// 流处理执行环境
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(2); // 设置并行度
// 从文件中读取数据
String inputPath = "D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\hello.txt";
DataStream<String> dataStream = streamEnv.readTextFile(inputPath);
// 定义流操作
DataStream<Tuple2<String, Integer>> resultStream = dataStream.flatMap(new WordCount.MyFlatMapper())
.keyBy(0)
.sum(1);
// 打印结果
resultStream.print();
// 执行任务(流终止操作)
streamEnv.execute();
打印结果:
2> (world,1)
1> (thank,1)
2> (flink,1)
1> (hello,1)
2> (how,1)
2> (you,1)
1> (hello,2)
2> (you,2)
1> (hello,3)
2> (zhangsan,1)
1> (hello,4)
2> (lisi,1)
1> (are,1)
流处理的结果是一个动态变化的有状态的结果;
有状态的意思说白了就是:后面的处理结果依赖前面的处理结果,如对hello计数为3,它是在前面hello计数为2的基础上做的处理;
【3】flink流处理在线数据(数据无限)
我们引入了 netcat(nc),底层使用socket模拟向某端口写入数据;
然后 flink监控该端口的数据,并做处理;
【3.1】 flink处理类
处理类监听了 nc所在机器的的端口,即 192.168.163.201:7777;
/**
* @Description socket文本流词计数
* @author xiao tang
* @version 1.0.0
* @createTime 2022年04月09日
*/
public class SocketTextStreamWordCount
public static void main(String[] args) throws Exception
// 流处理执行环境
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(2); // 设置并行度
// 从 flinkjava parametertool 获取参数(或有)
// ParameterTool parameterTool = ParameterTool.fromArgs(args);
// String host = parameterTool.get("host");
// int port = parameterTool.getInt("port");
// 从socket文本流读取数据
DataStream<String> inputDataStream = streamEnv.socketTextStream("192.168.163.201", 7777);
// 定义流操作
DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper())
.keyBy(0)
.sum(1);
// 打印结果
resultStream.print();
// 执行任务(流终止操作)
streamEnv.execute();
演示效果:
以上是关于1-flink理论-批处理与流处理+简单示例的主要内容,如果未能解决你的问题,请参考以下文章
[2] Flink大数据流式处理利剑: 用Flink进行统计的一个简单例子
[2] Flink大数据流式处理利剑: 用Flink进行统计的一个简单例子