Flink 流处理 API_Transform
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 流处理 API_Transform相关的知识,希望对你有一定的参考价值。
Transform
转换算子
1 map
val streamMap = stream.map x => x * 2
2 flatMap
flatMap 的函数签名:def flatMap[A,B](as: List[A])(f: A List[B]): List[B] 例如: flatMap(List(1,2,3))(i List(i,i)) 结果是 List(1,1,2,2,3,3), 而 List("a b", "c d").flatMap(line line.split(" ")) 结果是 List(a, b, c, d)。
val streamFlatMap = stream.flatMap x => x.split(" ")
3 Filter
val streamFilter = stream.filter x => x == 1
4 KeyBy
DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。
5 滚动聚合算子(Rolling Aggregation)
这些算子可以针对 KeyedStream 的每一个支流做聚合。
- sum()
- min()
- max()
- minBy() l maxBy()
6 Reduce
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素
和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果
// 读入数据 val inputStream = env.readTextFile("F:\\\\IDEA-DATA\\\\Flink_Demo\\\\source.txt") // Transform操作 val dataStream = inputStream .map( data => val dataArray = data.split(",") SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ) )
7 Split 和 Select
DataStream → SplitStream:根据某些特征把一个 DataStream 拆分成两个或者
多个 DataStream。
Select
SplitStream→DataStream:从一个 SplitStream 中获取一个或者多个 DataStream。
需求:传感器数据按照温度高低(以 30 度为界),拆分成两个流。
val splitStream = stream2 .split( sensorData => if (sensorData.temperature > 30) Seq("high") else Seq("low") ) val high = splitStream.select("high") val low = splitStream.select("low") val all = splitStream.select("high", "low")
8 Connect 和 CoMap
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据
流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
CoMap,CoFlatMap
ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map 和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap 处理。
val warningStream = highTempStream.map( sensorData => (sensorData.id, sensorData.temperature) ) val connectedStreams = warningStream.connect(lowTempStream) val coMapStream = connectedStreams.map( warningData => ( warningData._1, warningData._2, "high temperature warning" ), lowData => ( lowData.id, "healthy" ) )
9. Union
DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream。
val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream) unionStream.print("union:::")
Connect 与 Union 区别:
1. Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap 中再去调整成为一样的。
2. Connect 只能操作两个流,Union 可以操作多个。
以上是关于Flink 流处理 API_Transform的主要内容,如果未能解决你的问题,请参考以下文章
云原生 API 处理平台「支流科技」完成百万美元 Pre-A 轮融资