Flink 多流转换算子

Posted noyouth

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 多流转换算子相关的知识,希望对你有一定的参考价值。

1.拆分流

代码片段:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val streamFromFile = env.readTextFile("C:\Users\Mi\Documents\project\idea\FlinkTitorial\src\main\resources\sensor.txt")

val stream1: DataStream[SensorReading] = streamFromFile.map(d => {
  val arr = d.split(",")
  SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
})

//拆分流
val splitStream: SplitStream[SensorReading] = stream1.split(data => if (data.temperature > 30) Seq("high") else Seq("low"))
val highStream = splitStream.select("high")
val lowStream = splitStream.select("low")
val allStream = splitStream.select("high", "low")

2.collect合并流

collect合并的两个流,数据类型可以不一样。

代码片段:

//合并流
val warning: DataStream[(String, Double)] = highStream.map(data => (data.id, data.temperature))
//connect()方法合并流
val connectedStream: ConnectedStreams[(String, Double), SensorReading] = warning.connect(lowStream)
//合并流的map操作
val coMapStream: DataStream[Product] = connectedStream.map(
  data1 => (data1._1, data1._2, "high temperature warning"),
  data2 => (data2.id, "healthy")
)

3.union合并流

union合并操作可以传多个流参数,并且因为返回类型为DataStream,支持链式操作。

代码片段:

val unionStream: DataStream[SensorReading] = highStream.union(lowStream,highStream).union(lowStream)

  

以上是关于Flink 多流转换算子的主要内容,如果未能解决你的问题,请参考以下文章

FLink四种图 以及 数据在 taskManager 之间的流转

08-flink-1.10.1- flink Transform api 转换算子

08-flink-1.10.1- flink Transform api 转换算子

错误 - EXC_BREAKPOINT(代码=1,子代码=0x100308448)

iContact卷曲子代码

创建广告时出现代码 200 和子代码 1487194 的 Facebook 错误背后的原因是啥?