FlinkFlink实验特性--reinterpretAsKeyedStream 将DataStream重新解释为KeyedStream
Posted 九师兄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink实验特性--reinterpretAsKeyedStream 将DataStream重新解释为KeyedStream相关的知识,希望对你有一定的参考价值。
1.概述
1.1背景
这个实验特性应该是在Flink 1.5版本已经引进,但是直到现在(1.11)仍然是实验特性。官网对于它的描述 :这个特性仍然在不断的优化,目前是可能是不稳定、不兼容的,并且在以后的版本甚至发生大的改变。
1.2 作用
将DataStream重新解释为KeyedStream,这种方式可以避免shuffle
。
那么自然它的使用也会受到相应的约束,这个只能去重新解释那些已经预分区的DataStream。
1.3 官网例子
在源码中找到了这样一个测试代码,结果是:Tests passed
public class ReinterpretAsKeyedStreamDemo
public void reinterpretAsKeyedStream() throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 1, 2, 3, 1, 2, 3);
KeyedStream<Integer, Integer> reinterpret = DataStreamUtils.reinterpretAsKeyedStream(source, new KeySelector<Integer, Integer>()
@Override
public Integer getKey(Integer value) throws Exception
return value;
);
SingleOutputStreamOperator<Integer> reducer = reinterpret.countWindow(2)
.reduce(new ReduceFunction<Integer>()
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception
return value1 + value2;
);
reducer.addSink(new PrintSinkFunction<>());
env.execute("xx");
上面结果我们可以看到 输出了2 4 6 其实就是
1 + 1 = 2
2 + 2 = 4
3 + 3 = 6
但是我们其实有9条数据,1,2,3分别是3组数据,为什么少输出呢?
因为前面两组1,2,3已经结束了一个窗口,满足同一个key下有两个数据,然后最后一组的1,2,3,并不满足有两个数据,无法触发窗口。
为了方便理解我们再次修改如下代码:
public void reinterpretAsKeyedStream() throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 1, 2, 3, 1, 2, 3,2);
KeyedStream<Integer, Integer> reinterpret = DataStreamUtils.reinterpretAsKeyedStream(source, new KeySelector<Integer, Integer>()
@Override
public Integer getKey(Integer value) throws Exception
return value;
);
SingleOutputStreamOperator<Integer> reducer = reinterpret.countWindow(2)
.reduce(new ReduceFunction<Integer>()
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception
return value1 + value2;
);
reducer.addSink(new PrintSinkFunction<>());
env.execute("xx");
运行结果如下
2
4
6
4
我们数据源数据里面最后加入了一个2,然后最后输出多了一个4。当然这个是countwindow的使用,因为官网例子给的不明确,这里只是简单给大家补充一下,便于理解,避免初次使用产生太多疑问。
1.3.1 实战Demo分析
代码功能:
-
从文件中读取数据然后构建
ds1:DataStream[Event
]流,然后输出文件数据; -
接着ds1流会根据Event的字段 key 进行keyby操作,使用一个窗口大小为2的CountWindow,然后保留这两条数据中 partition 字段值最大的一条数据,构建数据流
ds2:DataStreamp[Event]
; -
最后我们继续使用一个窗口大小为2的CountWindow,然后对窗口内两条数据处理:
-
如果两条数据的event_type字段值不等,那么我们使用第一条数据的值去创建一个Event对象,然后新数据Event对象的event_type字段设置为3,并且把字段 v 设置为两个数据的字段 v 的字符串拼接;
-
如果event_type字段值相等,那么我们保留time_字段值大的一条数据。
-
第三步中,正常情况我们会对ds2进行keyby然后继续按照key 字段值hash,这样会产生相应的Shuffle,但是通过使用本文的实验特性reinterpretAsKeyedStream,可以避免Shuffle。
// scala
object SessionwindowingOriginal
// 主函数
def main(args: Array[String]): Unit =
Logger.getRootLogger.setLevel(Level.WARN)
val params = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env.getConfig.setGlobalJobParameters(params)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setParallelism(2)
env.setMaxParallelism(2)
// 从文件读取数据 ds1:DataStream[Event]类型
val ds1 = env.readTextFile("/Users/hehuiyuan/gitwarehouse/flinksql/src/main/resources/f1").map(e =>
val l = e.split(",")
val (key, time_, event_type, v, partition) = (l(0).trim, l(1).trim.toLong, l(2).trim.toInt, l(3).trim, l(4).trim)
Event(key, time_, event_type, v, partition)
).name("f1_source")
//输出原始数据
ds1.addSink(new SinkFunction[Event]
override def invoke(value: Event, context: SinkFunction.Context[_]): Unit = System.out.println("原始数据:"+value.toString)
).name("origin_data_sink")
//按照event对象的key字段分组
// 相同key下 窗口大小数据量是2,然后取partition字段取最大的数据
val ds2 = ds1.keyBy(_.key).countWindow(2).max("partition")
// 输出ds2:DataStream[Event]
ds2.addSink(new SinkFunction[Event]
override def invoke(value: Event, context: SinkFunction.Context[_]): Unit = System.out.println("ds2:"+value.toString)
).name("ds2")
//ds2是DataSteam,ds1按照字段key分区处理后得到的流
//此时还想继续使用KededStream的一些操作,需要把ds2进行keyby
// 但是会存在shuffle,key不变情况下,我们可以直接把DataStream变为KeyedStream
val aggregated = new DataStreamUtils(ds2)
.reinterpretAsKeyedStream((event) => event.key)
.countWindow(2)
.reduce((e1, e2) =>
if(e1.event_type != e2.event_type)
Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
else if(e2.time_ > e1.time_) e2
else e1
)
.addSink(new SinkFunction[Event]
override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
System.out.println(value.toString)
).name("result")
env.execute()
我们看一下读取的文件中的数据样式:
每一行都会被封装到一个Event对象中,然后构成DataStream。
//创建一个Pojo类,4个字段
case class Event(
key: String,
time_ : Long,
event_type: Int,
v: String,
partition: String
)
Event对象有五个字段,会使用逗号分割
输出结果:
原始数据:Event(a,1,1,banana,0)
原始数据:Event(b,21,2,tomato,0)
原始数据:Event(c,12,1,apple,0)
原始数据:Event(d,10,2,orange,0)
原始数据:Event(e,101,1,watermeleon,0)
原始数据:Event(a,3,1,ba,1)
原始数据:Event(b,11,2,to,0)
原始数据:Event(c,42,1,ap,0)
原始数据:Event(d,20,2,or,0)
原始数据:Event(e,111,2,wa,0)
ds2:Event(a,1,1,banana,1)
ds2:Event(b,21,2,tomato,0)
ds2:Event(c,12,1,apple,0)
ds2:Event(d,10,2,orange,0)
ds2:Event(e,101,1,watermeleon,0)
原始数据:Event(a,2,1,ba,0)
原始数据:Event(b,2,2,to,0)
原始数据:Event(c,88,1,ap,0)
原始数据:Event(d,44,2,or,0)
原始数据:Event(e,11,2,wa,0)
原始数据:Event(a,33,1,banana,0)
原始数据:Event(b,21,2,tomato,1)
原始数据:Event(c,55,2,apple,0)
原始数据:Event(d,66,1,orange,0)
原始数据:Event(e,101,1,watermeleon,0)
ds2:Event(a,2,1,ba,0)
ds2:Event(b,2,2,to,1)
Event(a,2,1,ba,0)
Event(b,21,2,tomato,0)
ds2:Event(c,88,1,ap,0)
Event(c,88,1,ap,0)
ds2:Event(d,44,2,or,0)
Event(d,44,2,or,0)
ds2:Event(e,11,2,wa,0)
Event(e,101,3,wa_watermeleon,0)
我们拿其中一个输出结果的数据简单分析一下:(上面最后一行)
Event(e,101,3,wa_watermeleon,0)
那么这个数据是如何输出的呢?
我们会发现ds1经过keyby以及counwindow后的max处理以后,留下了两条数据:
ds2:Event(e,101,1,watermeleon,0)
ds2:Event(e,11,2,wa,0)
紧接着,把数据流ds2转为keyedStream
,然后又做了一次CountWindow
操作,窗口大小是2,具体实现的代码我们下面分析,这里先把结果分析完:
因为上面对于key = e
下,满足了两条数据,也就是满足了countwindow的触发计算,这个时候会对这两个数据处理,根据我们第三步功能描述可知处理如下:
event_type = 1
event_type = 2
这两条数据的该字段不等,根据(3.1)可知,会创建一个新的Event对象,该对象的 event_type = 3, v = wa_watermeleon(两条数据的该字段的字符串拼接构成)
最终得到如下结果:
Event(e,101,3,wa_watermeleon,0)
最后,我们对ds2使用了本文的主要介绍的特性reinterpretAsKeyedStream进行分析,这个方法在DataStreamUtils中。
使用reinterpretAsKeyedStream的代码:
val aggregated = new DataStreamUtils(ds2)
.reinterpretAsKeyedStream((event) => event.key)
.countWindow(2)
.reduce((e1, e2) =>
if(e1.event_type != e2.event_type)
Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
else if(e2.time_ > e1.time_) e2
else e1
)
.addSink(new SinkFunction[Event]
override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
System.out.println(value.toString)
).name("result")
不用reinterpretAsKeyedStream的代码:
val aggregated = ds2
.keyBy(_.key)
.countWindow(2)
.reduce((e1, e2) =>
if(e1.event_type != e2.event_type)
Event(e1.key,e1.time_,3,e2.v+"_"+e1.v,e1.partition)
else if(e2.time_ > e1.time_) e2
else e1
)
.addSink(new SinkFunction[Event]
override def invoke(value: Event, context: SinkFunction.Context[_]): Unit =
System.out.println(value.toString)
).name("result")
在这里就涉及到使用reinterpretAsKeyedStream的优势了,可能代码你无法更好的体会,我们通过StreamGraph来了解这两者的区别:
图片可能有点小,我们把关键地方放大查看:
在这里我们可以发现,同样是Window Operator,但是第一个Window Operator 的数据是通过上游HASH过来的,第二个是通过FORWARD方式过来
。
两个Operator之间的边展示的关键词,其实展示了两个算子之间数据是如何传输的,在之前的文章提到过关于partition的概念以及Flink已经提供的实现,此处阅读 。
2.重点源码分析
public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
DataStream<T> stream,
KeySelector<T, K> keySelector,
TypeInformation<K> typeInfo)
PartitionTransformation<T> partitionTransformation = new PartitionTransformation<>(
stream.getTransformation(),
new ForwardPartitioner<>());
return <以上是关于FlinkFlink实验特性--reinterpretAsKeyedStream 将DataStream重新解释为KeyedStream的主要内容,如果未能解决你的问题,请参考以下文章
FlinkFlink 1.13 Flink SQL 新特性 性能优化 时区 时间 纠正
FlinkFlink 1.14 版本 新特性 Barrier 在流经算子做 checkpoint Barrier跳过 unaligned checkpoint
FlinkFlink 1.13 将数据 写入 到 elasticsearch 7 案例
FlinkFlink 运行报错 does not have any open files
FlinkFLink SQL TableException: Table sink doesn‘t support consuming update changes which is