Flink中的算子操作
Posted ssqq5200936
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink中的算子操作相关的知识,希望对你有一定的参考价值。
一、Connect
DataStream,DataStream -> ConnectedStream,连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了同一个流中,内部依然保持各自的数据和形式
不发生任何变化,两个流相互独立。
import org.apache.flink.streaming.api.scala._ object Connect { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment var stream01 = env.generateSequence(1,10) val stream = env.readTextFile("test001.txt") val stream02 = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop")) val streamConnect: ConnectedStreams[Long, String] = stream01.connect(stream02) //两个流各自处理各自的,互不干扰 val stream03: DataStream[Any] = streamConnect.map(item => item * 2, item => (item,1L)) stream03.print() env.execute("Connect") } }
二、CoMap,CoFlatMap
ConnectedStreams -> DataStream:作用于ConnectedStream上,功能与map和flatMap一样,对ConnectedStram中的每一个Stream分别进行map和flatMap
三、Split
import org.apache.flink.streaming.api.scala._ object Split { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.readTextFile("test001.txt").flatMap(item => item.split(" ")) val streamSplit: SplitStream[String] = stream.split( word => ("hadoop".equals(word) match { case true => List("hadoop") //值等于hadoop的流加入到一个List中 case false => List("other")//值不等于hadoop的流加入到一个List中 }) ) //取出属于各自部分的流 val value01: DataStream[String] = streamSplit.select("hadoop") val value02: DataStream[String] = streamSplit.select("other") value01.print() value02.print() env.execute("Split Job") } }
四、Union
DataStream -> DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新的DataStream。
注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。
五、KeyBy(比较重要)
DataStream -> KeyedStream:输入必须是Tuple类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。
把所有相同key的数据聚合在一起
import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala._ object KeyBy { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.readTextFile("test001.txt").flatMap(item => item.split(" ")) //将相同key数据进行聚合 //同一个key的数据都划分到同一个分区中 val streamKeyBy: KeyedStream[(String, Int), Tuple] = stream.map(item => (item,1)).keyBy(0) streamKeyBy.print() env.execute("KeyBy Job") } }
六、Reduce
KeyedStream -> DataStream,一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,
而不是只返回最后一次聚合的最终结果。
数据流如何在两个 transformation 组件中传输的?
一对一流(=spark窄依赖):(比如source=>map过程)保持元素分区和排序
redistributing流(=spark宽依赖):(map=>keyBy/window 之间,以及keyBy/window与sink之间)改变了流分区。
每一个算子任务根据所选的转换,向不同的目标子任务发送数据。
比如:keyBy,根据key的hash值重新分区、broadcast、rebalance(类似shuffle过程)。在一次 redistributing交换中,元素间排序,只针对发送方
的partition和接收partition方。最终到sink端的排序是不确定的。
以上是关于Flink中的算子操作的主要内容,如果未能解决你的问题,请参考以下文章