Big Data (9f): Flink Stateful Programming

Posted by 小基基o_O



Overview

Stream computation falls into two categories: stateless computation and stateful computation.

State in stream processing enables features such as deduplication, monitoring, ……

| State category | Managed State | Raw State |
| --- | --- | --- |
| Managed by | The Flink runtime: automatic storage, recovery, and rescaling | The user |
| Data structures | Multiple structures provided by Flink, e.g. ListState, MapState | Byte array: byte[] |
| Where usable | Most Flink operators | All operators |

Managed State

Raw State is only used when the existing operators and Managed State are not enough.
In general, Managed State suffices, so it is the focus of what follows.

| Managed State category | Operator State | Keyed State |
| --- | --- | --- |
| State allocation | One state per operator subtask | An operator handles multiple keys; one state per key |
| Frequency of use | Lower | Higher |

The development environment for this article is Windows 10 + IDEA; the Flink version is 1.14.

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>

Operator State

  • Operator state can be used on any operator; all records in one operator subtask (SubTask) share one state instance.
    Subtasks of an operator cannot access each other's state.
  • Below, list state and broadcast state serve as examples.

ListState

List state can be combined with checkpoints to periodically save and clear state.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class Hello {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // Checkpoint every 3 seconds
        env.enableCheckpointing(3000L);
        // Create a source that emits one element per second
        DataStreamSource<Integer> dss = env.addSource(new MySource());
        // Exercise the state and the checkpoints
        dss.map(new MyMapFunction()).print();
        // Execute the streaming job
        env.execute();
    }

    private static class MyMapFunction implements MapFunction<Integer, String>, CheckpointedFunction {
        private ListState<Integer> state;

        @Override
        public String map(Integer value) throws Exception {
            state.add(value);
            return state.get().toString();
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) {
            System.out.println("snapshotState called at checkpoint; clearing state");
            state.clear();
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("Creating state");
            state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("", Integer.class));
        }
    }

    public static class MySource implements SourceFunction<Integer> {
        public MySource() {}

        @Override
        public void run(SourceContext<Integer> sc) throws InterruptedException {
            for (int i = 0; i < 99; i++) {
                sc.collect(i);
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {}
    }
}

Test output

Creating state
[0]
[0, 1]
snapshotState called at checkpoint; clearing state
[2]
[2, 3]
[2, 3, 4]
snapshotState called at checkpoint; clearing state
[5]
[5, 6]
[5, 6, 7]
snapshotState called at checkpoint; clearing state
[8]
……

BroadcastState

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Scanner;

public class Hello {
    final static String STATE_KEY = "";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
        // 1. Main data stream
        DataStreamSource<Integer> mainStream = env.addSource(new AutomatedSource());
        // 1. Auxiliary stream that controls the main data
        DataStreamSource<String> branchStream = env.addSource(new ManualSource());
        // 2. Create the state descriptor
        MapStateDescriptor<String, Long> stateDescriptor = new MapStateDescriptor<>("", String.class, Long.class);
        // 2. Create the broadcast stream
        BroadcastStream<String> broadcastStream = branchStream.broadcast(stateDescriptor);
        // 3. Connect the main stream with the broadcast stream
        BroadcastConnectedStream<Integer, String> b = mainStream.connect(broadcastStream);
        // BroadcastProcessFunction<IN1, IN2, OUT>
        b.process(new BroadcastProcessFunction<Integer, String, String>() {
            // processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out)
            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                // 4. Get the broadcast state
                BroadcastState<String, Long> state = ctx.getBroadcastState(stateDescriptor);
                // 4. Update the broadcast state
                state.put(STATE_KEY, Long.valueOf(value));
            }

            // processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out)
            @Override
            public void processElement(Integer value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                // 5. Get the read-only broadcast state
                ReadOnlyBroadcastState<String, Long> state = ctx.getBroadcastState(stateDescriptor);
                // 5. Read a value from the broadcast state
                Long stateValue = state.get(STATE_KEY);
                // 6. Emit
                if (stateValue != null) {
                    out.collect("Contestant No. " + value + ", please enter stage No. " + stateValue);
                }
            }
        }).print();
        // Execute the streaming job
        env.execute();
    }

    /** Source fed by manual console input */
    public static class ManualSource implements SourceFunction<String> {
        public ManualSource() {}

        @Override
        public void run(SourceFunction.SourceContext<String> sc) {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) break;
                if (!str.equals("")) sc.collect(str);
            }
            scanner.close();
        }

        @Override
        public void cancel() {}
    }

    /** Source that emits data automatically */
    public static class AutomatedSource implements SourceFunction<Integer> {
        public AutomatedSource() {}

        @Override
        public void run(SourceFunction.SourceContext<Integer> sc) throws InterruptedException {
            for (int i = 0; i < 999; i++) {
                Thread.sleep(2000);
                sc.collect(i);
            }
        }

        @Override
        public void cancel() {}
    }
}

Test result screenshot (image not reproduced here)

Keyed State

  • ValueState<T>
    Stores a single value
  • ListState<T>
    Stores a list of elements
  • MapState<UK, UV>
    Stores key-value pairs
  • ReducingState<T>
    Stores a single value; on add(), ReducingState aggregates with the specified ReduceFunction
  • AggregatingState<IN, OUT>
    Like ReducingState, except that the aggregated result type OUT may differ from the input type IN
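Of the five types, only ValueState is demonstrated in this article. As a hedged sketch of the two aggregate-style types (the class name KeyedStateDemo, the state names "sum" and "avg", and the odd/even keying are illustrative choices of mine, not from the article), ReducingState and AggregatingState could be exercised together in one KeyedProcessFunction, assuming the same Flink 1.14 setup:

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class KeyedStateDemo {
    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        List<String> results = new ArrayList<>();
        env.fromElements(9, 5, 2, 7)
                .keyBy(i -> i % 2) // two keys: 0 (even) and 1 (odd)
                .process(new KeyedProcessFunction<Integer, Integer, String>() {
                    private ReducingState<Integer> sumState;
                    private AggregatingState<Integer, Double> avgState;

                    @Override
                    public void open(Configuration parameters) {
                        // ReducingState: add() merges the new value via the given ReduceFunction (here: sum)
                        sumState = getRuntimeContext().getReducingState(
                                new ReducingStateDescriptor<>("sum", Integer::sum, Integer.class));
                        // AggregatingState: input Integer, accumulator long[]{sum, count}, output Double
                        avgState = getRuntimeContext().getAggregatingState(
                                new AggregatingStateDescriptor<>("avg",
                                        new AggregateFunction<Integer, long[], Double>() {
                                            @Override public long[] createAccumulator() { return new long[]{0L, 0L}; }
                                            @Override public long[] add(Integer v, long[] acc) { acc[0] += v; acc[1]++; return acc; }
                                            @Override public Double getResult(long[] acc) { return (double) acc[0] / acc[1]; }
                                            @Override public long[] merge(long[] a, long[] b) { a[0] += b[0]; a[1] += b[1]; return a; }
                                        }, long[].class));
                    }

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
                        sumState.add(value);  // aggregated with Integer::sum
                        avgState.add(value);  // aggregated with the AggregateFunction above
                        out.collect("key=" + ctx.getCurrentKey() + " sum=" + sumState.get() + " avg=" + avgState.get());
                    }
                })
                .executeAndCollect()
                .forEachRemaining(results::add);
        return results;
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```

With parallelism 1 this emits key=1 sum=9 avg=9.0, key=1 sum=14 avg=7.0, key=0 sum=2 avg=2.0, key=1 sum=21 avg=7.0; note that AggregatingState's output type (Double) differs from its input type (Integer), which is exactly what distinguishes it from ReducingState.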

ValueState

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class Hello {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment with parallelism 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // Create the data source
        DataStreamSource<Integer> dss = env.fromElements(9, 5, 2, 7);
        dss
                .keyBy(i -> true)
                .process(new KeyedProcessFunction<Boolean, Integer, Integer>() {
                    private ValueState<Integer> maxState;

                    @Override
                    public void open(Configuration parameters) {
                        // Create the ValueState when the operator starts
                        maxState = getRuntimeContext().getState(new ValueStateDescriptor<>("max", Integer.class));
                    }

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        // Keep the maximum value seen so far in state and emit it
                        Integer max = maxState.value();
                        if (max == null || value > max) {
                            maxState.update(value);
                        }
                        out.collect(maxState.value());
                    }
                })
                .print();
        env.execute();
    }
}
