1-flink理论-批处理与流处理+简单示例

Posted PacosonSWJTU

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1-flink理论-批处理与流处理+简单示例相关的知识,希望对你有一定的参考价值。

【README】

1.本文包含了 批处理与流处理的代码示例;

  • 批处理:把数据 攒在一起(或攒一段时间或攒一定内存大小),然后再处理,这叫批处理;
  • 流处理:数据每来一个就处理一个;

2.特点:

数据处理方式特点
批处理1.高延时;
流处理1.低延时;

3.引入flink的maven依赖:

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.14.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
    </dependencies>

【1】flink批处理离线数据(数据有限)

【1.1】代码

1)数据源,我们保存在本地文本文件中,命名为  hello.txt

hello world
hello flink
how are you
thank you
hello zhangsan
hello lisi

2)批处理代码:

/**
 * @Description 批处理,word count程序(离线数据)
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年04月09日
 */
public class WordCount 
    public static void main(String[] args) throws Exception 
        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 从文件中读取数据
        String inputPath = "D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\hello.txt";
        DataSource<String> dataSource = env.readTextFile(inputPath);
        // 对数据集处理,按照空格分词展开,转为 (word,1) 二元组统计
        DataSet<Tuple2<String, Integer>> resultSet = dataSource.flatMap(new MyFlatMapper())
                .groupBy(0) // 按照第1个位置的word分组
                .sum(1); // 将第2个位置上的数据求和
        resultSet.print();
    

    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> 
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception 
            // 按照空格分词
            String[] words = value.split(" ");
            // 遍历所有word,包装成word 输出
            Arrays.stream(words).forEach(x->
                collector.collect(new Tuple2<>(x, 1));
            );
        
    

批处理打印结果:

(you,2)
(flink,1)
(world,1)
(hello,4)
(lisi,1)
(zhangsan,1)
(are,1)
(thank,1)
(how,1)

批处理的结果是最终结果


【2】flink流处理离线数据(数据有限)

/**
 * @Description 流数据(无限数据)
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年04月09日
 */
public class StreamWordCount 
    public static void main(String[] args) throws Exception 
        // 流处理执行环境
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(2); // 设置并行度
        // 从文件中读取数据
        String inputPath = "D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\hello.txt";
        DataStream<String> dataStream = streamEnv.readTextFile(inputPath);
        // 定义流操作
        DataStream<Tuple2<String, Integer>> resultStream = dataStream.flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);
        // 打印结果
        resultStream.print();
        // 执行任务(流终止操作)
        streamEnv.execute();
    

打印结果:

2> (world,1)
1> (thank,1)
2> (flink,1)
1> (hello,1)
2> (how,1)
2> (you,1)
1> (hello,2)
2> (you,2)
1> (hello,3)
2> (zhangsan,1)
1> (hello,4)
2> (lisi,1)
1> (are,1)

流处理的结果是一个动态变化的有状态的结果;

有状态的意思说白了就是:后面的处理结果依赖前面的处理结果,如对hello计数为3,它是在前面hello计数为2的基础上做的处理;


【3】flink流处理在线数据(数据无限)

我们引入了 netcat(nc),底层使用socket模拟向某端口写入数据;

然后 flink监控该端口的数据,并做处理;

【3.1】 flink处理类

处理类监听了 nc所在机器的的端口,即 192.168.163.201:7777;

/**
 * @Description socket文本流词计数
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年04月09日
 */
public class SocketTextStreamWordCount 
    public static void main(String[] args) throws Exception 
        // 流处理执行环境
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(2); // 设置并行度
        // 从 flinkjava parametertool 获取参数(或有)
//        ParameterTool parameterTool = ParameterTool.fromArgs(args);
//        String host = parameterTool.get("host");
//        int port = parameterTool.getInt("port");
        // 从socket文本流读取数据
        DataStream<String> inputDataStream = streamEnv.socketTextStream("192.168.163.201", 7777);
        // 定义流操作
        DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);
        // 打印结果
        resultStream.print();
        // 执行任务(流终止操作)
        streamEnv.execute();
    

演示效果:

 

以上是关于1-flink理论-批处理与流处理+简单示例的主要内容,如果未能解决你的问题,请参考以下文章

入门大数据---Spark_Streaming与流处理

FLink的窗口机制与流处理Join的方案

[2] Flink大数据流式处理利剑: 用Flink进行统计的一个简单例子

[2] Flink大数据流式处理利剑: 用Flink进行统计的一个简单例子

Flink 源码解析 —— JobManager 处理 SubmitJob 的过程

JAVA基础篇—文件与流