Big Data (9d): Flink Transform Operators
Posted by 小基基o_O
Environment
Development environment: Windows 10 + IDEA
pom.xml
<!-- configuration -->
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.14.6</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>2.0.3</slf4j.version>
<log4j.version>2.17.2</log4j.version>
<fastjson.version>2.0.19</fastjson.version>
<lombok.version>1.18.24</lombok.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>
log4j.properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Java template
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Hello {
    public static void main(String[] args) throws Exception {
        // create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(1);
        // create the stream source
        DataStreamSource<Long> d = env.fromElements(1L, 2L, 3L, 4L);
        //--------------------------------- Transform ----------------------------------------
        d.print();
        //--------------------------------- Transform ----------------------------------------
        env.execute();
    }
}
print output:
1
2
3
4
Transform
- Meaning: transformation operators
- Purpose: turn one or more DataStreams into a new DataStream
map
- Consumes one element and produces one element (an explicit MapFunction variant is sketched after the output below)
d.map(s -> s + 1L).print();
print output:
2
3
4
5
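Equivalently, map can take an explicit MapFunction instead of a lambda; a minimal sketch of the same increment:

import org.apache.flink.api.common.functions.MapFunction;

d.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) {
        // exactly one output element per input element
        return value + 1L;
    }
}).print();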
flatMap
- Consumes one element and produces zero or more elements
- When a lambda expression is used, type erasure forces you to declare the result type with returns (an anonymous-class alternative that avoids this is sketched after the output below)
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;

d
    .flatMap((FlatMapFunction<Long, Long>) (value, out) -> {
        out.collect(value * value);
        out.collect(-value * value);
    })
    .returns(Types.LONG)
    .print();
print output:
1
-1
4
-4
9
-9
16
-16
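With an anonymous FlatMapFunction class instead of a lambda, Flink can read the output type from the class signature, so the returns(...) call becomes unnecessary; a minimal sketch of the same logic:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

d.flatMap(new FlatMapFunction<Long, Long>() {
    @Override
    public void flatMap(Long value, Collector<Long> out) {
        // emit the square and its negation for each input element
        out.collect(value * value);
        out.collect(-value * value);
    }
}).print();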
filter
- Defines a predicate: elements for which it returns true are kept, elements for which it returns false are dropped
d.filter(i -> (i % 2 == 0)).print();
print output:
2
4
union
- Merges multiple streams into one stream (a single-call variant is sketched after the output below)
- All input streams must have the same element type
// create 3 streams
DataStreamSource<Integer> d1 = env.fromElements(1);
DataStreamSource<Integer> d2 = env.fromElements(2, 2);
DataStreamSource<Integer> d3 = env.fromElements(3, 3, 3);
// union the 3 streams
d1.union(d2).union(d3).print();
print output of one run (the order may vary between runs):
2
2
3
3
3
1
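union is variadic, so the two chained calls above can also be written as a single call:

d1.union(d2, d3).print();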
connect
- Puts two streams together in one stream while they remain internally independent
- The two streams may carry different element types
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
// create 2 streams
DataStreamSource<Integer> d1 = env.fromElements(1, 2, 3, 4);
DataStreamSource<String> d2 = env.fromElements("a", "b", "c");
// connect the 2 streams
ConnectedStreams<Integer, String> dd = d1.connect(d2);
// take the 2 streams back out
DataStream<Integer> s1 = dd.getFirstInput();
DataStream<String> s2 = dd.getSecondInput();
// print
s1.print("first");
s2.print("second");
print output:
second> a
first> 1
second> b
first> 2
second> c
first> 3
first> 4
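A connected stream is typically consumed with a Co-function rather than split apart again; for example, a CoMapFunction maps both inputs into a single output stream. A minimal sketch reusing dd from above:

import org.apache.flink.streaming.api.functions.co.CoMapFunction;

dd.map(new CoMapFunction<Integer, String, String>() {
    @Override
    public String map1(Integer value) {
        // called for each element of the first (Integer) stream
        return "int: " + value;
    }

    @Override
    public String map2(String value) {
        // called for each element of the second (String) stream
        return "str: " + value;
    }
}).print();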
keyBy
- Partitions the stream's elements into different partitions by key: DataStream<T> => KeyedStream<T, K> (an explicit KeySelector variant is sketched below)
- After keyBy, methods such as reduce, sum, max, and min become available
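The key can also be supplied as an explicit KeySelector instead of a lambda; a minimal sketch that keys the Long stream d from the template by parity:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;

KeyedStream<Long, Long> keyed = d.keyBy(new KeySelector<Long, Long>() {
    @Override
    public Long getKey(Long value) {
        // elements with equal keys go to the same partition
        return value % 2;
    }
});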
reduce
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Hi {
    public static void main(String[] args) throws Exception {
        // create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(1);
        // create the stream source
        DataStreamSource<String> d = env.fromElements("1", "4", "5", "2", "3");
        // partition by key (parity)
        KeyedStream<String, Integer> k = d.keyBy(i -> (Integer.parseInt(i) % 2));
        // rolling reduce
        k.reduce((ReduceFunction<String>) (value1, value2) -> value1 + "," + value2).print();
        // execute
        env.execute();
    }
}
print output (a rolling reduce: a key's first element passes through unchanged; each later element is combined with that key's accumulator and the new accumulator is emitted):
1
4
1,5
4,2
1,5,3
sum, max, min
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Hi {
    public static void main(String[] args) throws Exception {
        // create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(1);
        // create the stream source
        DataStreamSource<Integer> d = env.fromElements(1, 2, 3, 4, 5);
        // partition by key (parity)
        KeyedStream<Integer, Integer> k = d.keyBy(i -> (i % 2));
        // rolling aggregations
        k.sum(0).print("sum");
        k.max(0).print("max");
        k.min(0).print("min");
        // execute
        env.execute();
    }
}
sum output:
sum> 1
sum> 2
sum> 4 (1+3)
sum> 6 (2+4)
sum> 9 (1+3+5)
max output:
max> 1
max> 2
max> 3 (1, 3)
max> 4 (2, 4)
max> 5 (1, 3, 5)
min output:
min> 1
min> 2
min> 1 (1, 3)
min> 2 (2, 4)
min> 1 (1, 3, 5)
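The positional argument 0 works above because the stream holds an atomic type (Integer), where position 0 refers to the value itself; on a Tuple stream the index selects the field to aggregate. A minimal sketch, assuming the same env:

import org.apache.flink.api.java.tuple.Tuple2;

// per-key rolling sum over the second tuple field
env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3))
    .keyBy(t -> t.f0)
    .sum(1)
    .print();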
process
- The low-level operator
ProcessFunction
- ProcessFunction can be used without a preceding keyBy
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

d.process(new ProcessFunction<Long, String>() {
    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) {
        out.collect(value + "L");
    }
}).print();
print output:
1L
2L
3L
4L
KeyedProcessFunction
- KeyedProcessFunction must be used after keyBy; in return it exposes the current key, per-key state, and timers (a timer sketch follows the output below)
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

KeyedStream<Long, Integer> k = d.keyBy(i -> (int) (i % 2));
k.process(new KeyedProcessFunction<Integer, Long, String>() {
    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) {
        System.out.println("current key: " + ctx.getCurrentKey());
        out.collect(value + "L");
    }
}).print("output");
output:
current key: 1
output> 1L
current key: 0
output> 2L
current key: 1
output> 3L
current key: 0
output> 4L
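What keyBy adds here is per-key state and timers: inside a KeyedProcessFunction, ctx.timerService() can register timers, and onTimer fires once per key and registered timestamp. A minimal processing-time sketch reusing k from above (the 1000 ms delay is an arbitrary example value):

k.process(new KeyedProcessFunction<Integer, Long, String>() {
    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) {
        // fire a processing-time timer one second after this element arrives
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000L);
        out.collect(value + "L");
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // called when a registered timer fires, scoped to its key
        out.collect("timer fired for key " + ctx.getCurrentKey());
    }
}).print("timer");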