2021年最新最全Flink系列教程__Flink高级API

Posted ChinaManor

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2021年最新最全Flink系列教程__Flink高级API相关的知识,希望对你有一定的参考价值。

day03_Flink高级API

今日目标

  • Flink的四大基石
  • Flink窗口Window操作
  • Flink时间Time
  • Flink水印Watermark机制
  • Flink的state状态管理-keyed state 和 operator state

Flink的四大基石

  • Checkpoint 分布式一致性,解决数据丢失,故障恢复数据
  • State 状态,分为Keyed State ,Operator State; 数据结构的角度来说 ValueState、ListState、MapState,BroadcastState
  • Time , EventTime事件时间、Ingestion摄取时间、Process处理时间
  • Window窗口,TimeWindow 、 countwindow、 sessionwindow

Window操作

Window分类

  • time
    • 用的比较多 滚动窗口和滑动窗口
  • count

如何使用

image-20210507090957187

案例

  • 需求
/**
 * Author itcast
 * Date 2021/5/7 9:13
 * 有如下数据表示:
 * 信号灯编号和通过该信号灯的车的数量
 * 9,3
 * 9,2
 * 9,7
 * 4,9
 * 2,6
 * 1,5
 * 2,3
 * 5,7
 * 5,4
 * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
 * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
 */
  • 分析

    image-20210507092308268

  • 代码

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Author itcast
     * Date 2021/5/7 9:13
     * 有如下数据表示:
     * 信号灯编号和通过该信号灯的车的数量
     * 9,3
     * 9,2
     * 9,7
     * 4,9
     * 2,6
     * 1,5
     * 2,3
     * 5,7
     * 5,4
     * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
     * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
     */
    public class WindowDemo {
        public static void main(String[] args) throws Exception {
            //1.创建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            env.setParallelism(1);
            //2.获取数据源
            DataStreamSource<String> source = env.socketTextStream("node1", 9999);
            //3.转换操作 基于key window 统计
            DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
                @Override
                public CartInfo map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new CartInfo(split[0], Integer.parseInt(split[1]));
                }
            });
            //统计 滚动窗口
            DataStream<CartInfo> result = cartInfoDS
                    .keyBy(t -> t.getSensorId())
                    //使用的是处理时间
                    //每5秒钟统计一次,最近5秒钟内
                    .window(TumblingProcessingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
                    .sum("count");
            //统计 滑动窗口
            DataStream<CartInfo> result1 = cartInfoDS
                    .keyBy(t -> t.getSensorId())
                    //使用的是处理时间
                    //每5秒钟统计一次,最近5秒钟内
                    .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                    .sum("count");
    
            //4.打印输出
            result1.print();
            //5.执行流环境
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CartInfo {
            private String sensorId;//信号灯id
            private Integer count;//通过该信号灯的车的数量
        }
    }
    
  • 需求2 - countwindow

    需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计–基于数量的滚动窗口

    需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计–基于数量的滑动窗口

  • 代码

    /**
     * Author itcast
     * Date 2021/5/7 9:13
     * 有如下数据表示:
     * 信号灯编号和通过该信号灯的车的数量
     * 9,3
     * 9,2
     * 9,7
     * 4,9
     * 2,6
     * 1,5
     * 2,3
     * 5,7
     * 5,4
     需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
     需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
     */
    public class WindowDemo02 {
        public static void main(String[] args) throws Exception {
            //1.创建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            env.setParallelism(1);
            //2.获取数据源
            DataStreamSource<String> source = env.socketTextStream("node1", 9999);
            //3.转换操作 基于key window 统计
            DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
                @Override
                public CartInfo map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new CartInfo(split[0], Integer.parseInt(split[1]));
                }
            });
            //统计 滚动计数窗口
            DataStream<CartInfo> result = cartInfoDS
                    .keyBy(t -> t.getSensorId())
                    //使用的是处理时间
                    //每5条数据统计一次
                    .countWindow(5)
                    .sum("count");
            //统计 滑动计数窗口
            DataStream<CartInfo> result1 = cartInfoDS
                    .keyBy(t -> t.getSensorId())
                    //使用的是处理时间
                    //每5秒钟统计一次,最近5秒钟内
                    .countWindow(10,5)
                    .sum("count");
    
            //4.打印输出
            //result.printToErr();
            result1.print();
            //5.执行流环境
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CartInfo {
            private String sensorId;//信号灯id
            private Integer count;//通过该信号灯的车的数量
        }
    }
    
  • 统计会话指定时间内的数据,如果这个窗口内没有数据,就不在计算,设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算。

  • 案例 - 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算

image-20210507101009350

  • 代码

    /**
     * Author itcast
     * Date 2021/5/7 9:13
     * 有如下数据表示:
     * 信号灯编号和通过该信号灯的车的数量
     * 9,3
     * 9,2
     * 9,7
     * 4,9
     * 2,6
     * 1,5
     * 2,3
     * 5,7
     * 5,4
        设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
     */
    public class WindowDemo03 {
        public static void main(String[] args) throws Exception {
            //1.创建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            env.setParallelism(1);
            //2.获取数据源
            DataStreamSource<String> source = env.socketTextStream("node1", 9999);
            //3.转换操作 基于key window 统计
            DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
                @Override
                public CartInfo map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new CartInfo(split[0], Integer.parseInt(split[1]));
                }
            });
            //统计 会话窗口
            DataStream<CartInfo> result = cartInfoDS
                    .keyBy(t -> t.getSensorId())
                    //使用的是处理时间
                    //每5条数据统计一次
                    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                    .sum("count");
    
            //4.打印输出
            //result.printToErr();
            result.print();
            //5.执行流环境
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CartInfo {
            private String sensorId;//信号灯id
            private Integer count;//通过该信号灯的车的数量
        }
    }
    

Time 时间

  • EventTime的重要性
    • 防止出现网络抖动,造成数据的乱序,数据统计的丢失
  • 窗口: 开始时间-结束时间

watermark 水印时间

  • watermark 水印机制

    • watermark 就是时间戳
    • watermark = eventTime - maxDelayTime
  • 触发计算 watermak >= 结束时间

watermark 案例

  • 需求

    有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

    要求每隔5s,计算5秒内,每个用户的订单总金额

    并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。

    基础版:

    package cn.itcast.sz22.day03;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    import java.time.Duration;
    import java.util.Random;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Author itcast
     * Date 2021/5/7 11:04
     * Desc TODO
     */
    public class WatermarkDemo01 {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //
            env.setParallelism(1);
            //2.Source
            //模拟实时订单数据(数据有延迟和乱序)
            DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
                private boolean flag = true;
    
                @Override
                public void run(SourceContext<Order> ctx) throws Exception {
                    Random random = new Random();
                    while (flag) {
                        String orderId = UUID.randomUUID().toString();
                        int userId = random.nextInt(3);
                        int money = random.nextInt(100);
                        //模拟数据延迟和乱序!
                        long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                        ctx.collect(new Order(orderId, userId, money, eventTime));
                        TimeUnit.SECONDS.sleep(1);
                    }
                }
    
                @Override
                public void cancel() {
                    flag = false;
                }
            });
    
            // 添加水印机制 最大允许延迟的时间为 3 秒
            //orderDS.printToErr();
            //分配水印机制
            SingleOutputStreamOperator<Order> sum = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy
                    //指定最大的延迟时间
                    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    //指定 eventTime 是哪个字段 long extractTimestamp(T element, long recordTimestamp);
                    .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
                    //统计每个用户对应 购买 金额
                    .keyBy(t -> t.getUserId())
                    //指定窗口,每5秒钟统计5秒钟之内的数据
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .sum("money");
            sum.print();
    
            //
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Order {
            private String orderId;
            private Integer userId;
            private Integer money;
            private Long eventTime;
        }
    }
    

    扩展版:

    package cn.itcast.sz22.day03;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.commons.lang3.time.FastDateFormat;
    import org.apache.flink.api.common.eventtime.*;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Author itcast
     * Date 2021/5/7 11:04
     * Desc TODO
     */
    public class WatermarkDemo02 {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //
            env.setParallelism(1);
            //2.Source
            //模拟实时订单数据(数据有延迟和乱序)
            DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
                private boolean flag = true;
    
                @Override
                public void run(SourceContext<Order> ctx) throws Exception {
                    Random random = new Random();
                    while (flag) {
                        String orderId = UUID.randomUUID().toString();
                        int userId = random.nextInt(3);
                        int money = random.nextInt(100);
                        //模拟数据延迟和乱序!
                        long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;
                        ctx.collect(new Order(orderId, userId, money, eventTime));
                        TimeUnit.SECONDS.sleep(1);
                    }
                }
    
                @Override
                public void cancel() {
                    flag = false;
                }
            });
    
            DataStream<Order> WatermarkDS = orderDS
                    .assignTimestampsAndWatermarks(
                            new WatermarkStrategy<Order>() {
                                @Override
                                public WatermarkGenerator<Order以上是关于2021年最新最全Flink系列教程__Flink高级API的主要内容,如果未能解决你的问题,请参考以下文章

    2021年最新最全Flink系列教程__Flink综合案例

    2021年最新最全Flink系列教程__Flink高级API

    2021年最新最全Flink系列教程_Flink流批一体API

    2021年最新最全Flink系列教程_Flink原理初探和流批一体API

    收藏+下载!Flink 社区 2021 最新最全学习渠道汇总

    收藏+下载!Flink 社区 2021 最新最全学习渠道汇总