ParquetRowInputFormat Flink 定时获取HDFS上某路径的parquet文件,并作为dim与Kafka中的主表进行关联。
Posted 青冬
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ParquetRowInputFormat Flink 定时获取HDFS上某路径的parquet文件,并作为dim与Kafka中的主表进行关联。相关的知识,希望对你有一定的参考价值。
Flink 定时获取HDFS 上某路径的parquet文件,并作为dim进行关联。
序
在前文提到使用Flink SQL 在1.13.2版本下无法支撑定时获取HDFS上的文件(更新等状态),但是Flink 的API上其实是提供了这个方案的。
再次申明,截止1.14版本Flink,是没有SQL能够不通过Hive来定时获取HDFS上的文件的。
ParquetRowInputFormat
ParquetRowInputFormat 是继承于RichInputFormat 的文件读取器,使用它可以定时的去更新整个数据的流向。
间隔获取HDFS上的文件
根据建表语句来设定MessageType(这个构造可以像我一样啰嗦一点,可以是使用别人的反射,或者直接使用Arvo的)。
在 env.readFile 中,有五个参数的构造,分别是 FileFormat 数据读取器,pathString 路径,FileProcessingMode 文件读取类型,间隔时间。如下我设定了 每一个小时去check一次路径上的文件是否有变化。
final String pathString = "hdfs://path/city/";
final ArrayList<Type> cityFields = new ArrayList<>();
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "province"));
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "city"));
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "district"));
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "provincecode"));
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "citycode"));
cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "districtcode"));
final ParquetRowInputFormat cityFormat = new ParquetRowInputFormat(new Path(pathString), new MessageType("", cityFields));
final SingleOutputStreamOperator<cityConfig> city= env.readFile(cityFormat , pathString
, FileProcessingMode.PROCESS_CONTINUOUSLY, 60 * 60 * 1000)
.map((MapFunction<Row, cityConfig>) cityConfig::new)
.returns(cityConfig.class)
.name("getCityConfig");
使用广播进行关联
// 创建 一个广播
final MapStateDescriptor<String, CityConfig> cityConfigMapStateDescriptor = new MapStateDescriptor<>("city", String.class, CityConfig.class);
// 主流进行关联 T 是主表中的javaBean对象 或者说PoJo吧
final SingleOutputStreamOperator<T> volteResult = volteStream
.connect(city.broadcast(cityConfigMapStateDescriptor)) // 关联city表,并且广播city
.process(new BroadcastProcessFunction<T, CityConfig, T>() { // 使用Broadcast进行广播,将数据分发到所有节点
@Override
// 需要实现两个方法,第一个时当遇到主流数据应该怎么做
// 从 broadcastState 中获取数据来进行关联
public void processElement(T value, BroadcastProcessFunction<T, CityConfig, T>.ReadOnlyContext ctx
, Collector<T> out) throws Exception {
ReadOnlyBroadcastState<String, CityConfig> broadcastState = ctx.getBroadcastState(cityConfigMapStateDescriptor);
String cellKey = value.getProvinceCode() + "_" + value.getCityCode();
if (broadcastState.contains(cellKey)) {
final CityConfig cityConfig = broadcastState.get(cellKey);
out.collect(value.connect(cityConfig));
}
}
@Override
// 第二个时当遇到广播的数据该怎么做,直接放 broadcastState 中就可以了
public void processBroadcastElement(CityConfig value, BroadcastProcessFunction<T, CityConfig, T>.Context ctx
, Collector<T> out) throws Exception {
BroadcastState<String, CityConfig> broadcastState = ctx.getBroadcastState(cityConfigMapStateDescriptor);
String cellKey = value.getProvinceCode() + "_" + value.getCityId();
broadcastState.put(cellKey, value);
}
})
.uid("connectCity") // 请设定uid,因为这个是要存储数据的,最好指定好uid,这样以后升级也能获取到这个数据。
.name("connectCity");
广播类型
BroadcastProcessFunction 和 KeyBroadcastProcessFunction。
上述使用的是 BroadcastProcessFunction,意思是将所有数据分发到所有节点。
Keyed意思就是使用Hash进行分发,每个节点只有部分数据。
以上是关于ParquetRowInputFormat Flink 定时获取HDFS上某路径的parquet文件,并作为dim与Kafka中的主表进行关联。的主要内容,如果未能解决你的问题,请参考以下文章
ParquetRowInputFormat Flink 定时获取HDFS上某路径的parquet文件,并作为dim与Kafka中的主表进行关联。