Flink中自定义Rich函数实现
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink中自定义Rich函数实现相关的知识,希望对你有一定的参考价值。
在Flink中,我们知道map ,flatMap,reduce算子都可以自定义函数实现,比如MapFunction:
public class MyMapFunction implements MapFunction<String,Integer>
@Override
public Integer map(String s) throws Exception
return Integer.parseInt(s);
同时,Flink中还提供了对应的Rich函数,比如RichMapFunction,RichFlatMapFunction,RichReduceFunciton,而Rich相关函数都会继承AbstractRichFunction
,这个类中会实现如下几个方法:
// Flink在算子调用前会执行open方法
public void open(Configuration parameters) throws Exception
// 获取Flink运行时上下文,每个并行的算子任务都有一个上下文,会记录算子执行过程中一些信息,包括算子的并行度、任务序号、广播数据、累加器、监控数据、以及重要的状态数据
public RuntimeContext getRuntimeContext() ;
public void close()
Rich算子是Flink中状态计算的实现入口,我们这里模拟实现一个:
public class MyRichMapFunction extends RichMapFunction<String, Integer>
private MapState<String,Integer> mapState;
@Override
public void open(Configuration parameters) throws Exception
super.open(parameters);
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Integer>("testCount",String.class,Integer.class));
@Override
public void close() throws Exception
super.close();
@Override
public Integer map(String s) throws Exception
if(!mapState.contains(s))
mapState.put(s,0);
mapState.put(s,mapState.get(s)+1);
return Integer.parseInt(s);
以上是关于Flink中自定义Rich函数实现的主要内容,如果未能解决你的问题,请参考以下文章
[源码分析]从"UDF不应有状态" 切入来剖析Flink SQL代码生成 (修订版)