flink算子
Posted 大数据最后一公里
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink算子相关的知识,希望对你有一定的参考价值。
版本说明
flink-1.12
算子
用户通过算子能将一个或多个DataStream转换成新的DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据拓扑
数据流转换
Map
DataStream → DataStream
取一个元素并产生一个元素。
dataStream.mapx=>x*2
FlatMap
DataStream → DataStream
取一个元素并产生零个、一个或多个元素。
dataStream.flatMapstr=>str.split(" ")
Filter
DataStream → DataStream
为每个元素评估一个布尔函数,并保留该函数返回true的布尔函数
dataStream.filter_ != 0
KeyBy
DataStream → KeyedStream
在逻辑上将流划分为不相交的分区,每个分区包含相同键的元素。在内部,这是通过哈希分区实现的
dataStream.keyBy(_.somekey) //对象类的
dataStream.keyBy(_._1)//元组类的
Reduce
KeyedStream → DataStream
键控数据流上的“滚动”减少。将当前元素与最后一个减少的值合并,并发出新值。
keyedStream.reduce_+_
Aggregations
KeyedStream → DataStream
在键控数据流上滚动聚合。min 和 minBy 之间的区别在于 min 返回最小值,而 minBy 返回该字段中具有最小值的元素(max 和 maxBy 相同)。
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window
KeyedStream → WindowedStream
可以在已经分区的 KeyedStreams 上定义 Windows。Windows 根据某些特征(例如,最近 5 秒内到达的数据)对每个键中的数据进行分组。
dataStream.keyBy(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5)))
WindowAll
DataStream → AllWindowedStream
可以在常规数据流上定义 Windows。Windows 根据某些特征(例如,在过去 5 秒内到达的数据)对所有流事件进行分组
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
将通用函数应用于整个窗口。下面是一个手动对窗口元素求和的函数。
windowedStream.apply WindowFunction
allWindowedStream.apply AllWindowFunction
Window Reduce
WindowedStream → DataStream
对窗口应用函数式缩减函数并返回缩减后的值。
windowedStream.reduce_+_
Aggregations on windows
WindowedStream → DataStream
聚合窗口的内容。min 和 minBy 之间的区别在于 min 返回最小值,而 minBy 返回该字段中具有最小值的元素(max 和 maxBy 相同)。
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union
DataStream* → DataStream
两个或多个数据流的联合创建一个包含所有流中所有元素的新流。注意:如果您将数据流与自身联合,您将在结果流中获得每个元素两次。
dataStream.union(otherStream1,otherStream2,...)
Window Join
DataStream,DataStream → DataStream
在给定的键和公共窗口上连接两个数据流。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply ...
Window CoGroup
DataStream,DataStream → DataStream
将给定键和公共窗口上的两个数据流组合在一起。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply
Connect
DataStream,DataStream → ConnectedStreams
“连接”两个保留其类型的数据流,允许在两个流之间共享状态。
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap
ConnectedStreams → DataStream
类似于连接数据流上的 map 和 flatMap
connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
Iterate
DataStream → IterativeStream → DataStream
通过将一个运算符的输出重定向到某个先前的运算符,在流中创建一个“反馈”循环。这对于定义持续更新模型的算法特别有用。下面的代码从一个流开始,并不断地应用迭代体。大于 0 的元素被发送回反馈通道,其余元素被转发到下游。
initialStream.iterate
iteration =>
val iterationBody = iteration.map /*do something*/
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
物理分区
Flink 也提供以下方法让用户根据需要在数据转换完成后对数据细节进行更细粒度的。
Custom partitioning
使用用户定义的分区程序为每个元素选择目标任务。
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
Random partitioning
根据均匀分布随机划分元素。
dataStream.shuffle()
Rebalancing (Round-robin partitioning)
分区元素循环,每个分区创建相等的负载。对存在数据倾斜时的性能优化很有用。
dataStream.rebalance()
Rescaling
分区元素循环,每个分区创建相等的负载。对存在数据倾斜时的性能优化很有用。
dataStream.rescale()
Broadcasting
将元素以循环方式分区到下游操作的子集。如果您希望拥有管道,例如,在其中将源的每个并行实例扇出到多个映射器的子集以分配负载,但又不希望 rebalance() 引起的完全重新平衡,这将非常有用。这将只需要本地数据传输而不是通过网络传输数据,这取决于其他配置值,例如 TaskManager 的插槽数。
上游操作将元素发送到的下游操作子集取决于上游和下游操作的并行度。例如,如果上游操作的并行度为 2,下游操作的并行度为 4,那么一个上游操作会将元素分发给两个下游操作,而另一个上游操作将分发给其他两个下游操作。另一方面,如果下游操作具有并行性2,而上游操作具有并行性4,则两个上游操作将分配给一个下游操作,而其他两个上游操作将分配给其他下游操作。
在不同并行度不是彼此的倍数的情况下,一个或多个下游操作将具有来自上游操作的不同数量的输入。
dataStream.broadcast()
算子链和资源组
将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作)。此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求:
如果想对整个作业禁用算子链,可以调用 StreamExecutionEnvironment.disableOperatorChaining()。下列方法还提供了更细粒度的控制。需要注 意的是, 这些方法只能在 DataStream 转换操作后才能被调用,因为它们只对前一次数据转换生效。例如,可以 someStream.map(...).startNewChain() 这样调用,而不能 someStream.startNewChain()这样。
一个资源组对应着 Flink 中的一个 slot 槽, 你可以根据需要手动地将各个算子隔离到不同的 slot 中。
Start new chain
以当前 operator 为起点开始新的连接。如下的两个 mapper 算子会链接在一起而 filter 算子则不会和第一个 mapper 算子进行链接。
someStream.filter(...).map(...).startNewChain().map(...)
Disable chaining
任何算子不能和当前算子进行链接
someStream.map(...).disableChaining()
Set slot sharing group
配置算子的资源组。Flink 会将相同资源组的算子放置到同一个 slot 槽中执行,并将不同资源组的算子分配到不同的 slot 槽中,从而实现 slot 槽隔离。如果所有输入操作都在同一个资源组, 资源组将从输入算子开始继承。 Flink 默认的资源组名称为 "default",算子可以显式调用 slotSharingGroup("default") 加入到这个资源组中。
someStream.filter(...).slotSharingGroup("name")
以上是关于flink算子的主要内容,如果未能解决你的问题,请参考以下文章