Big Data (9f): Flink Rich Functions (RichFunction)

Posted by 小基基o_O


1. Overview

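A rich function (RichFunction) offers two things that an ordinary function does not:
Lifecycle methods (open and close)
Access to the function's runtime context (the getRuntimeContext method)

Versions used in this article:
Flink: 1.14.6
Java: 1.8
Scala: 2.12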

2. Examples

2.1. Ordinary function

The MapFunction interface extends the Function interface.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class H1 {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Create the data source
        DataStreamSource<Integer> dss = env.fromElements(1, 2, 3);
        // Ordinary function: square each element
        dss.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer i) {
                return i * i;
            }
        }).print();
        // Run the job
        env.execute();
    }
}

Test result: the job prints 1, 4 and 9, one value per line.

2.2. Rich function

The RichMapFunction abstract class extends the AbstractRichFunction abstract class.
The AbstractRichFunction abstract class implements the RichFunction interface.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

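public class H1 {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Create the data source
        DataStreamSource<Integer> dss = env.fromElements(1, 2, 3);
        // Rich function: square each element, with lifecycle hooks
        dss.map(new RichMapFunction<Integer, Integer>() {
            @Override
            public void open(Configuration parameters) {
                // Called once before the first element is processed
                System.out.println("lifecycle start");
            }

            @Override
            public void close() {
                // Called once after the last element has been processed
                System.out.println("lifecycle end");
            }

            @Override
            public Integer map(Integer i) {
                return i * i;
            }
        }).print();
        // Run the job
        env.execute();
    }
}

Test result: the message from open is printed first, then 1, 4 and 9, and finally the message from close.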

2.2.1. Getting the rich function's runtime context

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

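public class H1 {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(2);
        // Create the data source
        DataStreamSource<Integer> dss = env.fromElements(1, 2, 3);
        // Rich function: square each element and report the subtask index
        dss.map(new RichMapFunction<Integer, Integer>() {
            @Override
            public void open(Configuration parameters) {
                System.out.println("lifecycle start");
                // Get the runtime context
                RuntimeContext context = getRuntimeContext();
                System.out.println("subtask index: " + context.getIndexOfThisSubtask());
            }

            @Override
            public void close() {
                System.out.println("lifecycle end");
            }

            @Override
            public Integer map(Integer i) {
                return i * i;
            }
        }).print();
        // Run the job
        env.execute();
    }
}

Test result with the parallelism set to 2: open and close run once in each of the two parallel subtasks, so subtask indexes 0 and 1 are both printed. The order of the lines across subtasks is not guaranteed.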

3. Source code excerpts

3.1. RichFunction

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;

@Public
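public interface RichFunction extends Function {
    /** Lifecycle methods of the function. */
    void open(Configuration parameters) throws Exception;

    void close() throws Exception;

    /** Gets the function's runtime context, which includes: parallelism, job ID, task name, subtask index, ... */
    RuntimeContext getRuntimeContext();

    /** Sets the function's runtime context. Called by the framework when it creates a parallel instance of the function. */
    void setRuntimeContext(RuntimeContext t);
}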
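
The call order implied by these four methods can be illustrated with a small plain-Java model. This is only a sketch and does not use Flink: MiniContext, MiniRichSquare and runSubtask are hypothetical names invented here, with runSubtask playing the framework's role for one parallel instance (set the context, call open, call map once per record, call close).

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleModel {
    /** Hypothetical stand-in for RuntimeContext: carries only the subtask index. */
    static final class MiniContext {
        final int subtaskIndex;

        MiniContext(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }
    }

    /** Hypothetical stand-in for a rich map function that squares integers. */
    static final class MiniRichSquare {
        private final List<String> log;
        private MiniContext context;

        MiniRichSquare(List<String> log) {
            this.log = log;
        }

        void setRuntimeContext(MiniContext context) {
            this.context = context;
        }

        MiniContext getRuntimeContext() {
            return context;
        }

        void open() {
            log.add("open@" + getRuntimeContext().subtaskIndex);
        }

        int map(int i) {
            log.add("map(" + i + ")");
            return i * i;
        }

        void close() {
            log.add("close@" + getRuntimeContext().subtaskIndex);
        }
    }

    /** Plays the framework's role for one parallel instance and returns the call log. */
    public static List<String> runSubtask(int subtaskIndex, int... records) {
        List<String> log = new ArrayList<>();
        MiniRichSquare function = new MiniRichSquare(log);
        function.setRuntimeContext(new MiniContext(subtaskIndex)); // before open
        function.open();                                           // once per instance
        try {
            for (int record : records) {
                function.map(record);                              // once per record
            }
        } finally {
            function.close();                                      // once per instance
        }
        return log;
    }

    public static void main(String[] args) {
        // Two "parallel instances", each with its own context and its own records
        System.out.println(runSubtask(0, 1, 3)); // [open@0, map(1), map(3), close@0]
        System.out.println(runSubtask(1, 2));    // [open@1, map(2), close@1]
    }
}
```

Each simulated instance gets its own context object, which is why open and close appear once per subtask rather than once per record.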

3.2. RuntimeContext

/**
 * RuntimeContext holds information about the context in which a function runs.
 * Each parallel instance of the function has one context object, through which
 * static information, accumulators, broadcast variables and state can be accessed.
 */
@Public
public interface RuntimeContext {

    JobID getJobId();

    String getTaskName();

    int getIndexOfThisSubtask();

    int getAttemptNumber();

    String getTaskNameWithSubtasks();

    // ------------------------------------ Accumulators -------------------------------------

    <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator);

    <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name);

    @PublicEvolving
    IntCounter getIntCounter(String name);

    @PublicEvolving
    LongCounter getLongCounter(String name);

    @PublicEvolving
    DoubleCounter getDoubleCounter(String name);

    @PublicEvolving
    Histogram getHistogram(String name);

    // ---------------------------------- Broadcast variables --------------------------------

    @PublicEvolving
    boolean hasBroadcastVariable(String name);

    <RT> List<RT> getBroadcastVariable(String name);

    <T, C> C getBroadcastVariableWithInitializer(
            String name, BroadcastVariableInitializer<T, C> initializer);

    // -------------------------- Methods for accessing state --------------------------------

    @PublicEvolving
    <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);

    @PublicEvolving
    <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);

    @PublicEvolving
    <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);

    @PublicEvolving
    <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
            AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);

    @PublicEvolving
    <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
}
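
The accumulator methods in this excerpt can be tried out with a small job. The following is a minimal sketch, assuming the Flink 1.14 dependencies used elsewhere in this article are on the classpath. The class name AccumulatorSketch, the method countElements and the accumulator name "element-count" are made up for illustration.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AccumulatorSketch {
    /** Runs a small job and returns how many elements passed through the map function. */
    public static int countElements() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements(1, 2, 3)
                .map(new RichMapFunction<Integer, Integer>() {
                    // One counter per parallel instance of the function
                    private final IntCounter counter = new IntCounter();

                    @Override
                    public void open(Configuration parameters) {
                        // Register the counter through the runtime context
                        getRuntimeContext().addAccumulator("element-count", counter);
                    }

                    @Override
                    public Integer map(Integer i) {
                        counter.add(1);
                        return i * i;
                    }
                })
                .print();
        // The per-subtask counters are merged into the job result when the job finishes
        JobExecutionResult result = env.execute();
        Integer total = result.getAccumulatorResult("element-count");
        return total;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("elements seen: " + countElements());
    }
}
```

The counter is registered in open rather than in map because registration only needs to happen once per parallel instance, which is what the lifecycle method is for.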
