ParquetRowInputFormat Flink 定时获取HDFS上某路径的parquet文件,并作为dim与Kafka中的主表进行关联。

Posted 青冬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ParquetRowInputFormat Flink 定时获取HDFS上某路径的parquet文件,并作为dim与Kafka中的主表进行关联。相关的知识,希望对你有一定的参考价值。

Flink 定时获取HDFS 上某路径的parquet文件,并作为dim进行关联。

在前文提到使用Flink SQL 在1.13.2版本下无法支撑定时获取HDFS上的文件(更新等状态),但是Flink 的API上其实是提供了这个方案的。
再次申明,截止1.14版本Flink,是没有SQL能够不通过Hive来定时获取HDFS上的文件的。

ParquetRowInputFormat

ParquetRowInputFormat 是继承于RichInputFormat 的文件读取器,使用它可以定时的去更新整个数据的流向。

间隔获取HDFS上的文件

根据建表语句来设定MessageType(这个构造可以像我一样啰嗦一点,可以是使用别人的反射,或者直接使用Arvo的)。

在 env.readFile 中,有五个参数的构造,分别是 FileFormat 数据读取器,pathString 路径,FileProcessingMode 文件读取类型,间隔时间。如下我设定了 每一个小时去check一次路径上的文件是否有变化。

final String pathString = "hdfs://path/city/";
        final ArrayList<Type> cityFields = new ArrayList<>();
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "province"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "city"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "district"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "provincecode"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "citycode"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "districtcode"));

        final ParquetRowInputFormat cityFormat = new ParquetRowInputFormat(new Path(pathString), new MessageType("", cityFields));
        final SingleOutputStreamOperator<cityConfig> city= env.readFile(cityFormat , pathString
                        , FileProcessingMode.PROCESS_CONTINUOUSLY, 60 * 60 * 1000)
                .map((MapFunction<Row, cityConfig>) cityConfig::new)
                .returns(cityConfig.class)
                .name("getCityConfig");

使用广播进行关联

// 创建 一个广播
final MapStateDescriptor<String, CityConfig> cityConfigMapStateDescriptor = new MapStateDescriptor<>("city", String.class, CityConfig.class);

// 主流进行关联 T 是主表中的javaBean对象 或者说PoJo吧

final SingleOutputStreamOperator<T> volteResult = volteStream
                .connect(city.broadcast(cityConfigMapStateDescriptor)) // 关联city表,并且广播city
                .process(new BroadcastProcessFunction<T, CityConfig, T>() { // 使用Broadcast进行广播,将数据分发到所有节点
                    @Override
                    // 需要实现两个方法,第一个时当遇到主流数据应该怎么做
                    // 从 broadcastState 中获取数据来进行关联
                    public void processElement(T value, BroadcastProcessFunction<T, CityConfig, T>.ReadOnlyContext ctx
                            , Collector<T> out) throws Exception {
                        ReadOnlyBroadcastState<String, CityConfig> broadcastState = ctx.getBroadcastState(cityConfigMapStateDescriptor);
                        String cellKey = value.getProvinceCode() + "_" + value.getCityCode();
                        if (broadcastState.contains(cellKey)) {
                            final CityConfig cityConfig = broadcastState.get(cellKey);
                            out.collect(value.connect(cityConfig));
                        }
                    }

                    @Override
                    // 第二个时当遇到广播的数据该怎么做,直接放 broadcastState 中就可以了
                    public void processBroadcastElement(CityConfig value, BroadcastProcessFunction<T, CityConfig, T>.Context ctx
                            , Collector<T> out) throws Exception {
                        BroadcastState<String, CityConfig> broadcastState = ctx.getBroadcastState(cityConfigMapStateDescriptor);
                        String cellKey = value.getProvinceCode() + "_" + value.getCityId();
                        broadcastState.put(cellKey, value);
                    }
                })
                .uid("connectCity") // 请设定uid,因为这个是要存储数据的,最好指定好uid,这样以后升级也能获取到这个数据。
                .name("connectCity");

广播类型

BroadcastProcessFunction 和 KeyBroadcastProcessFunction。
上述使用的是 BroadcastProcessFunction,意思是将所有数据分发到所有节点。
Keyed意思就是使用Hash进行分发,每个节点只有部分数据。

以上是关于ParquetRowInputFormat Flink 定时获取HDFS上某路径的parquet文件,并作为dim与Kafka中的主表进行关联。的主要内容,如果未能解决你的问题,请参考以下文章

ParquetRowInputFormat Flink 定时获取HDFS上某路径的parquet文件,并作为dim与Kafka中的主表进行关联。

Flink 入门

《美团机器实践》略看

15事例十五:纹理映射

android片段getArguments()返回null

FlinkFlink 1.12.2 启动脚本