flnk算子reduce的简单使用

Posted nohert

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flnk算子reduce的简单使用相关的知识,希望对你有一定的参考价值。

import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //0.从文件中读取数据
    val inputPath = "D:\\\\ideaDemo\\\\maven_flink\\\\src\\\\main\\\\resources\\\\sensor.txt";
    val inputStream = env.readTextFile(inputPath)

    //1.先转换成样例类类型(简单转换操作)
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      SensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    //3.需要输出当前最小温度值,以及最近时间戳,要用reduce
    val resultStream = dataStream
      .keyBy("id")
     .reduce((curSate, newData) =>
       SensorReding(curSate.id, newData.timestamp, curSate.temperature.min(newData.temperature))
      )
        .reduce((v1,v2)=>
          SensorReding(v1.id,v2.timestamp,v2.temperature.min(v2.temperature))
        )
    resultStream.print()


    env.execute()
  }

 数据

sensor_1,1547718101,35.8
sensor_1,1547718102,22.2
sensor_1,1547718101,55.3
sensor_1,1547718102,24.1
sensor_1,1547718103,57
sensor_1,1547718103,58
sensor_1,1547718103,59
sensor_6,1547718101,15.4
sensor_7,1547718102,6.7
sensor_10,1547718205,38.1

  

自定义reduce

  class MyReduceFunction extends ReduceFunction[SensorReding]{
    override def reduce(t: SensorReding, t1: SensorReding): SensorReding =
      SensorReding(t.id,t1.timestamp,t.temperature.min(t1.timestamp))
  }

 

以上是关于flnk算子reduce的简单使用的主要内容,如果未能解决你的问题,请参考以下文章

Spark 算子 reduce / reduceByKey / reduceByKeyLocally 区别

几个关于js数组方法reduce的经典片段

spark 算子分析

RDD之action算子

08-flink-1.10.1- flink Transform api 转换算子

08-flink-1.10.1- flink Transform api 转换算子