2021年最新最全Flink系列教程__Flink高级API
Posted ChinaManor
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2021年最新最全Flink系列教程__Flink高级API相关的知识,希望对你有一定的参考价值。
day03_Flink高级API
今日目标
- Flink的四大基石
- Flink窗口Window操作
- Flink时间Time
- Flink水印Watermark机制
- Flink的state状态管理-keyed state 和 operator state
Flink的四大基石
- Checkpoint 分布式一致性,解决数据丢失,故障恢复数据
- State 状态,分为Keyed State ,Operator State; 数据结构的角度来说 ValueState、ListState、MapState,BroadcastState
- Time , EventTime事件时间、Ingestion摄取时间、Process处理时间
- Window窗口,TimeWindow 、 countwindow、 sessionwindow
Window操作
Window分类
- time
- 用的比较多 滚动窗口和滑动窗口
- count
如何使用
案例
- 需求
/**
* Author itcast
* Date 2021/5/7 9:13
* 有如下数据表示:
* 信号灯编号和通过该信号灯的车的数量
* 9,3
* 9,2
* 9,7
* 4,9
* 2,6
* 1,5
* 2,3
* 5,7
* 5,4
* 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
* 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
*/
-
分析
-
代码
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口 * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口 */ public class WindowDemo { public static void main(String[] args) throws Exception { //1.创建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); //2.获取数据源 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //3.转换操作 基于key window 统计 DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] split = value.split(","); return new CartInfo(split[0], Integer.parseInt(split[1])); } }); //统计 滚动窗口 DataStream<CartInfo> result = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5秒钟统计一次,最近5秒钟内 .window(TumblingProcessingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) .sum("count"); //统计 滑动窗口 DataStream<CartInfo> result1 = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5秒钟统计一次,最近5秒钟内 .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))) .sum("count"); //4.打印输出 result1.print(); //5.执行流环境 env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
-
需求2 - countwindow
需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计–基于数量的滚动窗口
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计–基于数量的滑动窗口
-
代码
/** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口 */ public class WindowDemo02 { public static void main(String[] args) throws Exception { //1.创建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); //2.获取数据源 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //3.转换操作 基于key window 统计 DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] split = value.split(","); return new CartInfo(split[0], Integer.parseInt(split[1])); } }); //统计 滚动计数窗口 DataStream<CartInfo> result = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5条数据统计一次 .countWindow(5) .sum("count"); //统计 滑动计数窗口 DataStream<CartInfo> result1 = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5秒钟统计一次,最近5秒钟内 .countWindow(10,5) .sum("count"); //4.打印输出 //result.printToErr(); result1.print(); //5.执行流环境 env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
-
统计会话指定时间内的数据,如果这个窗口内没有数据,就不在计算,设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算。
-
案例 - 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
-
代码
/** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算 */ public class WindowDemo03 { public static void main(String[] args) throws Exception { //1.创建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); //2.获取数据源 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //3.转换操作 基于key window 统计 DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] split = value.split(","); return new CartInfo(split[0], Integer.parseInt(split[1])); } }); //统计 会话窗口 DataStream<CartInfo> result = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5条数据统计一次 .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .sum("count"); //4.打印输出 //result.printToErr(); result.print(); //5.执行流环境 env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
Time 时间
- EventTime的重要性
- 防止出现网络抖动,造成数据的乱序,数据统计的丢失
- 窗口: 开始时间-结束时间
watermark 水印时间
-
watermark 水印机制
- watermark 就是时间戳
- watermark = eventTime - maxDelayTime
-
触发计算 watermak >= 结束时间
watermark 案例
-
需求
有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
基础版:
package cn.itcast.sz22.day03; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/5/7 11:04 * Desc TODO */ public class WatermarkDemo01 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(1); //2.Source //模拟实时订单数据(数据有延迟和乱序) DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() { private boolean flag = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (flag) { String orderId = UUID.randomUUID().toString(); int userId = random.nextInt(3); int money = random.nextInt(100); //模拟数据延迟和乱序! long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000; ctx.collect(new Order(orderId, userId, money, eventTime)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { flag = false; } }); // 添加水印机制 最大允许延迟的时间为 3 秒 //orderDS.printToErr(); //分配水印机制 SingleOutputStreamOperator<Order> sum = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy //指定最大的延迟时间 .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //指定 eventTime 是哪个字段 long extractTimestamp(T element, long recordTimestamp); .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime())) //统计每个用户对应 购买 金额 .keyBy(t -> t.getUserId()) //指定窗口,每5秒钟统计5秒钟之内的数据 .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("money"); sum.print(); // env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long eventTime; } }
扩展版:
package cn.itcast.sz22.day03; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.lang3.time.FastDateFormat; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/5/7 11:04 * Desc TODO */ public class WatermarkDemo02 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(1); //2.Source //模拟实时订单数据(数据有延迟和乱序) DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() { private boolean flag = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (flag) { String orderId = UUID.randomUUID().toString(); int userId = random.nextInt(3); int money = random.nextInt(100); //模拟数据延迟和乱序! long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000; ctx.collect(new Order(orderId, userId, money, eventTime)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { flag = false; } }); DataStream<Order> WatermarkDS = orderDS .assignTimestampsAndWatermarks( new WatermarkStrategy<Order>() { @Override public WatermarkGenerator<Order以上是关于2021年最新最全Flink系列教程__Flink高级API的主要内容,如果未能解决你的问题,请参考以下文章
2021年最新最全Flink系列教程__Flink高级API
2021年最新最全Flink系列教程_Flink流批一体API
2021年最新最全Flink系列教程_Flink原理初探和流批一体API