flnk算子reduce的简单使用
Posted nohert
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flnk算子reduce的简单使用相关的知识,希望对你有一定的参考价值。
import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichMapFunction} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala._ object TransformTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //0.从文件中读取数据 val inputPath = "D:\\\\ideaDemo\\\\maven_flink\\\\src\\\\main\\\\resources\\\\sensor.txt"; val inputStream = env.readTextFile(inputPath) //1.先转换成样例类类型(简单转换操作) val dataStream = inputStream.map(data => { val arr = data.split(",") SensorReding(arr(0), arr(1).toLong, arr(2).toDouble) }) //3.需要输出当前最小温度值,以及最近时间戳,要用reduce val resultStream = dataStream .keyBy("id") .reduce((curSate, newData) => SensorReding(curSate.id, newData.timestamp, curSate.temperature.min(newData.temperature)) ) .reduce((v1,v2)=> SensorReding(v1.id,v2.timestamp,v2.temperature.min(v2.temperature)) ) resultStream.print() env.execute() }
数据
sensor_1,1547718101,35.8 sensor_1,1547718102,22.2 sensor_1,1547718101,55.3 sensor_1,1547718102,24.1 sensor_1,1547718103,57 sensor_1,1547718103,58 sensor_1,1547718103,59 sensor_6,1547718101,15.4 sensor_7,1547718102,6.7 sensor_10,1547718205,38.1
自定义reduce
class MyReduceFunction extends ReduceFunction[SensorReding]{ override def reduce(t: SensorReding, t1: SensorReding): SensorReding = SensorReding(t.id,t1.timestamp,t.temperature.min(t1.timestamp)) }
以上是关于flnk算子reduce的简单使用的主要内容,如果未能解决你的问题,请参考以下文章
Spark 算子 reduce / reduceByKey / reduceByKeyLocally 区别