2021 Latest and Most Complete Flink Tutorial Series: A First Look at Flink Internals and the Unified Stream/Batch API

Posted by ChinaManor


day02_The Unified Stream/Batch API

Today's Goals

  • Stream processing concepts (understand)
  • Program structure: data sources, Source (master)
  • Program structure: data transformations, Transformation (master)
  • Program structure: data sinks, Sink (master)
  • Flink connectors, Connectors (understand)

Stream Processing Concepts

Timeliness of Data

  • Emphasizes how promptly data is processed after it is produced

    Example: website access data being continuously collected by crawlers

Stream Processing vs. Batch Processing

  • Stream processing deals with unbounded data

    • window operations draw boundaries inside the unbounded stream so it can be computed piece by piece
  • Batch processing deals with bounded data

  • Since Flink 1.12, stream and batch are unified: the same API supports both stream processing and batch processing (a minimal sketch follows below).

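As a minimal sketch of the unified execution mentioned above (illustrative code, not from the original course; assumes Flink 1.12+, where setRuntimeExecutionMode was introduced, and an illustrative class name), the same DataStream program can be switched between streaming and batch execution:

package cn.itcast.sz22.day02;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // STREAMING is the default; BATCH executes bounded input as a batch job;
        // AUTOMATIC picks BATCH when every source is bounded, STREAMING otherwise.
        env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        env.fromElements("hello world", "hello flink").print();
        env.execute();
    }
}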

Programming Model

Every Flink program is built from three parts (a minimal skeleton follows the list):

  • source
  • transformation
  • sink
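
The three parts map directly onto code. A minimal skeleton (the class name ProgramModelSkeleton is illustrative; any source and sink plug in the same way):

package cn.itcast.sz22.day02;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProgramModelSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. source: where the data comes from
        DataStreamSource<String> source = env.fromElements("a", "b", "a");
        // 2. transformation: how the data is processed
        source.map(String::toUpperCase)
                // 3. sink: where the results go (print() is the simplest sink)
                .print();
        // nothing runs until execute() is called
        env.execute();
    }
}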

Source

File-based

  • Requirement: env.readTextFile(local/HDFS file or directory); // compressed files also work
package cn.itcast.sz22.day02;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/5/5 9:50
 * env.readTextFile(local/HDFS file or directory); // compressed files also work
 */
public class FileSourceDemo {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read a text file, e.g. hdfs://node1:8020/user/root/xxx.txt
        // readTextFile also transparently reads gzip-compressed .gz files
        DataStreamSource<String> source1 = env.readTextFile("data/hello.txt");
        DataStreamSource<String> source2 = env.readTextFile("D:\\_java_workspace\\sz22\\data\\hello.txt.gz");
        // print the text
        source1.print();
        source2.print("source2:");
        // execute the streaming job
        env.execute();
    }
}

Collection-based: fromElements

  • Requirement

    1. env.fromElements(varargs);

    2. env.fromCollection(any Collection);

    3. env.generateSequence(from, to); // deprecated since Flink 1.12 in favor of fromSequence

    4. env.fromSequence(from, to);

  • Example

    package cn.itcast.sz22.day02;
    
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.ArrayList;
    
    /**
     * Author itcast
     * Date 2021/5/5 9:20
     * 1. create the (stream) execution environment
     * 2. create the data sources
     * 3. print the data
     * 4. execute
     */
    public class SourceDemo01 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //1. env.fromElements(varargs)
            DataStreamSource<String> source1 = env.fromElements("hello world", "hello flink");
            //2. env.fromCollection(any Collection)
            ArrayList<String> list = new ArrayList<>();
            list.add("hello hadoop");
            list.add("hello flink");
            DataStreamSource<String> source2 = env.fromCollection(list);
            //3. env.generateSequence(from, to) -- deprecated since Flink 1.12
            DataStreamSource<Long> source3 = env.generateSequence(1, 10).setParallelism(1);
            //4. env.fromSequence(from, to)
            DataStreamSource<Long> source4 = env.fromSequence(10, 20);
            // print the output
            source1.print("source1");
            source2.print("source2");
            source3.print("source3");
            source4.print("source4");
            // execute
            env.execute();
    
        }
    }
    

Custom

  • The SourceFunction variants

    SourceFunction: non-parallel source (parallelism must be 1)

    RichSourceFunction: non-parallel source with lifecycle methods such as open/close (parallelism must be 1)

    ParallelSourceFunction: parallel source (parallelism may be >= 1)

    RichParallelSourceFunction: parallel source with lifecycle methods (parallelism may be >= 1) -- this is what the Kafka source, covered later, builds on

  • Requirement: every second, generate one random order record (order ID, user ID, order amount, timestamp)

    package cn.itcast.sz22.day02;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.util.Random;
    import java.util.UUID;
    
    /**
     * Author itcast
     * Date 2021/5/5 10:15
     * Every second, generate one random order record (order ID, user ID, order amount, timestamp)
     * Requirements:
     * - random order ID (UUID)
     * - random user ID (0-2)
     * - random order amount (0-100)
     * - timestamp = current system time
     */
    public class CustomSource {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // custom source
    
            Random rn = new Random();
            DataStreamSource<Order> source = env.addSource(new ParallelSourceFunction<Order>() {
                // a flag that controls the generation loop
                boolean flag = true;

                @Override
                public void run(SourceContext<Order> ctx) throws Exception {
                    while (flag) {
                        // random order ID (UUID)
                        String oid = UUID.randomUUID().toString();
                        // random user ID (0-2)
                        int uid = rn.nextInt(3);
                        // random order amount (0-100)
                        int money = rn.nextInt(101);
                        // timestamp = current system time
                        long timestamp = System.currentTimeMillis();
                        // wrap the fields in an Order and emit it
                        ctx.collect(new Order(oid, uid, money, timestamp));
                        // sleep for one second
                        Thread.sleep(1000);
                    }
                }
    
                @Override
                public void cancel() {
                    flag = false;
                }
            }).setParallelism(1);
            // print the output
            source.print();
            env.execute();
        }
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Order{
            private String uuid;
            private int uid;
            private int money;
            private Long timestamp;
        }
    }
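
    Design note: the example uses a ParallelSourceFunction yet pins the source to setParallelism(1). If the parallelism were raised to n, each of the n subtasks would run its own copy of run() and emit its own stream of orders; with a plain (non-parallel) SourceFunction, a parallelism greater than 1 is rejected outright.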
    
  • Reading data from MySQL with a custom source

    • Initialize the database
    CREATE DATABASE if not exists bigdata;
    USE bigdata;
    CREATE TABLE if not exists `t_student` (
        `id` int(11) NOT NULL AUTO_INCREMENT,
        `name` varchar(255) DEFAULT NULL,
        `age` int(11) DEFAULT NULL,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
    
    INSERT INTO `t_student` VALUES ('1', 'jack', '18');
    INSERT INTO `t_student` VALUES ('2', '张三', '19');
    INSERT INTO `t_student` VALUES ('3', 'rose', '20');
    INSERT INTO `t_student` VALUES ('4', 'tom', '19');
    INSERT INTO `t_student` VALUES ('5', '李四', '18');
    INSERT INTO `t_student` VALUES ('6', '王五', '20');
    
package cn.itcast.sz22.day02;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Author itcast
 * Date 2021/5/5 10:32
 * Desc Read the t_student table from MySQL with a custom RichSourceFunction
 */
public class MySQLSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1. env: set the parallelism to 1
        env.setParallelism(1);
        //2. source: a custom MySQL source that re-reads the table every 2 seconds
        DataStreamSource<Student> source = env.addSource(new RichSourceFunction<Student>() {
            Connection conn;
            PreparedStatement ps;
            boolean flag = true;

            @Override
            public void open(Configuration parameters) throws Exception {
                // open the database connection
                conn = DriverManager.getConnection("jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false"
                        , "root", "123456");
                // the SQL that reads the table
                String sql = "select `id`,`name`,age from t_student";
                // prepare the statement
                ps = conn.prepareStatement(sql);
            }

            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                while (flag) {
                    ResultSet rs = ps.executeQuery();
                    while (rs.next()) {
                        int id = rs.getInt("id");
                        String name = rs.getString("name");
                        int age = rs.getInt("age");
                        Student student = new Student(id, name, age);
                        ctx.collect(student);
                    }
                    rs.close();
                    // re-read the table every 2 seconds
                    Thread.sleep(2000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }

            @Override
            public void close() throws Exception {
                ps.close();
                conn.close();
            }
        });
        //3. print the source
        //4. execute
        source.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Student {
        private int id;
        private String name;
        private int age;
    }
}
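
Note: running this demo additionally requires a MySQL JDBC driver (for example a mysql-connector-java dependency) on the classpath; the host 192.168.88.100 and the root/123456 credentials refer to the course VM and should be replaced with your own.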

Socket

  • Install netcat (yum install nc -y) and start a listener with: nc -lk 9999

  • Requirement: receive data over the socket and compute a word count

    package cn.itcast.sz22.day02;
    
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.Arrays;
    
    /**
     * Author itcast
     * Date 2021/5/5 9:59
     * Desc WordCount over a socket source
     */
    public class SocketSourceDemo {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.source socketSource
            DataStreamSource<String> source = env.socketTextStream("192.168.88.100", 9999);
            //3. process the data - transformation
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = source
                    .flatMap((String value, Collector<String> out) -> Arrays
                            .stream(value.split(" ")).forEach(out::collect))
                    .returns(Types.STRING)
                    .map(value -> Tuple2.of(value, 1))
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(t -> t.f0)
                    .sum(1);
            //3.1 split each line on spaces into individual words
            //3.2 map each word to (word, 1)
            //3.3 group the data by word (key)
            //3.4 within each group, aggregate the counts (value), i.e. sum
            //4. output the result - sink
            result.print();
            //5. trigger execution - execute
            env.execute();
        }
    }
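
    A note on the explicit returns(...) calls: the Java compiler erases the generic types of lambdas, so Flink cannot infer that the flatMap lambda emits String values or that the map lambda emits Tuple2<String, Integer>. returns(Types.STRING) and returns(Types.TUPLE(Types.STRING, Types.INT)) supply that type information explicitly; the same pattern reappears in the filter demo below.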
    

Transformation

Categories of transformation operations

  • Per-record processing: map, filter
  • Multi-record processing, e.g. the data inside a window: reduce
  • Merging: union and join combine multiple streams into one
  • Splitting: divide one stream into several, via the legacy split API or, preferably, side outputs with OutputTag (see the sketch after this list)
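
For the splitting case, here is a minimal side-output sketch (illustrative, not part of the original course code; the class name SideOutputDemo and the tag name "odd" are made up): even numbers go to the main output, odd numbers to a side output.

package cn.itcast.sz22.day02;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the anonymous subclass braces {} preserve the generic type information
        OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};
        SingleOutputStreamOperator<Integer> evens = env
                .fromElements(1, 2, 3, 4, 5)
                .process(new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                        if (value % 2 == 0) {
                            out.collect(value);        // main output: even numbers
                        } else {
                            ctx.output(oddTag, value); // side output: odd numbers
                        }
                    }
                });
        DataStream<Integer> odds = evens.getSideOutput(oddTag);
        evens.print("even");
        odds.print("odd");
        env.execute();
    }
}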

Example

  • Count the words in a stream, excluding the sensitive word heihei
package cn.itcast.sz22.day02;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Author itcast
 * Date 2021/5/5 9:59
 * 1. filter: drop the sensitive word heihei
 * 2. reduce: aggregate the counts
 */
public class SocketSourceFilterDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.source socketSource
        DataStreamSource<String> source = env.socketTextStream("192.168.88.100", 9998);
        //3. process the data - transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source
                .flatMap((String value, Collector<String> out) -> Arrays
                        .stream(value.split(" ")).forEach(out::collect))
                .returns(Types.STRING)
                // drop every record that equals the word heihei -- boolean filter(T value)
                .filter(word-> !word.equals("heihei"))
                .map(value -> Tuple2.of(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                //.sum(1);
                //T reduce(T value1, T value2)
                // (hadoop,1) (hadoop,1) => (hadoop,1+1)
                .reduce((Tuple2<String, Integer> a, Tuple2<String, Integer> b) -> Tuple2.of(a.f0, a.f1 + b.f1));
        //3.1 split each line on spaces into individual words
        //3.2 map each word to (word, 1)
        //3.3 group the data by word (key)
        //3.4 within each group, aggregate the counts (value), i.e. sum
        //4. output the result - sink
        result.print();
        //5. trigger execution - execute
        env.execute();
    }
}
