Stream Computing - Flink Stream API, Part 2
Posted 曱甴崽
As with diagrams, read code for its core. The Flink ecosystem is huge and complex; dozens of articles could not cover it all, so focus on what it does best. Just as Baidu means search and WeChat means chat (whether search is still Baidu's core hardly matters anymore), when we talk about Flink we should at least know this: Flink's core is stream computing!
Environment: IDEA 2019.03 / Gradle 6.0.1 / JDK 11.0.4 / Lambda / Flink 1.9.1
Difficulty: Novice -- Warrior -- Veteran -- Master
Goal:
- Hands-on practice with the core Flink stream-computing APIs
Notes:
Steps:
Flink calls each transformation an operator; see the official API docs for details. Each operator is covered in turn below:
01 Map
The map transformation is simple and needs little explanation: its MapFunction<T, R> parameter is a functional interface, so a lambda works directly. This example adds 100 to every element of an integer stream:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// list1 built as below would not support updates:
// List<Integer> list1 = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
List<Integer> list = new ArrayList<>(Collections.emptyList());
for (int i = 1; i <= 10; i++) {
list.add(i);
}
System.out.println("environment.getParallelism >>>> " + environment.getParallelism());
DataStream<Integer> dataStream = environment
.setParallelism(6) // set the execution environment's parallelism
.fromCollection(list);
System.out.println("dataStream Parallelism is >>>> " + dataStream.getParallelism());
// lambda implementing the MapFunction<T, R> parameter
dataStream.map(t -> t + 100)
.setParallelism(6)
.writeAsText("D:/API/MapTest", FileSystem.WriteMode.OVERWRITE);
// .print(); // print to the console
environment.execute();
}
Parallelism can be set either globally on the execution environment, environment.setParallelism(6), or per operator, map(t -> t + 100).setParallelism(6), and read back like this:
System.out.println("environment.getParallelism >>>> " + environment.getParallelism());
System.out.println("dataStream.getParallelism >>>> " + dataStream.getParallelism());
The result is as follows: there are 8 files, indicating a parallelism of 8:
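The file count always equals the sink's parallelism rather than the map's. As a minimal sketch of my own (not from the original article), pinning the sink itself to parallelism 1 would yield a single output file:
dataStream.map(t -> t + 100)
        .writeAsText("D:/API/MapTest", FileSystem.WriteMode.OVERWRITE)
        .setParallelism(1); // writeAsText returns a DataStreamSink; one parallel task means one file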
02 FlatMap
This operator is also simple and comparable to Java Stream's flatMap. Note that the flatMap function returns void: it emits its results through the Collector, so one input element can be split into any number of output elements. This example splits each stream element (a line of text) into words, producing a stream of words:
public class T2_FlatMapTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
List<String> list = Arrays.asList("this is FlatMapTest","by xiaobiao","what a nice day");
DataStream<String> dataStream = environment.fromCollection(list);
dataStream.flatMap(flatMapFunction)
.writeAsText("D:/API/FlatMap", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
private static FlatMapFunction<String, String> flatMapFunction = new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) {
for(String word: value.split(" ")){
out.collect(word);
}
}
};
}
The result: 8 files. Note that the collection source in this example cannot take a custom setParallelism; it is fixed at 1:
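If a parallel source is wanted, one option (a sketch, not part of the original example) is fromParallelCollection, which takes a SplittableIterator such as org.apache.flink.util.NumberSequenceIterator and does accept a custom parallelism:
DataStream<Long> parallelSource = environment
        .fromParallelCollection(new NumberSequenceIterator(1L, 10L), Long.class)
        .setParallelism(4); // allowed here, unlike with fromCollection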
03 Filter
This operator is simple too and needs no explanation: a lambda implements the filter function's FilterFunction<T> parameter. This example keeps only the even elements:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// a list built with Arrays.asList does not support updates
List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
DataStream<Integer> dataStream = environment.fromCollection(list);
System.out.println("Parallelism is >>>> " + dataStream.getParallelism());
// lambda implementing the FilterFunction<T> parameter
dataStream.filter(t -> t % 2 == 0)
.writeAsText("D:/API/Filter", FileSystem.WriteMode.OVERWRITE);
// print and writeAsText are both terminal sink operators; use only one
//.print();
environment.execute();
}
04 KeyBy
Pay attention to the class relationships here: DataStreamSource<T> extends SingleOutputStreamOperator<T>, and SingleOutputStreamOperator<T> extends DataStream<T>. A SingleOutputStreamOperator represents a transformation defined on a stream with one specified output type; a DataStreamSource represents the starting point of a DataStream. This example logically partitions the stream by the Vehicle's color attribute:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
Set<Vehicle> vehicleHashSet = new HashSet<>(5);
// java Stream
Stream<Vehicle> stream = new Producer().getVehicles().limit(5L);
stream.forEach(vehicleHashSet::add);
/**
* Class hierarchy:
* DataStreamSource<T> extends SingleOutputStreamOperator<T>
* SingleOutputStreamOperator<T> extends DataStream<T>
*
* The SingleOutputStreamOperator represents a user defined transformation
* applied on a DataStream with one predefined output type.
*
* The DataStreamSource represents the starting point of a DataStream.
*/
DataStreamSource<Vehicle> dataStream = environment.fromCollection(vehicleHashSet);
// A KeyedStream partitions the DataStream's operator state by key (i.e. classifies the stream); note this is a logical partitioning
KeyedStream<Vehicle, Tuple> keyedStream = dataStream
.keyBy("color");
keyedStream.writeAsText("D:/API/KeyBy", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
Note that in this example neither the DataStreamSource nor keyBy accepts a custom setParallelism; both run at 1. The result is as follows:
05 Reduce
This example accumulates the weight attribute after keying the stream by color, i.e. the reduction runs inside each partition; note that only values with the same key take part in the same reduction:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
Set<Vehicle> vehicleHashSet = new HashSet<>(5);
// java Stream
Stream<Vehicle> stream = new Producer().getVehicles().limit(5L);
stream.forEach(vehicleHashSet::add);
vehicleHashSet.stream().forEach(System.out::println);
DataStreamSource<Vehicle> dataStream = environment.fromCollection(vehicleHashSet);
// A KeyedStream partitions the DataStream's operator state by key (i.e. classifies the stream); note this is a logical partitioning
KeyedStream<Vehicle, Tuple> keyedStream = dataStream.keyBy("color");
DataStream<Vehicle> dataStream1 = keyedStream
.reduce((value1, value2) -> {value1.setWeight(value1.getWeight()+value2.getWeight()); return value1;} );
dataStream1.print();
dataStream1.writeAsText("D:/API/Reduce", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
The result is as follows. With several elements in the same partition, reduce fires multiple times: for same-key elements a, b, c it emits a+b and then (a+b)+c:
06 The fold operator is @Deprecated, so that section has been removed.
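Flink's suggested replacement for fold is the AggregateFunction. A minimal sketch of my own (not from the original article), summing Vehicle weights per color over a tumbling count window of 5, assuming the Vehicle stream from the reduce example above:
DataStream<Float> totals = dataStream
        .keyBy("color")
        .countWindow(5)
        .aggregate(new AggregateFunction<Vehicle, Float, Float>() {
            @Override public Float createAccumulator() { return 0f; }
            @Override public Float add(Vehicle v, Float acc) { return acc + v.getWeight(); }
            @Override public Float getResult(Float acc) { return acc; }
            @Override public Float merge(Float a, Float b) { return a + b; }
        });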
07 Aggregations
This example keys the stream by type and then finds the element with the smallest weight; note that keyBy here must return KeyedStream<T, Tuple>:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
Set<Vehicle> vehicleHashSet = new HashSet<>(5);
// java Stream
Stream<Vehicle> stream = new Producer().getVehicles().limit(5L);
stream.forEach(vehicleHashSet::add);
vehicleHashSet.stream().forEach(System.out::println);
DataStreamSource<Vehicle> dataStream = environment.setParallelism(3).fromCollection(vehicleHashSet);
System.out.println("environment.getParallelism >>>> " + environment.getParallelism());
// A KeyedStream partitions the DataStream's operator state by key (i.e. classifies the stream); note this is a logical partitioning
// keyBy's return type must be KeyedStream<T, Tuple>
KeyedStream<Vehicle, Tuple> keyedStream = dataStream.keyBy("type");
System.out.println("dataStream.getParallelism >>>> " + dataStream.getParallelism());
// first key the records by type, then find the element with the smallest weight
DataStream<Vehicle> dataStream1 = keyedStream
// min("weight") returns the minimal weight value (other fields are not guaranteed to come from the same element); minBy returns the whole element
// .min("weight");
.minBy("weight");
dataStream1.print();
dataStream1.writeAsText("D:/API/Aggregations", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
The result is as follows:
08 Windows
Windows are a core Flink concept, so they deserve a detailed description here.
First, each window frames a group of elements, so it needs some criterion, for example the elements' time characteristic, or how many elements to frame. The time characteristics are EventTime / IngestionTime / ProcessingTime:
- ProcessingTime: the simplest. It takes the system time of the executing server directly, with no coordination between servers or between streams, hence the lowest latency. But in a distributed, asynchronous setting, records reach servers at different speeds, so there is some nondeterminism.
- EventTime: the timestamp set when the record was produced, carried by the record before it enters Flink. Flink exposes a record's EventTime through the API. A program using EventTime must specify Event Time Watermarks; a watermark is a progress marker saying that no record with an earlier EventTime will appear after a given point. The benefit of EventTime is ordered processing regardless of arrival order: an EventTime window waits for all records satisfying its EventTime condition, up to a timeout. The drawback is that waiting for late or out-of-order records adds latency, so under strict real-time requirements ProcessingTime may be the better choice (a watermark-assigner sketch follows this list).
- IngestionTime: the timestamp at which a record enters Flink, taken as the source's current time in the source operator. Conceptually IngestionTime sits between EventTime and ProcessingTime. Its strength is timestamp stability: once assigned at the source, every subsequent window uses the same timestamp. Compared with EventTime it cannot restore the order of out-of-order records, but it also needs no watermarks. Internally, IngestionTime is treated approximately like EventTime, with timestamp and watermark generation automated.
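Besides the punctuated assigner used in the examples below, Flink ships a periodic assigner for streams that are only slightly out of order: BoundedOutOfOrdernessTimestampExtractor, whose watermarks trail the largest timestamp seen by a fixed bound. A minimal sketch, assuming the Vehicle type and its getSequenceTimestamp() accessor from the examples later in this article:
DataStream<Vehicle> withTimestamps = dataStream
        // built-in helper from org.apache.flink.streaming.api.functions.timestamps
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Vehicle>(Time.milliseconds(100)) {
                    @Override
                    public long extractTimestamp(Vehicle element) {
                        return element.getSequenceTimestamp(); // the record's event-time field
                    }
                });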
Next, with timestamps in place, each execution environment can set its time-characteristic mode, one of three, defaulting to ProcessingTime:
environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Windows come in several kinds. First, by time: TumblingWindow and SlidingWindows. A tumbling window rolls forward contiguously without skipping records; whatever falls inside the frame forms one group. A sliding window is defined by a window length and a slide interval and may jump ahead. Both are illustrated below:
Second, by count: there are likewise tumbling and sliding variants, measured in record counts. Third, custom: supply your own WindowAssigner. Because time and count windows are the most common, Flink provides keyedStream.countWindow() and keyedStream.timeWindow(). Session windows and global windows also exist but are not covered here (a session-window sketch follows the next paragraph).
Finally, window or windowAll converts the DataStream into a WindowedStream, on which each window can then be transformed with apply/reduce/fold and the like.
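For completeness, a session window (mentioned above but not demonstrated) could be declared as in this sketch, assuming the keyedStream built in the code below; the gap value is illustrative:
WindowedStream<Vehicle, Tuple, TimeWindow> sessionWindows = keyedStream
        // a session closes after 500 ms with no new elements for the key
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(500)));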
The complete code follows. This example uses a time window and an AssignerWithPunctuatedWatermarks to assign event time timestamps and to generate the low watermarks that mark event-time progress in the stream:
public class T8_WindowTest1 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// set the environment's time characteristic, one of three, default ProcessingTime:
// environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// cause all operators (such as map,batchReduce) to run with x parallel instances
environment.setParallelism(2);
Set<Vehicle> vehicleHashSet = new HashSet<>(7);
// java Stream
Stream<Vehicle> stream = new Producer().getVehicles().limit(7L);
stream.forEach(vehicleHashSet::add);
vehicleHashSet.stream().forEach(System.out::println);
// flink stream
SingleOutputStreamOperator<Vehicle> dataStream = environment.fromCollection(vehicleHashSet)
// Note: assign eventTime timestamps to the records here; this stream is built from a collection and carries none (a Kafka source usually would),
// otherwise: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker)
.assignTimestampsAndWatermarks(new MyTimeAssigner());
// Sets the parallelism for this operator; but this example's stream cannot run in parallel. Dear reader, consider why.
// dataStream.setParallelism(3);
// A KeyedStream partitions the DataStream's operator state by key (i.e. classifies the stream); note this is a logical partitioning
// first key all records of the stream by type
KeyedStream<Vehicle, Tuple> keyedStream = dataStream.keyBy("type");
// a count-based WindowedStream: the window holds 5 elements and slides by 10 elements each time
/* windowStream = keyedStream.countWindow(5,10); */
// a WindowedStream divided by the time characteristic:
WindowedStream<Vehicle, Tuple, TimeWindow> windowStream = keyedStream
// the following builds a tumbling window spanning 1 second
// .window(TumblingEventTimeWindows.of(Time.seconds(1)));
// the following builds a sliding window: window length 500 ms, sliding 1000 ms each time
.window(SlidingEventTimeWindows.of(Time.milliseconds(500),Time.milliseconds(1000)));
// reduce/fold/aggregations can run inside the window; here all elements of one window are put into a list
SingleOutputStreamOperator<Object> dataStream1 = windowStream
.apply(new WindowFunction<Vehicle, Object, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Vehicle> input, Collector<Object> out) throws Exception {
List<Vehicle> vehicles = new ArrayList<>(8);
for (Vehicle v : input
) {
vehicles.add(v);
}
out.collect(vehicles);
}
});
dataStream1.print();
dataStream1.writeAsText("D:/API/window", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
// Custom Timestamp and watermark assigner: assigns event time timestamps to elements and
// generates the low watermarks that mark event-time progress in the stream
// Watermark: an eventTime marker; after a watermark, only elements whose eventTime exceeds its timestamp occur
// AssignerWithPeriodicWatermarks<T> generates watermarks at a periodic interval
// AssignerWithPunctuatedWatermarks<T>'s watermark is emitted only if it is non-null and its timestamp
// is larger than that of the previously emitted watermark
static class MyTimeAssigner implements AssignerWithPunctuatedWatermarks<Vehicle>{
// emits a watermark; called right after the extractTimestamp(Object, long) method.
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Vehicle lastElement, long extractedTimestamp) {
return null;
}
// Assigns a timestamp to an element
@Override
public long extractTimestamp(Vehicle element, long previousElementTimestamp) {
return element.getSequenceTimestamp();
}
}
}
Result 1:
This example uses a count window:
public class T9_WindowTest2 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// set the environment-wide parallelism, applied to all operators
environment.setParallelism(2);
Set<Vehicle> vehicleHashSet = new HashSet<>(7);
Stream<Vehicle> stream = new Producer().getVehicles().limit(7);
stream.forEach(vehicleHashSet::add);
vehicleHashSet.stream().forEach(System.out::println);
// flink stream; fluent chaining keeps this very concise:
// key logically by type and total the weight values
environment.fromCollection(vehicleHashSet)
// a count window is used here, no time characteristic, so timestamps may be omitted
// .assignTimestampsAndWatermarks(new MyTimeAssigner())
.keyBy("type")
// a count-based WindowedStream: the window holds 2 elements and slides by 2 elements each time
.countWindow(2,2)
.apply(new WindowFunction<Vehicle, Object, Tuple, GlobalWindow>() {
@Override
public void apply(Tuple tuple, GlobalWindow window, Iterable<Vehicle> input, Collector<Object> out) {
Tuple2<String,Float> total = new Tuple2<>();
total.f1 = 0.0f;
for (Vehicle v : input
) {
total.f0 = v.getType();
total.f1 += v.getWeight();
}
out.collect(total);
}
})
.writeAsText("D:/API/window02", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
}
Note the window size of 2 in this code. Result:
09 Union
This example merges a stream of odd numbers with a stream of even numbers; to observe the result easily, I set the parallelism to 1:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// set the environment-wide parallelism, applied to all operators
environment.setParallelism(1);
List<Integer> list1 = Arrays.asList(1,3,5,7,9);
List<Integer> list2 = Arrays.asList(2,4,6,8,10);
DataStream<Integer> dataStream1 = environment.fromCollection(list1);
DataStream<Integer> dataStream2 = environment.fromCollection(list2);
dataStream1.union(dataStream2)
.writeAsText("D:/API/UnionTest/union", FileSystem.WriteMode.OVERWRITE);
dataStream1.union(dataStream1)
.writeAsText("D:/API/UnionTest/union2", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
Running it several times, the union output sometimes has the odd numbers first and sometimes last, while the union2 output never changes. The result is as follows:
10 Window Join
1. JoinedStreams.where().equalTo().window() forms one pipeline whose parts cannot be used in isolation; where and equalTo specify the KeySelector of the first and second input respectively and must be used together.
2. The JoinFunction is the core; it is applied to every pair of joining elements.
Tumbling window join, illustrated:
Sliding window join, illustrated:
This example uses the eventTime characteristic and therefore also implements the AssignerWithPunctuatedWatermarks interface; processingTime could be used instead:
public class T11_WindowJoin {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// set the environment-wide parallelism, applied to all operators
environment.setParallelism(2);
System.out.println("vehicle stream 1 >>>");
Set<Vehicle> vehicleHashSet = new HashSet<>(8);
Stream<Vehicle> stream = new Producer().getVehicles().limit(8L);
stream.forEach(vehicleHashSet::add);
vehicleHashSet.stream().forEach(System.out::println);
System.out.println("vehicle stream 2 >>>");
List<Vehicle> VehicleList = new ArrayList<>(8);
Stream<Vehicle> stream2 = new Producer().getVehicles().limit(8L);
stream2.forEach(VehicleList::add);
VehicleList.stream().forEach(System.out::println);
// flink stream
DataStream<Vehicle> dataStream1 = environment.fromCollection(vehicleHashSet)
.assignTimestampsAndWatermarks(new MyTimeAssigner());
DataStream<Vehicle> dataStream2 = environment.fromCollection(VehicleList)
.assignTimestampsAndWatermarks(new MyTimeAssigner());
// JoinedStreams can be read like a SQL inner join; a DB draws from the scope of two tables,
// here element pairs are taken as a Cartesian product of the two streams within a shared window (picture two football teams lining up to shake hands)
dataStream1.join(dataStream2)
// the KeySelectors compare element pairs from the two streams by the type attribute
// where and equalTo specify the KeySelector of the first and second input and must be used as a pair
.where((KeySelector<Vehicle, Object>) value -> value.getType())
.equalTo((KeySelector<Vehicle, Object>) value -> value.getType())
.window(TumblingEventTimeWindows.of(Time.of(2, TimeUnit.MILLISECONDS)))
// JoinFunction is called with each pair of joining elements.
// the action applied to each qualifying pair; here I pack them into a tuple
.apply(new JoinFunction<Vehicle, Vehicle, Object>() {
Tuple2<String,String> tuple2 = new Tuple2<>();
@Override
public Object join(Vehicle first, Vehicle second) {
tuple2.f0 = "e1: "+ first;
tuple2.f1 = "e2: "+ second;
return tuple2;
}
})
.writeAsText("D:/API/WindowJoin", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
// assign event time timestamps to the elements
static class MyTimeAssigner implements AssignerWithPunctuatedWatermarks<Vehicle> {
// emits a watermark; called right after the extractTimestamp(Object, long) method.
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Vehicle lastElement, long extractedTimestamp) {
return null;
}
// Assigns a timestamp to an element
@Override
public long extractTimestamp(Vehicle element, long previousElementTimestamp) {
return element.getSequenceTimestamp();
}
}
}
In effect this is a Cartesian product of the two data sets restricted to equal keys; for the pickup category, for example, that is 2*3 pairs. The result is as follows:
11 Interval Join
Each element of one stream is matched against each element of the other under a time-interval condition, checking whether the pair falls within the time bounds. IntervalJoin.between().process() must be used as a set, and the bounds can be declared inclusive or exclusive. Interval join, illustrated:
public class T12_IntervalJoin {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// process timestamps as EventTime
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// set the environment-wide parallelism, applied to all operators
environment.setParallelism(3);
System.out.println("vehicle stream 1 >>>");
Set<Vehicle> vehicleHashSet = new HashSet<>(5);
Stream<Vehicle> stream = new Producer().getVehicles().limit(5L);
stream.forEach(vehicleHashSet::add);
vehicleHashSet.stream().forEach(System.out::println);
System.out.println("vehicle stream 2 >>>");
List<Vehicle> VehicleList = new ArrayList<>(5);
Stream<Vehicle> stream2 = new Producer().getVehicles().limit(5L);
stream2.forEach(VehicleList::add);
VehicleList.stream().forEach(System.out::println);
// flink stream
KeyedStream<Vehicle,String> KeyedDataStream1 = environment.fromCollection(vehicleHashSet)
.assignTimestampsAndWatermarks(new MyTimeAssigner())
.keyBy(Vehicle::getColor);
KeyedStream<Vehicle,String> KeyedDataStream2 = environment.fromCollection(VehicleList)
.assignTimestampsAndWatermarks(new MyTimeAssigner())
.keyBy(Vehicle::getColor);
// intervalJoin: for elements e1 and e2 from the two streams, require
// e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
// Time-bounded stream joins are only supported in event time
KeyedDataStream1.intervalJoin(KeyedDataStream2)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive()
.lowerBoundExclusive() // optional
.process(new ProcessJoinFunction<Vehicle, Vehicle, Object>() {
Tuple2<String,String> tuple2 = new Tuple2<>();
@Override
public void processElement(Vehicle left, Vehicle right, Context ctx, Collector<Object> out) {
tuple2.f0 = "e1->"+left.toString() + left.getSequenceTimestamp();
tuple2.f1 = "e2->"+right.toString() + right.getSequenceTimestamp();
out.collect(tuple2);
}
})
.writeAsText("D:/API/IntervalJoin", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
// assign event time timestamps to the elements
static class MyTimeAssigner implements AssignerWithPunctuatedWatermarks<Vehicle> {
// emits a watermark; called right after the extractTimestamp(Object, long) method.
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Vehicle lastElement, long extractedTimestamp) {
return null;
}
// Assigns a timestamp to an element
@Override
public long extractTimestamp(Vehicle element, long previousElementTimestamp) {
return element.getSequenceTimestamp();
}
}
}
The eventTime is printed along the way; the result is as follows:
12 CoGroup
public class T13_CoGroup {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// process timestamps as IngestionTime
environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// set the environment-wide parallelism, applied to all operators
environment.setParallelism(2);
System.out.println("vehicle stream 1 >>>");
Set<Vehicle> vehicleHashSet = new HashSet<>(8);
Stream<Vehicle> stream = new Producer().getVehicles().limit(8L);
stream.forEach(vehicleHashSet::add);
vehicleHashSet.stream().forEach(System.out::println);
System.out.println("vehicle stream 2 >>>");
List<Vehicle> VehicleList = new ArrayList<>(8);
Stream<Vehicle> stream2 = new Producer().getVehicles().limit(8L);
stream2.forEach(VehicleList::add);
VehicleList.stream().forEach(System.out::println);
// flink stream
DataStream<Vehicle> dataStream1 = environment.fromCollection(vehicleHashSet)
.assignTimestampsAndWatermarks(new MyTimeAssigner());
DataStream<Vehicle> dataStream2 = environment.fromCollection(VehicleList)
.assignTimestampsAndWatermarks(new MyTimeAssigner());
// coGroup vs join: identical except for the apply parameters; join takes two single elements, coGroup takes two iterables
// join compares Cartesian-product pairs; coGroup operates on the two iterables directly and is freer, e.g. it can sort, which join cannot
// join is a special case of coGroup, and coGroup is generally recommended
dataStream1.coGroup(dataStream2)
// the KeySelectors compare elements from the two streams by the type attribute
.where((KeySelector<Vehicle, Object>) value -> value.getType())
.equalTo((KeySelector<Vehicle, Object>) Vehicle::getType)
.window(TumblingEventTimeWindows.of(Time.of(2, TimeUnit.SECONDS)))
// here I simply emit the two groups as an m*n product
.apply(new CoGroupFunction<Vehicle, Vehicle, Object>() {
@Override
public void coGroup(Iterable<Vehicle> first, Iterable<Vehicle> second, Collector<Object> out) {
Tuple1<String> tuple1 = new Tuple1<>();
first.forEach( x ->
second.forEach( y ->
{tuple1.f0 = x.toString() +" / "+ y.toString();
out.collect(tuple1);
}
)
);
}
})
.writeAsText("D:/API/CoGroup", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
// assign event time timestamps to the elements
static class MyTimeAssigner implements AssignerWithPunctuatedWatermarks<Vehicle> {
// emits a watermark; called right after the extractTimestamp(Object, long) method.
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Vehicle lastElement, long extractedTimestamp) {
return null;
}
// Assigns a timestamp to an element
@Override
public long extractTimestamp(Vehicle element, long previousElementTimestamp) {
return element.getSequenceTimestamp();
}
}
}
The resulting screenshot is similar to join's; omitted!
13 Connect and CoMap
connect is freer than coGroup/join: no key pre-matching is done. For instance, a shared key could be defined externally and matched separately against each stream's elements, which would be equivalent to coGroup/join. This operator acts on the elements of the two streams separately and preserves each stream's element type.
A typical use case for the connect operator: stream A carries a rule set that changes with the elements of stream B. A rule set from stream A is first stored as state, waiting for new elements from stream B; when one arrives, the stored rules are applied to it to produce a result, and/or a new timer is registered to trigger future behavior. In the code you can see that the process function handles element1 and element2 separately; since the two handlers are methods of the same function instance, state is shared between them, and the Context can register EventTime and ProcessingTime timers whose callbacks run when the watermark passes them.
In this example, the connected ConnectedStreams is given CoFlatMapFunction and CoMapFunction operations, functionally analogous to FlatMap and Map: the map function returns an Object, while flatMap returns void and places its results in the Collector, which can hold multiple values and therefore supports flattening. Note that ConnectedStreams can also be logically partitioned by key, and in particular the second type parameter of the two KeySelector<Integer, *> instances must match.
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// process timestamps as EventTime
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// set the environment-wide parallelism, applied to all operators
environment.setParallelism(2);
System.out.println("vehicle stream 1 >>>");
List<Integer> list1 = Arrays.asList(1,2,3,4,5);
DataStream<Integer> dataStream1 = environment.fromCollection(list1);
list1.stream().forEach(System.out::println);
System.out.println("vehicle stream 2 >>>");
List<Vehicle> VehicleList = new ArrayList<>(5);
Stream<Vehicle> stream2 = new Producer().getVehicles().limit(5L);
stream2.forEach(VehicleList::add);
VehicleList.stream().forEach(System.out::println);
// flink stream
DataStream<Vehicle> dataStream2 = environment.fromCollection(VehicleList);
/** connect is freer than coGroup/join: no key pre-matching is done;
 a shared key can be defined externally and matched separately against each stream's elements, which is equivalent to coGroup/join;
 it operates on the elements of the two streams separately and preserves each stream's element type */
/** Use case for the connect operator: stream A carries a rule set that changes with the elements of stream B. A rule set from stream A is first stored as state,
 waiting for new elements from stream B; on arrival, the stored rules are applied to the new element to produce a result, and/or a new timer is registered to trigger future behavior. */
dataStream1.connect(dataStream2)
/** ConnectedStreams can also be key-partitioned logically; note in particular that the second type parameter of the two KeySelector<Integer, *> must match, otherwise:
 * error: Key types if input KeyedStreams don't match */
.keyBy((KeySelector<Integer, Object>) String::valueOf, (KeySelector<Vehicle, Object>) Vehicle::getType)
.process(new CoProcessFunction<Integer, Vehicle, Object>() {
@Override
public void processElement1(Integer value, Context ctx, Collector<Object> out) {
// set timers, When reacting to the firing of set timers the function can emit yet more elements.
// Setting timers is only supported on a keyed streams.
ctx.timerService().registerProcessingTimeTimer(11245L);
ctx.timerService().registerEventTimeTimer(145142L);
if (value % 2 == 0){
// the timestamps here are all null; dear reader, consider why (compare T13_CoGroup)
out.collect("e1: " + value + " timestamp: "+ ctx.timestamp());
}
}
@Override
public void processElement2(Vehicle value, Context ctx, Collector<Object> out) {
// query the time (both event and processing)
Long timestamp1 = ctx.timerService().currentProcessingTime();
Long timestamp2 = ctx.timestamp();
if ( Objects.equals("car",value.getType())){
out.collect("e2: "+ value+ " ProcessingTime: "+ timestamp1 + " timestamp: "+ timestamp2);
}
}
})
.writeAsText("D:/API/Connect1", FileSystem.WriteMode.OVERWRITE);
// the following shows process's other function parameter, KeyedCoProcessFunction; it differs from CoProcessFunction only in requiring a prior keyBy
dataStream1.connect(dataStream2)
/** ConnectedStreams can also be key-partitioned logically; the second type parameter of the two KeySelector<Integer, *> must match,
 * so here both are converted to String */
.keyBy((KeySelector<Integer, String>) String::valueOf, (KeySelector<Vehicle, String>) Vehicle::getType)
// KeyedCoProcessFunction processes elements of two keyed streams and produces a single output one
.process(new KeyedCoProcessFunction<Object, Integer, Vehicle, Object>() {
@Override
public void processElement1(Integer value, Context ctx, Collector<Object> out) {
}
@Override
public void processElement2(Vehicle value, Context ctx, Collector<Object> out) {
}
})
.writeAsText("D:/API/Connect2", FileSystem.WriteMode.OVERWRITE);
/** CoMapFunction implements one map() transformation over the two connected streams;
 * because the same function instance is used, the two transformations can share state */
dataStream1.connect(dataStream2)
.map(new CoMapFunction<Integer, Vehicle, Object>() {
// note: do not return null here, or an NPE follows
@Override
public Object map1(Integer value) {
return value * 2;
}
// note: do not return null here, or an NPE follows
@Override
public Object map2(Vehicle value) {
if (Objects.equals("car",value.getType())){
return "car --> suv :" + value;
}
return value;
}
})
.writeAsText("D:/API/Connect3", FileSystem.WriteMode.OVERWRITE);
// CoFlatMapFunction and CoMapFunction parallel FlatMap and Map: the map function returns an Object,
// flatMap returns void, its results go into the Collector, which holds any number of values and so enables flattening
dataStream1.connect(dataStream2)
.flatMap(new CoFlatMapFunction<Integer, Vehicle, Object>() {
@Override
public void flatMap1(Integer value, Collector<Object> out) {
out.collect(value * 2);
}
@Override
public void flatMap2(Vehicle value, Collector<Object> out) {
for (String str : value.toString().split(",")) {
out.collect(str);
}
}
})
.writeAsText("D:/API/Connect4", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
The result is as follows:
14 Split and Select
This operator is relatively simple (note that split/select is deprecated in newer Flink versions in favor of side outputs); details omitted!
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// set the environment-wide parallelism, applied to all operators
environment.setParallelism(1);
List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
DataStream<Integer> dataStream = environment.fromCollection(list);
// SplitStream<Integer> is marked @Deprecated
SplitStream<Integer> splitStream = dataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> list1 = new ArrayList<>(16);
if (value % 2 == 0){
list1.add("even");
}else {
list1.add("odd");
}
return list1;
}
});
// although the stream has been split, printing it still yields the original stream
splitStream.writeAsText("D:/API/Split", FileSystem.WriteMode.OVERWRITE);
DataStream<Integer> even = splitStream.select("even");
even.writeAsText("D:/API/Split/even", FileSystem.WriteMode.OVERWRITE);
DataStream<Integer> odd = splitStream.select("odd");
odd.writeAsText("D:/API/Split/odd", FileSystem.WriteMode.OVERWRITE);
DataStream<Integer> all = splitStream.select("even","odd");
all.writeAsText("D:/API/Split/all", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
Result screenshots omitted!
15 Iterate
The usual scenario for an iterative stream: split the output, feed the selected part back into the iteration, and keep computing until a condition is met.
This example defines a stream of 1-5; each iteration adds 100 to an element, and a value still below 300 is put back into the stream until it reaches at least 300 and is emitted. IterativeStream marks the starting point of an iteration on a DataStream. The stop condition is expressed through closeWith(DataStream<T>): elements passing the feedback filter are fed back. If iterationBody.filter can never stop, the iteration runs forever!
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// set the environment-wide parallelism, applied to all operators
environment.setParallelism(1);
List<Integer> list = Arrays.asList(1,2,3,4,5);
DataStream<Integer> dataStream = environment.fromCollection(list);
// usual scenario for an iterative stream: split the output stream, feed the selected records back into the iteration, and compute until a condition is met;
// here a stream of 1-5 is defined; each iteration adds 100 to an element; values still below 300 go back into the stream until they exceed 300 and are emitted;
// The iterative data stream represents the start of an iteration in a DataStream
IterativeStream<Integer> iterativeStream = dataStream.iterate(2000L);
// definition of the iteration body
DataStream<Integer> iterationBody = iterativeStream.map(value -> value + 100);
iterationBody.writeAsText("D:/API/Iterate/iterationBody", FileSystem.WriteMode.OVERWRITE);
DataStream<Integer> feedback = iterationBody.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) {
return value < 300;
// changing it to the following iterates forever, but only over the even elements
//return value % 2 == 0;
}
});
// syntax: closeWith(DataStream<T>) feeds back the elements that pass the feedback filter
iterativeStream.closeWith(feedback);
iterativeStream.writeAsText("D:/API/Iterate/iterativeStream", FileSystem.WriteMode.OVERWRITE);
DataStream<Integer> output = iterationBody.filter(value -> value >= 300);
output.writeAsText("D:/API/Iterate/output", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
The result is as follows:
16 Project: transforming Tuple streams
There is only one thing to know about this operator: it works on Tuple streams only!
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
environment.setParallelism(1);
// create a Tuple stream containing a single element
DataStream<Tuple4<String,Integer,String,Float>> fromStream = environment.fromElements(new Tuple4<>("car",123456,"yellow",1.5f));
// stream projection: takes a subset of the fields of each element; applies to Tuple streams only
// syntax: project(int... fieldIndexes)
DataStream<Tuple2<Float,String>> toStream = fromStream.project(3,0);
toStream.writeAsText("D:/API/Project", FileSystem.WriteMode.OVERWRITE);
environment.execute();
}
Result omitted!
17 Physical Partitioning
These apply low-level physical partitioning after a transformation. Everything so far, keying by key for instance, was logical partitioning; physical partitioning actually routes the data to different physical partitions.
Among them, rescale (a local round-robin) suits one pipeline fanning out to several downstream operators. It does not trigger a global rebalance(), only a local repartition, and the assignment depends on the upstream and downstream parallelism: with upstream 2 and downstream 4, each upstream task feeds a fixed pair of downstream tasks (1 to 2); with upstream 6 and downstream 2, every three upstream tasks feed one downstream task (3 to 1). With non-multiple parallelisms the per-task fan-out counts differ. A sketch of the 2-to-4 case follows.
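A sketch of my own (identity maps as placeholders, assuming the vehicleDataStream from the code below):
DataStream<Vehicle> upstream = vehicleDataStream
        .map(new MapFunction<Vehicle, Vehicle>() {
            @Override public Vehicle map(Vehicle v) { return v; }
        })
        .setParallelism(2);      // upstream runs 2 parallel tasks
upstream.rescale()               // local round-robin, no global rebalance
        .print()
        .setParallelism(4);      // each upstream task feeds 2 of the 4 downstream tasks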
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
environment.setParallelism(1);
// create a Tuple stream containing a single element
DataStream<Tuple4<String,Integer,String,Float>> tupleStream = environment
.fromElements(new Tuple4<>("car",123456,"yellow",1.5f));
// syntax: partitionCustom(Partitioner<K> partitioner, String field)
// Partitioner: computes the target partition index for a key from the total partition count numPartitions
DataStream<Tuple4<String,Integer,String,Float>> partitionStream1 = tupleStream
// modulo arithmetic computes a key's target partition index here
.partitionCustom((Partitioner<String>) (key, numPartitions) -> Integer.parseInt(key) % numPartitions,1);
partitionStream1.writeAsText("D:/API/Physicalpartitioning", FileSystem.WriteMode.OVERWRITE);
System.out.println("vehicle stream data >>>");
Set<Vehicle> vehicleHashSet = new HashSet<>(10);
Stream<Vehicle> stream = new Producer().getVehicles().limit(10);
stream.forEach(vehicleHashSet::add);
vehicleHashSet.stream().forEach(System.out::println);
DataStream<Vehicle> vehicleDataStream = environment.fromCollection(vehicleHashSet);
// random partitioning: data is spread uniformly across partitions
vehicleDataStream.shuffle();
// round-robin partitioning
vehicleDataStream.rebalance();
/** rescale (local round-robin) suits one pipeline fanning out to multiple downstream operators; it does not trigger a global rebalance(),
 only a local repartition, and the assignment depends on the upstream and downstream parallelism */
// e.g. upstream parallelism 2, downstream 4: each upstream task feeds a fixed pair of downstream tasks, i.e. 1 to 2;
// upstream parallelism 6, downstream 2: every three upstream tasks feed one downstream task, i.e. 3 to 1;
// with non-multiple parallelisms the per-task counts differ
vehicleDataStream.rescale();
// broadcast to every partition, i.e. the data is replicated
vehicleDataStream.broadcast();
environment.execute();
}
Screenshots omitted!
18 A few other operations remain, all relatively easy to understand; one group is sketched below:
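For example, the task-chaining and resource-group hints; a sketch of my own (these control physical execution rather than transform data):
SingleOutputStreamOperator<Integer> mapped = dataStream.map(t -> t + 100);
mapped.startNewChain();              // start a new operator chain at this map
// mapped.disableChaining();        // alternatively, forbid chaining this operator at all
mapped.slotSharingGroup("group1");   // place the operator in slot sharing group "group1"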
Summary: Flink is far too large to cover exhaustively here. Stream computing is its core, but it also offers batch processing via DataSet and the Table/SQL side with Blink; it is a very powerful system. This article demonstrated only the core of the Stream API, hoping to serve as a starting point.
Troubleshooting: 1. If a parallelism setting seems to have no effect, delete the previous run's output and compare the files' creation times.
2. Every run produces different results because the vehicle stream in my code is generated fully at random, so differing results are entirely normal!
The end!