Flink 基本算子mapkeyBysum

Posted noyouth

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 基本算子mapkeyBysum相关的知识,希望对你有一定的参考价值。

核心代码:

object TransformTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val streamFromFile = env.readTextFile("C:\Users\Mi\Documents\project\idea\FlinkTitorial\src\main\resources\sensor.txt")

    //------------map--------------
    val dataStream1 = streamFromFile.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
    })

    //------------keyBY--------------
    //此时key是一个元组
    val dataStream2: KeyedStream[SensorReading, Tuple] = dataStream1.keyBy(0)
    val dataStream3: KeyedStream[SensorReading, Tuple] = dataStream1.keyBy("id")
    //此时key是字段的类型
    val dataStream4: KeyedStream[SensorReading, String] = dataStream1.keyBy(_.id)

    //------------sum--------------
    val dataStream5 = dataStream4.sum(2)
    val dataStream6 = dataStream4.sum("temperature")

    dataStream6.print

    env.execute("transform test")
  }

}

case class SensorReading(id: String, timestamp: Long, temperature: Double)

sensor.txt文件内容:

sensor_1, 1547718199, 35.80018327300259
sensor_6, 1547718201, 15.402984393403084
sensor_7, 1547718202, 6.720945201171228
sensor_10, 1547718205, 38.101067604893444
sensor_1, 1547718200, 30.8
sensor_1, 1547718201, 40.8

输出结果:

SensorReading(sensor_1,1547718199,35.80018327300259)
SensorReading(sensor_6,1547718201,15.402984393403084)
SensorReading(sensor_7,1547718202,6.720945201171228)
SensorReading(sensor_10,1547718205,38.101067604893444)
SensorReading(sensor_1,1547718199,66.60018327300259)
SensorReading(sensor_1,1547718199,107.40018327300258)

  

以上是关于Flink 基本算子mapkeyBysum的主要内容,如果未能解决你的问题,请参考以下文章

Flink基础篇,基本概念设计理念架构模型编程模型常用算子

《Flink应用实战》--合并流-Union算子

Flink的ConGroup算子介绍

Flink的Union算子和Connect算子,流合并

Apache Flink 漫谈系列 - JOIN 算子

flink数据流转换算子