Big Data (9d): Flink Transformation Operators (Transform)

Posted by 小基基o_O


Environment

Development environment: Windows 10 + IDEA

pom.xml

<!-- Configuration -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>

log4j.properties

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

Java template

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Hello {
    public static void main(String[] args) throws Exception {
        //Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the parallelism
        env.setParallelism(1);
        //Create the stream source
        DataStreamSource<Long> d = env.fromElements(1L, 2L, 3L, 4L);
        //--------------------------------- Transform ----------------------------------------
        d.print();
        //--------------------------------- Transform ----------------------------------------
        env.execute();
    }
}

print output:
1
2
3
4

Transform

  • Also called: transformation operators
  • Purpose: transform one or more DataStreams into a new DataStream (individual operators chain freely; see the sketch below)
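A minimal chaining sketch, dropped between the Transform markers of the Java template above and reusing its stream d:

d.map(v -> v * v)             //squares: 1, 4, 9, 16
        .filter(v -> v % 2 == 0)  //keep the even squares
        .print();                 //prints: 4, 16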

map

  • Consumes one element, produces one element
d.map(s -> s + 1L).print();
print output:
2
3
4
5

flatMap

  • Consumes one element, produces zero or more elements
  • With a lambda expression, the generic types are erased at compile time, so the output type must be given explicitly via returns
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
d
        .flatMap((FlatMapFunction<Long, Long>) (value, out) -> {
            out.collect(value * value);
            out.collect(-value * value);
        })
        .returns(Types.LONG)
        .print();
print output:
1
-1
4
-4
9
-9
16
-16
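
If you'd rather avoid returns, an anonymous FlatMapFunction keeps its generic type information after compilation, so Flink can infer the output type; an equivalent sketch:

import org.apache.flink.util.Collector;
d.flatMap(new FlatMapFunction<Long, Long>() {
    @Override
    public void flatMap(Long value, Collector<Long> out) {
        out.collect(value * value);
        out.collect(-value * value);
    }
}).print();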

filter

  • Define a predicate: elements for which it returns true are kept, those for which it returns false are dropped
d.filter(i -> (i % 2 == 0)).print();
print output:
2
4

union

  • Merges multiple streams into a single stream
  • All input streams must have the same element type
//Create 3 streams
DataStreamSource<Integer> d1 = env.fromElements(1);
DataStreamSource<Integer> d2 = env.fromElements(2, 2);
DataStreamSource<Integer> d3 = env.fromElements(3, 3, 3);
//Union the 3 streams
d1.union(d2).union(d3).print();
One possible print output (union does not guarantee order across the input streams):
2
2
3
3
3
1
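
union also accepts several streams in one call (it is a varargs method), so the chained calls above can be collapsed; a minimal equivalent sketch:

//Union all 3 streams in a single call
d1.union(d2, d3).print();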

connect

  • Wraps two streams into one, while the two streams remain internally independent
  • The two streams may carry different element types
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
//Create 2 streams
DataStreamSource<Integer> d1 = env.fromElements(1, 2, 3, 4);
DataStreamSource<String> d2 = env.fromElements("a", "b", "c");
//Connect the 2 streams
ConnectedStreams<Integer, String> dd = d1.connect(d2);
//Pull the 2 streams back out
DataStream<Integer> s1 = dd.getFirstInput();
DataStream<String> s2 = dd.getSecondInput();
//Print
s1.print("first");
s2.print("second");
print output:
second> a
first> 1
second> b
first> 2
second> c
first> 3
first> 4
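
Splitting the connected streams apart again is rarely the point; more typically the two inputs are processed jointly with a CoMapFunction, which has one method per input. A minimal sketch (merging both inputs into String is an arbitrary choice for illustration):

import org.apache.flink.streaming.api.functions.co.CoMapFunction;
dd.map(new CoMapFunction<Integer, String, String>() {
    @Override
    public String map1(Integer value) { //called for elements of d1
        return "int: " + value;
    }
    @Override
    public String map2(String value) { //called for elements of d2
        return "str: " + value;
    }
}).print();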

keyBy

  • Partitions the elements of the stream by key
  • DataStream<T> => KeyedStream<T, K>
  • After keyBy you can call reduce, sum, max, min, etc.; the key can also be supplied as an explicit KeySelector, as sketched below
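
A minimal KeySelector sketch on the template's DataStream<Long> d, equivalent to the lambda form used below:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
KeyedStream<Long, Integer> k = d.keyBy(new KeySelector<Long, Integer>() {
    @Override
    public Integer getKey(Long value) {
        return (int) (value % 2); //partition by parity
    }
});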

reduce

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Hi {
    public static void main(String[] args) throws Exception {
        //Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the parallelism
        env.setParallelism(1);
        //Create the stream source
        DataStreamSource<String> d = env.fromElements("1", "4", "5", "2", "3");
        //Partition by key (parity)
        KeyedStream<String, Integer> k = d.keyBy(i -> (Integer.parseInt(i) % 2));
        //Reduce
        k.reduce((ReduceFunction<String>) (value1, value2) -> value1 + "," + value2).print();
        //Execute
        env.execute();
    }
}

print output (each key's first element passes through as-is; after that the running reduce result is emitted):
1
4
1,5
4,2
1,5,3
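
The same pattern over numbers gives a per-key running sum, which is exactly what sum below computes; a minimal sketch, assuming an Integer source with the same values:

DataStreamSource<Integer> nums = env.fromElements(1, 4, 5, 2, 3);
nums.keyBy(i -> i % 2)
        .reduce((a, b) -> a + b) //running sum per key
        .print();
//prints: 1, 4, 6(1+5), 6(4+2), 9(1+5+3)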

sum, max, min

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Hi {
    public static void main(String[] args) throws Exception {
        //Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the parallelism
        env.setParallelism(1);
        //Create the stream source
        DataStreamSource<Integer> d = env.fromElements(1, 2, 3, 4, 5);
        //Partition by key (parity)
        KeyedStream<Integer, Integer> k = d.keyBy(i -> (i % 2));
        //Aggregations (for a non-tuple stream, position 0 refers to the element itself)
        k.sum(0).print("sum");
        k.max(0).print("max");
        k.min(0).print("min");
        //Execute
        env.execute();
    }
}

sum output (in a real run the three print streams interleave; they are grouped here by operator):
sum> 1
sum> 2
sum> 4 (1+3)
sum> 6 (2+4)
sum> 9 (1+3+5)
max output:
max> 1
max> 2
max> 3 (of 1, 3)
max> 4 (of 2, 4)
max> 5 (of 1, 3, 5)
min output:
min> 1
min> 2
min> 1 (of 1, 3)
min> 2 (of 2, 4)
min> 1 (of 1, 3, 5)
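
Related: max only updates the aggregated field and keeps the remaining fields from each key's first record, whereas maxBy (and minBy) emits the whole record that holds the extreme value. A minimal sketch with made-up Tuple2 elements, all mapped to one key:

import org.apache.flink.api.java.tuple.Tuple2;
DataStreamSource<Tuple2<String, Integer>> t = env.fromElements(
        Tuple2.of("x", 1), Tuple2.of("y", 5), Tuple2.of("z", 3));
t.keyBy(v -> 0).max(1).print("max");     //(x,1) (x,5) (x,5): f0 stays from the first record
t.keyBy(v -> 0).maxBy(1).print("maxBy"); //(x,1) (y,5) (y,5): the whole record with the max f1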

process

  • The low-level operator underlying the other transforms

ProcessFunction

  • ProcessFunction can be used without a preceding keyBy
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
d.process(new ProcessFunction<Long, String>() {
    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) {
        out.collect(value + "L");
    }
}).print();
print output:
1L
2L
3L
4L
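
Because process sits at the bottom of the API, it also exposes features the plain transforms lack, such as side outputs. A minimal sketch, assuming a hypothetical tag named "negatives" for values below zero (with the template's all-positive d, the side output stays empty):

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;
//anonymous subclass so the generic type survives erasure
OutputTag<Long> negatives = new OutputTag<Long>("negatives") {};
SingleOutputStreamOperator<Long> main = d.process(new ProcessFunction<Long, Long>() {
    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) {
        if (value < 0) {
            ctx.output(negatives, value); //route to the side output
        } else {
            out.collect(value);           //main output
        }
    }
});
main.print("main");
main.getSideOutput(negatives).print("negatives");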

KeyedProcessFunction

  • KeyedProcessFunction must be used after keyBy
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
KeyedStream<Long, Integer> k = d.keyBy(i -> (int) (i % 2));
k.process(new KeyedProcessFunction<Integer, Long, String>() {
    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) {
        System.out.println("current key: " + ctx.getCurrentKey());
        out.collect(value + "L");
    }
}).print("output");
Output:
current key: 1
output> 1L
current key: 0
output> 2L
current key: 1
output> 3L
current key: 0
output> 4L
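
KeyedProcessFunction additionally unlocks timers, which require a keyed stream: processElement can register a timer per key and onTimer reacts when it fires. A minimal sketch, assuming a processing-time timer 10 seconds after each element; note that the bounded fromElements source usually finishes before the timer fires, so this is best tried with an unbounded source such as socketTextStream:

k.process(new KeyedProcessFunction<Integer, Long, String>() {
    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) {
        //fire 10s of processing time after this element arrives
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 10_000L);
        out.collect(value + "L");
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired for key " + ctx.getCurrentKey());
    }
}).print("output");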
