flink算子

Posted 大数据最后一公里

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink算子相关的知识,希望对你有一定的参考价值。

版本说明

flink-1.12

算子

用户通过算子能将一个或多个DataStream转换成新的DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据拓扑

数据流转换

Map

DataStream → DataStream

取一个元素并产生一个元素。

dataStream.mapx=>x*2

FlatMap

DataStream → DataStream

取一个元素并产生零个、一个或多个元素。

dataStream.flatMapstr=>str.split(" ")

Filter

DataStream → DataStream

为每个元素评估一个布尔函数,并保留该函数返回true的布尔函数

dataStream.filter_ != 0

KeyBy

DataStream → KeyedStream

在逻辑上将流划分为不相交的分区,每个分区包含相同键的元素。在内部,这是通过哈希分区实现的

dataStream.keyBy(_.somekey) //对象类的
dataStream.keyBy(_._1)//元组类的

Reduce

KeyedStream → DataStream

键控数据流上的“滚动”减少。将当前元素与最后一个减少的值合并,并发出新值。

keyedStream.reduce_+_

Aggregations

KeyedStream → DataStream

在键控数据流上滚动聚合。min 和 minBy 之间的区别在于 min 返回最小值,而 minBy 返回该字段中具有最小值的元素(max 和 maxBy 相同)。

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

Window

KeyedStream → WindowedStream

可以在已经分区的 KeyedStreams 上定义 Windows。Windows 根据某些特征(例如,最近 5 秒内到达的数据)对每个键中的数据进行分组。

dataStream.keyBy(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5)))

WindowAll

DataStream → AllWindowedStream

可以在常规数据流上定义 Windows。Windows 根据某些特征(例如,在过去 5 秒内到达的数据)对所有流事件进行分组

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

Window Apply

WindowedStream → DataStream
AllWindowedStream → DataStream

将通用函数应用于整个窗口。下面是一个手动对窗口元素求和的函数。

windowedStream.apply  WindowFunction 
allWindowedStream.apply  AllWindowFunction 

Window Reduce

WindowedStream → DataStream

对窗口应用函数式缩减函数并返回缩减后的值。

windowedStream.reduce_+_

Aggregations on windows

WindowedStream → DataStream

聚合窗口的内容。min 和 minBy 之间的区别在于 min 返回最小值,而 minBy 返回该字段中具有最小值的元素(max 和 maxBy 相同)。

windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")

Union

DataStream* → DataStream

两个或多个数据流的联合创建一个包含所有流中所有元素的新流。注意:如果您将数据流与自身联合,您将在结果流中获得每个元素两次。

dataStream.union(otherStream1,otherStream2,...)

Window Join

DataStream,DataStream → DataStream

在给定的键和公共窗口上连接两个数据流。

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply  ... 

Window CoGroup

DataStream,DataStream → DataStream

将给定键和公共窗口上的两个数据流组合在一起。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply 

Connect

DataStream,DataStream → ConnectedStreams

“连接”两个保留其类型的数据流,允许在两个流之间共享状态。

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)

CoMap, CoFlatMap

ConnectedStreams → DataStream

类似于连接数据流上的 map 和 flatMap

connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)

Iterate

DataStream → IterativeStream → DataStream

通过将一个运算符的输出重定向到某个先前的运算符,在流中创建一个“反馈”循环。这对于定义持续更新模型的算法特别有用。下面的代码从一个流开始,并不断地应用迭代体。大于 0 的元素被发送回反馈通道,其余元素被转发到下游。

initialStream.iterate 
  iteration => 
    val iterationBody = iteration.map /*do something*/
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  

物理分区

Flink 也提供以下方法让用户根据需要在数据转换完成后对数据细节进行更细粒度的。

Custom partitioning

使用用户定义的分区程序为每个元素选择目标任务。

dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)

Random partitioning

根据均匀分布随机划分元素。

dataStream.shuffle()

Rebalancing (Round-robin partitioning)

分区元素循环,每个分区创建相等的负载。对存在数据倾斜时的性能优化很有用。

dataStream.rebalance()

Rescaling

分区元素循环,每个分区创建相等的负载。对存在数据倾斜时的性能优化很有用。

dataStream.rescale()

Broadcasting

将元素以循环方式分区到下游操作的子集。如果您希望拥有管道,例如,在其中将源的每个并行实例扇出到多个映射器的子集以分配负载,但又不希望 rebalance() 引起的完全重新平衡,这将非常有用。这将只需要本地数据传输而不是通过网络传输数据,这取决于其他配置值,例如 TaskManager 的插槽数。

上游操作将元素发送到的下游操作子集取决于上游和下游操作的并行度。例如,如果上游操作的并行度为 2,下游操作的并行度为 4,那么一个上游操作会将元素分发给两个下游操作,而另一个上游操作将分发给其他两个下游操作。另一方面,如果下游操作具有并行性2,而上游操作具有并行性4,则两个上游操作将分配给一个下游操作,而其他两个上游操作将分配给其他下游操作。

在不同并行度不是彼此的倍数的情况下,一个或多个下游操作将具有来自上游操作的不同数量的输入。

dataStream.broadcast()

算子链和资源组

将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作)。此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求:

如果想对整个作业禁用算子链,可以调用 StreamExecutionEnvironment.disableOperatorChaining()。下列方法还提供了更细粒度的控制。需要注 意的是, 这些方法只能在 DataStream 转换操作后才能被调用,因为它们只对前一次数据转换生效。例如,可以 someStream.map(...).startNewChain() 这样调用,而不能 someStream.startNewChain()这样。

一个资源组对应着 Flink 中的一个 slot 槽, 你可以根据需要手动地将各个算子隔离到不同的 slot 中。

Start new chain

以当前 operator 为起点开始新的连接。如下的两个 mapper 算子会链接在一起而 filter 算子则不会和第一个 mapper 算子进行链接。

someStream.filter(...).map(...).startNewChain().map(...)

Disable chaining

任何算子不能和当前算子进行链接

someStream.map(...).disableChaining()

Set slot sharing group

配置算子的资源组。Flink 会将相同资源组的算子放置到同一个 slot 槽中执行,并将不同资源组的算子分配到不同的 slot 槽中,从而实现 slot 槽隔离。如果所有输入操作都在同一个资源组, 资源组将从输入算子开始继承。 Flink 默认的资源组名称为 "default",算子可以显式调用 slotSharingGroup("default") 加入到这个资源组中。

someStream.filter(...).slotSharingGroup("name")

以上是关于flink算子的主要内容,如果未能解决你的问题,请参考以下文章

Flink 基本算子mapkeyBysum

Flink-算子(1)——DataSet

实时即未来,大数据项目车联网之创建Flink实时计算子工程

实时即未来,大数据项目车联网之创建Flink实时计算子工程

08-flink-1.10.1- flink Transform api 转换算子

08-flink-1.10.1- flink Transform api 转换算子