Flink 流处理 API_Transform

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 流处理 API_Transform相关的知识,希望对你有一定的参考价值。


Transform

转换算子

 

1 map

Flink

val streamMap = stream.map x => x * 2

 

 

2 flatMap

flatMap 的函数签名:def flatMap[A,B](as: List[A])(f: A  List[B]): List[B] 例如: flatMap(List(1,2,3))(i  List(i,i)) 结果是 List(1,1,2,2,3,3),  而 List("a b", "c d").flatMap(line  line.split(" ")) 结果是 List(a, b, c, d)。

 


val streamFlatMap = stream.flatMap     x => x.split(" ")


 

 

3 Filter

Flink

val streamFilter = stream.filter     x => x == 1

 

4 KeyBy

Flink

DataStream KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。

 

 

5 滚动聚合算子(Rolling Aggregation)

这些算子可以针对 KeyedStream 的每一个支流做聚合。

  1. sum()
  2. min()
  3. max()
  4. minBy() l maxBy()

6 Reduce

KeyedStream DataStream:一个分组数据流的聚合操作,合并当前的元素

和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果


// 读入数据 val inputStream = env.readTextFile("F:\\\\IDEA-DATA\\\\Flink_Demo\\\\source.txt") // Transform操作 val dataStream = inputStream .map( data => val dataArray = data.split(",") SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ) )


 

7 Split 和 Select

Flink

 

DataStream SplitStream:根据某些特征把一个 DataStream 拆分成两个或者

多个 DataStream。

Select

Flink

SplitStreamDataStream:从一个 SplitStream 中获取一个或者多个 DataStream。

需求:传感器数据按照温度高低(以 30 度为界),拆分成两个流。

 


val splitStream = stream2   .split( sensorData =>      if (sensorData.temperature > 30) Seq("high") else Seq("low")   )  val high = splitStream.select("high") val low = splitStream.select("low") val all = splitStream.select("high", "low")

 

8 Connect 和 CoMap

Flink

 

DataStream,DataStream ConnectedStreams:连接两个保持他们类型的数据

流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

 CoMap,CoFlatMap

Flink

ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map 和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap 处理。



val warningStream = highTempStream.map( sensorData => (sensorData.id, sensorData.temperature) ) val connectedStreams = warningStream.connect(lowTempStream) val coMapStream = connectedStreams.map( warningData => ( warningData._1, warningData._2, "high temperature warning" ), lowData => ( lowData.id, "healthy" ) )



 

9. Union

Flink

DataStream DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream。

 

val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream) unionStream.print("union:::")

 

 

Connect Union 区别:

1. Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap 中再去调整成为一样的。

2. Connect 只能操作两个流,Union 可以操作多个。

以上是关于Flink 流处理 API_Transform的主要内容,如果未能解决你的问题,请参考以下文章

Flink流处理的时间窗口

云原生 API 处理平台「支流科技」完成百万美元 Pre-A 轮融资

Flink学习之路Flink简介

什么是 Flink (流处理框架)

Flink流处理API大合集:掌握所有flink流处理API技术,看这一篇就够了

Flink流处理API大合集:掌握所有flink流处理API技术,看这一篇就够了