Big Data (9f): Flink State Programming
Posted by 小基基o_O
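Overview
Stream computation is either stateless or stateful.
Typical uses of state in stream processing: deduplication, monitoring, and so on.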
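The difference between the two kinds of computation can be sketched in plain Java. This is only a model, not Flink API: the class name `StatefulVsStateless` and both method names are made up for illustration. `doubleAll` needs nothing but the current record, while `deduplicate` must remember every value it has already seen, and that memory is the state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model of stateless vs. stateful processing (hypothetical names, not Flink API). */
class StatefulVsStateless {

    /** Stateless: each output depends only on the current input. */
    static List<Integer> doubleAll(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        for (int x : input) {
            out.add(x * 2);
        }
        return out;
    }

    /** Stateful: the set of values seen so far decides whether an input is emitted. */
    static List<Integer> deduplicate(List<Integer> input) {
        Set<Integer> seen = new HashSet<>(); // this set is the "state"
        List<Integer> out = new ArrayList<>();
        for (int x : input) {
            if (seen.add(x)) { // add() returns false when x was already present
                out.add(x);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 2, 3, 1);
        System.out.println(doubleAll(input));   // [2, 4, 4, 6, 2]
        System.out.println(deduplicate(input)); // [1, 2, 3]
    }
}
```

In a real job the `seen` set would have to survive failures and rescaling, which is the part Flink takes over when the state is managed.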
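State category | Managed State | Raw State |
---|---|---|
State management | Managed by the Flink runtime: stored, restored, and rescaled automatically | Managed by the user |
State data structures | Several structures provided by Flink, for example ListState and MapState | Byte array: byte[] |
Use cases | Most Flink operators | All operators |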
Managed State
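Raw State is only used when the existing operators and Managed State are not sufficient.
In general Managed State is enough, so the rest of this article focuses on it.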
Managed State category | Operator State | Keyed State |
---|---|---|
Chinese name | 算子状态 | 键控状态 |
State assignment | One State per operator subtask | One operator handles many keys; one State per key |
Frequency of use | Lower | Higher |
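The "state assignment" row can be modelled in plain Java. This is a sketch under stated assumptions rather than Flink code: the class `StateScopeModel`, its methods, and the routing rule `hashCode mod parallelism` are all hypothetical (Flink's real key routing works differently). It only shows the scoping: one counter per subtask versus one counter per key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of state scoping (hypothetical names, not Flink API). */
class StateScopeModel {

    /** Operator-state scope: one counter per subtask, shared by every record that subtask receives. */
    static Map<Integer, Integer> countPerSubtask(List<String> records, int parallelism) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String r : records) {
            // Hypothetical routing rule, used only to assign records to subtasks in this model
            int subtask = Math.floorMod(r.hashCode(), parallelism);
            counts.merge(subtask, 1, Integer::sum);
        }
        return counts;
    }

    /** Keyed-state scope: one counter per key, regardless of which subtask handles that key. */
    static Map<String, Integer> countPerKey(List<String> records) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String r : records) {
            counts.merge(r, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "a", "c", "a", "b");
        System.out.println(countPerSubtask(records, 2)); // {0=2, 1=4}
        System.out.println(countPerKey(records));        // {a=3, b=2, c=1}
    }
}
```

With two subtasks, keys "a" and "c" land on the same subtask and share its single counter, whereas the keyed variant keeps them apart.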
The development environment for this article is Windows 10 + IDEA; the Flink version is 1.14.
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>
Operator State
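- Operator state can be used on any operator; all records processed by the same operator subtask (SubTask) share one state
- Subtasks of an operator cannot access each other's state
- The examples below use list state and broadcast state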
ListState
List state can be combined with checkpoints to save and clear the state periodically.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class Hello {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // Checkpoint every 3 seconds
        env.enableCheckpointing(3000L);
        // Create the source; it emits one record per second
        DataStreamSource<Integer> dss = env.addSource(new MySource());
        // Exercise state together with checkpoints
        dss.map(new MyMapFunction()).print();
        // Run the job
        env.execute();
    }

    private static class MyMapFunction implements MapFunction<Integer, String>, CheckpointedFunction {
        private ListState<Integer> state;

        @Override
        public String map(Integer value) throws Exception {
            // Append the record to the list state and return the current contents
            state.add(value);
            return state.get().toString();
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) {
            System.out.println("snapshotState called at checkpoint; clearing state");
            state.clear();
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("Creating state");
            state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("", Integer.class));
        }
    }

    public static class MySource implements SourceFunction<Integer> {
        public MySource() {}

        @Override
        public void run(SourceContext<Integer> sc) throws InterruptedException {
            for (int i = 0; i < 99; i++) {
                sc.collect(i);
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {}
    }
}
Test result
Creating state
[0]
[0, 1]
snapshotState called at checkpoint; clearing state
[2]
[2, 3]
[2, 3, 4]
snapshotState called at checkpoint; clearing state
[5]
[5, 6]
[5, 6, 7]
snapshotState called at checkpoint; clearing state
[8]
……
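The "periodically save" half of this idea can be illustrated without Flink. The following is a minimal plain-Java model, not Flink API: `CheckpointModel` and its methods are hypothetical names, and the `lastSnapshot` field merely stands in for durable checkpoint storage. It shows what a snapshot buys you: after a simulated failure the state falls back to whatever the last snapshot contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of snapshot and restore (hypothetical names, not Flink API). */
class CheckpointModel {
    private final List<Integer> state = new ArrayList<>();   // live, in-memory state
    private List<Integer> lastSnapshot = new ArrayList<>();  // stands in for durable checkpoint storage

    /** Processing a record appends it to the live state. */
    void process(int value) {
        state.add(value);
    }

    /** A checkpoint copies the live state into the snapshot. */
    void snapshot() {
        lastSnapshot = new ArrayList<>(state);
    }

    /** Recovery discards the live state and reloads the last snapshot. */
    void restore() {
        state.clear();
        state.addAll(lastSnapshot);
    }

    /** Returns a copy of the live state. */
    List<Integer> current() {
        return new ArrayList<>(state);
    }

    public static void main(String[] args) {
        CheckpointModel m = new CheckpointModel();
        m.process(0);
        m.process(1);
        m.process(2);
        m.snapshot();
        m.process(3);
        m.process(4);
        System.out.println(m.current()); // [0, 1, 2, 3, 4]
        m.restore();
        System.out.println(m.current()); // [0, 1, 2]
    }
}
```

Records 3 and 4 arrived after the snapshot, so they are gone after the restore; in a real pipeline they would be replayed from the source.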
BroadcastState
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Scanner;

public class Hello {
    final static String STATE_KEY = "";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
        // 1. Main data stream
        DataStreamSource<Integer> mainStream = env.addSource(new AutomatedSource());
        // 1. Auxiliary stream that controls the main data
        DataStreamSource<String> branchStream = env.addSource(new ManualSource());
        // 2. Create the state descriptor
        MapStateDescriptor<String, Long> stateDescriptor = new MapStateDescriptor<>("", String.class, Long.class);
        // 2. Create the broadcast stream
        BroadcastStream<String> broadcastStream = branchStream.broadcast(stateDescriptor);
        // 3. Connect the main data stream to the broadcast stream
        BroadcastConnectedStream<Integer, String> b = mainStream.connect(broadcastStream);
        // BroadcastProcessFunction<IN1, IN2, OUT>
        b.process(new BroadcastProcessFunction<Integer, String, String>() {
            // processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out)
            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                // 4. Get the broadcast state
                BroadcastState<String, Long> state = ctx.getBroadcastState(stateDescriptor);
                // 4. Update the broadcast state (the input is assumed to be numeric)
                state.put(STATE_KEY, Long.valueOf(value));
            }

            // processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out)
            @Override
            public void processElement(Integer value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                // 5. Get the read-only broadcast state
                ReadOnlyBroadcastState<String, Long> state = ctx.getBroadcastState(stateDescriptor);
                // 5. Read the value from the broadcast state
                Long stateValue = state.get(STATE_KEY);
                // 6. Emit output once a value has been broadcast
                if (stateValue != null) {
                    out.collect("Contestant No. " + value + ", please proceed to stage No. " + stateValue);
                }
            }
        }).print();
        // Run the job
        env.execute();
    }

    /** Source fed by manual console input */
    public static class ManualSource implements SourceFunction<String> {
        public ManualSource() {}

        @Override
        public void run(SourceFunction.SourceContext<String> sc) {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) break;
                if (!str.equals("")) sc.collect(str);
            }
            scanner.close();
        }

        @Override
        public void cancel() {}
    }

    /** Source that emits data automatically */
    public static class AutomatedSource implements SourceFunction<Integer> {
        public AutomatedSource() {}

        @Override
        public void run(SourceFunction.SourceContext<Integer> sc) throws InterruptedException {
            for (int i = 0; i < 999; i++) {
                Thread.sleep(2000);
                sc.collect(i);
            }
        }

        @Override
        public void cancel() {}
    }
}
Test result (screenshot)
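What the job does can be modelled in plain Java. This is a sketch under assumptions, not Flink API: `BroadcastModel`, its methods, the explicit `subtask` index, and the output format are hypothetical. A broadcast element reaches every parallel subtask, so each subtask's local copy receives the same update; a regular element reaches one subtask, which only reads its own copy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the broadcast state pattern (hypothetical names, not Flink API). */
class BroadcastModel {
    // One local copy of the broadcast map per parallel subtask
    private final List<Map<String, Long>> copies = new ArrayList<>();

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            copies.add(new HashMap<>());
        }
    }

    /** A broadcast element is delivered to every subtask, so every copy gets the same update. */
    void processBroadcastElement(String key, long value) {
        for (Map<String, Long> copy : copies) {
            copy.put(key, value);
        }
    }

    /** A regular element goes to one subtask, which reads (never writes) its local copy. */
    String processElement(int subtask, int value, String key) {
        Long stage = copies.get(subtask).get(key);
        return stage == null ? null : value + " -> stage " + stage;
    }

    public static void main(String[] args) {
        BroadcastModel model = new BroadcastModel(3);
        System.out.println(model.processElement(0, 1, "k")); // null: nothing broadcast yet
        model.processBroadcastElement("k", 7L);
        System.out.println(model.processElement(2, 1, "k")); // 1 -> stage 7
    }
}
```

Until the first value is broadcast, regular elements produce no output, which matches the `stateValue != null` check in the job above.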
Keyed State
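- ValueState<T>: stores a single value
- ListState<T>: stores a list of elements
- MapState<UK, UV>: stores key-value pairs
- ReducingState<T>: stores a single value; when add is called, ReducingState aggregates with the specified ReduceFunction
- AggregatingState<IN, OUT>: similar to ReducingState, except that the aggregate result type OUT may differ from the input type IN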
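The difference between the last two entries is easiest to see side by side. The sketch below is a plain-Java model, not Flink API: `AggregationModel`, `Reducing`, and `Averaging` are hypothetical names. `Reducing<T>` folds every added value into one stored value of the same type, while `Averaging` accepts `Integer` inputs and returns a `Double`, so IN and OUT differ.

```java
import java.util.function.BinaryOperator;

/** Plain-Java model of reducing vs. aggregating state (hypothetical names, not Flink API). */
class AggregationModel {

    /** Models ReducingState<T>: input, stored value, and result all have type T. */
    static final class Reducing<T> {
        private final BinaryOperator<T> reduceFunction;
        private T value; // null until the first add

        Reducing(BinaryOperator<T> reduceFunction) {
            this.reduceFunction = reduceFunction;
        }

        void add(T input) {
            value = (value == null) ? input : reduceFunction.apply(value, input);
        }

        T get() {
            return value;
        }
    }

    /** Models AggregatingState<IN, OUT> for an average: IN is Integer, OUT is Double. */
    static final class Averaging {
        private long sum;   // accumulator part 1
        private long count; // accumulator part 2

        void add(Integer input) {
            sum += input;
            count++;
        }

        Double get() {
            return count == 0 ? null : (double) sum / count;
        }
    }

    public static void main(String[] args) {
        Reducing<Integer> total = new Reducing<>(Integer::sum);
        Averaging average = new Averaging();
        for (int x : new int[] {9, 5, 2, 7}) {
            total.add(x);
            average.add(x);
        }
        System.out.println(total.get());   // 23
        System.out.println(average.get()); // 5.75
    }
}
```

The average needs a two-field accumulator (sum and count), which is why it cannot be expressed as a reduce over `Integer` alone.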
ValueState
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class Hello {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // Create the data source
        DataStreamSource<Integer> dss = env.fromElements(9, 5, 2, 7);
        dss
                .keyBy(i -> true