flink数据流转换算子

Posted PacosonSWJTU

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink数据流转换算子相关的知识,希望对你有一定的参考价值。

【README】

本文记录了flink对数据的转换操作,包括

  1. 基本转换,map,flatMap,filter;
  2. 滚动聚合(min minBy max maxBy sum);
  3. 规约聚合-reduce;
  4. 分流;
  5. connect连接流;
  6. union合流;
  7. 富函数;
  8. 重分区;

本文使用的flink为 1.14.4 版本;maven依赖如下:

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.14.4</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>

本文部分内容参考了 flink 官方文档:

概览 | Apache Flink算子 # 用户通过算子能将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑。这部分内容将描述 Flink DataStream API 中基本的数据转换 API,数据转换后各种数据分区方式,以及算子的链接策略。数据流转换 # Map # DataStream → DataStream # 输入一个元素同时输出一个元素。下面是将输入流中元素数值加倍的 map function:Java DataStream dataStream = //... dataStream.map(new MapFunction() @Override public Integer map(Integer value) throws Exception return 2 * value; ); Scala dataStream.map x => x * 2 Python data_stream = env.from_collection(collection=[1, 2, 3, 4, 5]) data_stream.map(lambda x: 2 * x, output_type=Types.,>https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/operators/overview/


【1】基本转换算子

包括 map-转换, flatMap-打散,filter-过滤;

1)代码如下:

/**
 * @Description flink对数据流的基本转换
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年04月15日
 */
public class TransformTest1_Base 
    public static void main(String[] args) throws Exception 
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从kafka读取数据
        DataStream<String> baseStream = env.readTextFile("D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\sensor.txt");

        // 1-map-转换或映射或函数; 把string转为长度输出
//        DataStream<Integer> mapStream =  baseStream.map(x->x.length());
          DataStream<Integer> mapStream =  baseStream.map(String::length);

        // 2-flatMap-打散-按照逗号分割字段
        DataStream<String> flatMapStream = baseStream.flatMap((String raw, Collector<String> collector)->
            for (String rd : raw.split(",")) 
                collector.collect(rd);
            
        ).returns(Types.STRING);

        // 3-filter-过滤-筛选 sensor_1 开头的结束
        DataStream<String> filterStream = baseStream.filter(x->x.startsWith("sensor_1"));

        // 打印输出
        mapStream.print("mapStream");
        flatMapStream.print("flatMapStream");
        filterStream.print("filterStream");
        // 执行
        env.execute("BaseTransformStreamJob");
    

sensor 文本文件如下:

sensor_1,12341561,36.1
sensor_2,12341562,33.5
sensor_3,12341563,39.9
sensor_1,12341573,43.1

打印结果:

mapStream> 22
flatMapStream> sensor_1
flatMapStream> 12341561
flatMapStream> 36.1
filterStream> sensor_1,12341561,36.1
mapStream> 22
flatMapStream> sensor_2
flatMapStream> 12341562
flatMapStream> 33.5
mapStream> 22
flatMapStream> sensor_3
flatMapStream> 12341563
flatMapStream> 39.9
mapStream> 22
flatMapStream> sensor_1
flatMapStream> 12341573
flatMapStream> 43.1
filterStream> sensor_1,12341573,43.1


【2】滚动聚合算子

keyBy算子-根据key对数据流分组,因为聚合前必须前分组,类似于sql的group by;

keyBy算子的作用

  • 逻辑把一个数据流拆分为多个分区(但还是同一个流),每个分区包含相同key(相同hash)的元素,底层对key求hash来实现;
  • 在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的。

keyBy可以形成 KeyedStream

然后滚动聚合算子可以对 KeyStream 进行操作,滚动聚合算子如下:

  • sum
  • min
  • max
  • minBy
  • maxBy

【2.1】代码示例

/**
 * @Description 滚动聚合算子
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年04月15日
 */
public class TransformTest2_RollingAgg 
    public static void main(String[] args) throws Exception 
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从kafka读取数据
        DataStream<String> inputStream = env.readTextFile("D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\sensor.txt");

        // 转换为 SensorReader pojo类型
        DataStream<SensorReading> sensorStream = inputStream.map(x -> 
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        );

        // keyBy算子对数据流分组,并做滚动聚合(单字段分组)
        KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);
        // keyBy 多字段分组
//        KeyedStream<SensorReading, Tuple1<String>> keyedStream = sensorStream.keyBy(new KeySelector<SensorReading, Tuple1<String>>() 
//            @Override
//            public Tuple1<String> getKey(SensorReading sensorReading) throws Exception 
//                return Tuple1.of(sensorReading.getId());
//            
//        );

        // max聚合
        DataStream<SensorReading> maxTempratureStream = keyedStream.max("temperature");

        // maxBy 聚合
        DataStream<SensorReading> maxbyTempratureStream = keyedStream.maxBy("temperature");

        // 打印输出
        maxTempratureStream.print("maxTempratureStream");
        // 打印输出
        maxbyTempratureStream.print("maxbyTempratureStream");
        // 执行
        env.execute("maxTempratureStreamJob");
    

sensor文本内容:

sensor_1,11,36.1
sensor_2,21,33.1
sensor_1,12,36.2
sensor_1,13,36.3
sensor_2,22,33.2

max聚合打印结果:

max> SensorRdid='sensor_1', timestamp=11, temperature=36.1
max> SensorRdid='sensor_2', timestamp=21, temperature=33.1
max> SensorRdid='sensor_1', timestamp=11, temperature=36.2
max> SensorRdid='sensor_1', timestamp=11, temperature=36.3
max> SensorRdid='sensor_2', timestamp=21, temperature=33.2

maxBy聚合打印结果:

maxBy> SensorRdid='sensor_1', timestamp=11, temperature=36.1
maxBy> SensorRdid='sensor_2', timestamp=21, temperature=33.1
maxBy> SensorRdid='sensor_1', timestamp=12, temperature=36.2
maxBy> SensorRdid='sensor_1', timestamp=13, temperature=36.3
maxBy> SensorRdid='sensor_2', timestamp=22, temperature=33.2

小结,max与maxBy区别:

  • max:把聚合字段(最大温度值)取出来,其他字段和第一条记录保持一致;
  • maxBy:把聚合字段(最大温度值)取出来,且连同最大温度值所在记录的其他字段一并取出来;

同理 min与minby,本文不再演示;

补充: 聚合时要先分组,可以根据单字段分组,也可以根据多个字段分组

上述代码注释部分给出了多个字段分组的例子,一个组记录称为Tuple,元组

1个字段叫 Tuple1,2个字段叫Tuple2;....


【2.2】规约聚合-reduce

定义:

在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。

场景:根据sensorid分组后,形成keyedStream,然后查询最大温度,且最新时间戳;即多个聚合算子;

代码

/**
 * @Description reduce规约聚合算子 
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年04月15日
 */
public class TransformTest3_Reduce 
    public static void main(String[] args) throws Exception 
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从kafka读取数据
        DataStream<String> inputStream = env.readTextFile("D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\sensor.txt");

        // 转换为 SensorReader pojo类型
        DataStream<SensorReading> sensorStream = inputStream.map(x -> 
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        );

        // keyBy算子对数据流分组,并做滚动聚合(单字段分组)
        KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);

        // reduce规约聚合-查询最大温度,且最新时间戳
        DataStream<SensorReading> reduceStream = keyedStream.reduce((a,b)->
                new SensorReading(a.getId(), Math.max(a.getTimestamp(),b.getTimestamp()), Math.max(a.getTemperature(),b.getTemperature())));

        // 打印输出
        reduceStream.print("reduceStream");
        // 执行
        env.execute("reduceStreamJob");
    

sensor文本:

sensor_1,11,36.1
sensor_2,21,33.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,31.2

打印结果:

reduceStream> SensorRdid='sensor_1', timestamp=11, temperature=36.1
reduceStream> SensorRdid='sensor_2', timestamp=21, temperature=33.1
reduceStream> SensorRdid='sensor_1', timestamp=32, temperature=36.2
reduceStream> SensorRdid='sensor_1', timestamp=32, temperature=36.2
reduceStream> SensorRdid='sensor_2', timestamp=22, temperature=33.1


【3】分流(把一个流切分为多个流)

flink 1.14.4 移除了 split 算子,refer2  https://issues.apache.org/jira/browse/FLINK-19083

转而使用 side output 侧输出实现,refer2

Side Outputs | Apache Flink

【3.1】 切分流(flink移除了split方法,需要使用 side output 来实现流切分)

1)代码,启动大于30度算高温,否则低温;

通过实现  ProcessFunction 来实现;

public class TransformTest4_SplitStream 
    public static void main(String[] args) throws Exception 
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从kafka读取数据
        DataStream<String> inputStream = env.readTextFile("D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\sensor.txt");

        // 转换为 SensorReader pojo类型
        DataStream<SensorReading> sensorStream = inputStream.map(x -> 
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        );

       // 1,分流,按照温度值是否大于30度,分为两条流-高温和低温
        OutputTag<SensorReading> highTag = new OutputTag<SensorReading>("high") ;
        OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") ;
        SingleOutputStreamOperator<SensorReading> splitStream = sensorStream.process(new ProcessFunction<SensorReading, SensorReading>() 
            @Override
            public void processElement(SensorReading record, Context context, Collector<SensorReading> collector) throws Exception 
                // 把数据发送到侧输出
                context.output(record.getTemperature()>30? highTag : lowTag, record);
                // 把数据发送到常规输出
                collector.collect(record);
            
        );

        // 2, 选择流打印输出
        splitStream.getSideOutput(highTag).print("high");
        splitStream.getSideOutput(lowTag).print("low");
        // 执行
        env.execute("reduceStreamJob");
    

sensor文本:

sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

打印结果:

high> SensorRdid='sensor_1', timestamp=11, temperature=36.1
low> SensorRdid='sensor_2', timestamp=21, temperature=23.1
high> SensorRdid='sensor_1', timestamp=32, temperature=36.2
high> SensorRdid='sensor_1', timestamp=23, temperature=30.3
low> SensorRdid='sensor_2', timestamp=22, temperature=11.2

以上分流代码refer2  Process function: a versatile tool in Flink datastream API | Develop Paper


【4】connect 连接流

1)定义: 把多个流连接为一个流,叫做连接流,连接流中的子流的各自元素类型可以不同

2)步骤:

  • 把2个流 connect 连接再一起形成 ConnectedStream;
  • 把连接流 通过 map 得到数据流;

代码:

/**
 * @Description connect-连接流
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年04月16日
 */
public class TransformTest5_ConnectStream 
    public static void main(String[] args) throws Exception 
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从kafka读取数据
        DataStream<String> inputStream = env.readTextFile("D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\sensor.txt");

        // 转换为 SensorReader pojo类型
        DataStream<SensorReading> sensorStream = inputStream.map(x -> 
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        );

        // 1,分流,按照温度值是否大于30度,分为两条流-高温和低温
        OutputTag<SensorReading> highTag = new OutputTag<SensorReading>("high") 
        ;
        OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") 
        ;
        SingleOutputStreamOperator<SensorReading> splitStream = sensorStream.process(new ProcessFunction<SensorReading, SensorReading>() 
            @Override
            public void processElement(SensorReading record, Context context, Collector<SensorReading> collector) throws Exception 
                // 把数据发送到侧输出
                context.output(record.getTemperature() > 30 ? highTag : lowTag, record);
                // 把数据发送到常规输出
                collector.collect(record);
            
        );
        // 得到高温和低温流
        DataStream<SensorReading> highStream = splitStream.getSideOutput(highTag);
        DataStream<SensorReading> lowStream = splitStream.getSideOutput(lowTag);

        // 2 把2个流连接为1个流(子流1的元素为3元组,子流2的元素为2元组)
        ConnectedStreams<SensorReading, SensorReading> connectedStreams = highStream.connect(lowStream);
        DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<SensorReading, SensorReading, Object>() 
            @Override
            public Object map1(SensorReading rd) throws Exception 
                return new Tuple3<>(rd.getId(), rd.getTemperature(), "high"); // map1 作用于第1个流 highStream
            
            @Override
            public Object map2(SensorReading rd) throws Exception 
                return new Tuple2<>(rd.getId(), rd.getTemperature()); // map2 作用于第2个流 lowStream
            
        );

        // 3 打印结果
        resultStream.print("connectedStream");
        // 执行
        env.execute("connectedStreamJob");
    

sensor文本:

sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

打印结果:

connectedStream> (sensor_1,36.1,high)
connectedStream> (sensor_2,23.1)
connectedStream> (sensor_1,36.2,high)
connectedStream> (sensor_2,11.2)
connectedStream> (sensor_1,30.3,high)


【5】合流-union

上述connect,只能连接两条流,如果要合并多条流,connect需要多次连接,不太适合;

如果要合并多条流,需要用 union,前提是 多个流的元素数据类型需要相同;

1)代码

 // 2 把3个流合并为1个流
        DataStream<SensorReading> inputStream2 = env.readTextFile("D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\sensor2.txt")
                .map(x -> 
                    String[] arr = x.split(",");
                    return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
                );
        DataStream<SensorReading> unionStream =  highStream.union(lowStream,inputStream2);
        // 3 打印结果
        unionStream.print("unionStream");
        // 执行
        env.execute("unionStreamJob");

打印结果:

unionStream> SensorRdid='sensor2_1', timestamp=11, temperature=36.1
unionStream> SensorRdid='sensor2_2', timestamp=21, temperature=23.1
unionStream> SensorRdid='sensor2_1', timestamp=32, temperature=36.2
unionStream> SensorRdid='sensor2_1', timestamp=23, temperature=30.3
unionStream> SensorRdid='sensor2_2', timestamp=22, temperature=11.2
unionStream> SensorRdid='sensor_1', timestamp=11, temperature=36.1
unionStream> SensorRdid='sensor_1', timestamp=32, temperature=36.2
unionStream> SensorRdid='sensor_1', timestamp=23, temperature=30.3
unionStream> SensorRdid='sensor_2', timestamp=21, temperature=23.1
unionStream> SensorRdid='sensor_2', timestamp=22, temperature=11.2


【6】自定义函数 UDF user-defined function

flink 暴露了所有udf 函数的接口,如MapFunction, FilterFunction, ProcessFunction等;可以理解为 java8引入的 函数式接口;

可以参考官方的udf文档:

ck自定义函数 | Apache Flinkhttps://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/

【6.1】富函数

1)复函数可以获取上下文信息,而普通函数则不行;

代码:

/**
 * @Description 富函数
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年04月16日
 */
public class TransformTest7_RichFunction 
    public static void main(String[] args) throws Exception 
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 从文件读取数据
        DataStream<String> inputStream = env.readTextFile("D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\sensor.txt");

        // 转换为 SensorReader pojo类型
        DataStream<SensorReading> sensorStream = inputStream.map(x -> 
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        );

        // 自定义富函数
        DataStream<Tuple3<String,Integer,Integer>> richMapStream = sensorStream.map(new MyRichMapFunction());

        // 3 打印结果
        richMapStream.print("richMapStream");
        // 执行
        env.execute("richMapStreamJob");
    
    // 富函数类
    static class MyRichMapFunction extends RichMapFunction<SensorReading, Tuple3<String, Integer, Integer>> 
        @Override
        public Tuple3<String, Integer, Integer> map(SensorReading record) throws Exception 
            // 富函数可以获取运行时上下文的属性  getRuntimeContext() ,普通map函数则不行
            return new Tuple3<String, Integer, Integer>(
                    record.getId(), record.getId().length(), getRuntimeContext().getIndexOfThisSubtask());
        
        @Override
        public void open(Configuration parameters) throws Exception 
            // 初始化工作,一般是定义状态, 或者建立数据库连接
            System.out.println("open db conn");
        
        @Override
        public void close() throws Exception 
            // 关闭连接,清空状态
            System.out.println("close db conn");
        
    

sensor文本:

sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

 打印结果:

open db conn
open db conn
richMapStream:1> (sensor_1,8,0)
richMapStream:2> (sensor_1,8,1)
richMapStream:1> (sensor_2,8,0)
richMapStream:2> (sensor_2,8,1)
richMapStream:1> (sensor_1,8,0)
close db conn
close db conn

从打印结果可以看出,每个子任务(线程)都会执行 open close方法 ,tuple3中的第3个字段是 执行上下文的任务id(这是富函数才可以获得上下文);


【7】flink中的数据重分区

1)flink中的分区指的是: taskmanager中的槽,即线程

分区操作有:

  • shuffle-洗牌乱分区;
  • keyBy-按照key分区;
  • global 把数据转到第1个分区

2)代码 :

/**
 * @Description 重分区
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年04月16日
 */
public class TransformTest8_Partition2 
    public static void main(String[] args) throws Exception 
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 从文件读取数据
        DataStream<String> inputStream = env.readTextFile("D:\\\\workbench_idea\\\\diydata\\\\flinkdemo2\\\\src\\\\main\\\\resources\\\\sensor.txt");

        // 转换为 SensorReader pojo类型
        DataStream<SensorReading> sensorStream = inputStream.map(x -> 
            String[] arr = x.split(",");
            return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
        );

        // 1-shuffle 洗牌(乱分区)
        DataStream<SensorReading> shuffleStream = sensorStream.shuffle();
        shuffleStream.print("shuffleStream");
        // 2-keyby 按照key分区
        DataStream<SensorReading> keybyStream = sensorStream.keyBy(SensorReading::getId);
//        keybyStream.print("keybyStream");
        // 3-global 把数据转到第1个分区
        DataStream<SensorReading> globalStream = sensorStream.global();
//        globalStream.print("globalStream");

        // 执行
        env.execute("partitionJob");
    

sensor文本:

sensor_1,11,36.1
sensor_2,21,23.1
sensor_1,32,36.2
sensor_1,23,30.3
sensor_2,22,11.2

原生分区结果:(重分区前)

rawStream:1> SensorRdid='sensor_1', timestamp=11, temperature=36.1
rawStream:2> SensorRdid='sensor_1', timestamp=23, temperature=30.3
rawStream:1> SensorRdid='sensor_2', timestamp=21, temperature=23.1
rawStream:2> SensorRdid='sensor_2', timestamp=22, temperature=11.2
rawStream:1> SensorRdid='sensor_1', timestamp=32, temperature=36.2

shuffle-洗牌乱分区结果:

shuffleStream:2> SensorRdid='sensor_1', timestamp=11, temperature=36.1
shuffleStream:1> SensorRdid='sensor_2', timestamp=22, temperature=11.2
shuffleStream:2> SensorRdid='sensor_1', timestamp=32, temperature=36.2
shuffleStream:1> SensorRdid='sensor_2', timestamp=21, temperature=23.1
shuffleStream:2> SensorRdid='sensor_1', timestamp=23, temperature=30.3

keyby-按照key进行分区的结果:

keybyStream:1> SensorRdid='sensor_2', timestamp=21, temperature=23.1
keybyStream:2> SensorRdid='sensor_1', timestamp=23, temperature=30.3
keybyStream:1> SensorRdid='sensor_2', timestamp=22, temperature=11.2
keybyStream:2> SensorRdid='sensor_1', timestamp=11, temperature=36.1
keybyStream:2> SensorRdid='sensor_1', timestamp=32, temperature=36.2

global-把数据转到第1个分区的打印结果:

globalStream:1> SensorRdid='sensor_1', timestamp=23, temperature=30.3
globalStream:1> SensorRdid='sensor_2', timestamp=22, temperature=11.2
globalStream:1> SensorRdid='sensor_1', timestamp=11, temperature=36.1
globalStream:1> SensorRdid='sensor_2', timestamp=21, temperature=23.1
globalStream:1> SensorRdid='sensor_1', timestamp=32, temperature=36.2

以上是关于flink数据流转换算子的主要内容,如果未能解决你的问题,请参考以下文章

flink算子

flink window窗口算子

大数据(9d)Flink转换算子Transform

大数据(9d)Flink转换算子Transform

08-flink-1.10.1- flink Transform api 转换算子

08-flink-1.10.1- flink Transform api 转换算子