Flink中自定义Rich函数实现

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink中自定义Rich函数实现相关的知识,希望对你有一定的参考价值。

在Flink中,我们知道map ,flatMap,reduce算子都可以自定义函数实现,比如MapFunction:

public class MyMapFunction implements MapFunction<String,Integer> 
    @Override
    public Integer map(String s) throws Exception 
        return Integer.parseInt(s);
    

同时,Flink中还提供了对应的Rich函数,比如RichMapFunction,RichFlatMapFunction,RichReduceFunciton,而Rich相关函数都会继承AbstractRichFunction,这个类中会实现如下几个方法:

// Flink在算子调用前会执行open方法
public void open(Configuration parameters) throws Exception 
// 获取Flink运行时上下文,每个并行的算子任务都有一个上下文,会记录算子执行过程中一些信息,包括算子的并行度、任务序号、广播数据、累加器、监控数据、以及重要的状态数据
public RuntimeContext getRuntimeContext() ;
public void close()

Rich算子是Flink中状态计算的实现入口,我们这里模拟实现一个:

public class MyRichMapFunction extends RichMapFunction<String, Integer> 

    private MapState<String,Integer> mapState;
    @Override
    public void open(Configuration parameters) throws Exception 
        super.open(parameters);
        mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Integer>("testCount",String.class,Integer.class));
    

    @Override
    public void close() throws Exception 
        super.close();
    

    @Override
    public Integer map(String s) throws Exception 
        if(!mapState.contains(s))
            mapState.put(s,0);
        
        mapState.put(s,mapState.get(s)+1);
        return Integer.parseInt(s);
    

以上是关于Flink中自定义Rich函数实现的主要内容,如果未能解决你的问题,请参考以下文章

[源码分析]从"UDF不应有状态" 切入来剖析Flink SQL代码生成 (修订版)

python中自定义排序函数

python中自定义函数的保留字是

用c语言编程编写函数,实现把一个字符串连接到后面一个字符串。并编写程序,在程序中自定义一个字符串数组

(转)quick中自定义事件

在JS中自定义函数并实现类似 $.messager.confirm(title, msg, fn)功能,请附代码。