11-flink-1.10.1- Flink window API

Posted 逃跑的沙丁鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了11-flink-1.10.1- Flink window API相关的知识,希望对你有一定的参考价值。

目录

1 window api 概念

2 window api 类型

 2.1 时间窗口(Time Window)

2.2 计数窗口

3 窗口函数(window function)

 4 window api 结合代码


1 window api 概念

  •  一般真实的流都是无界的,怎样处理无界的流?
  • 可以把无界的流进行切分,得到有限的数据集进行处理-也就是得到了有界的流进行处理。
  • 窗口就是将无限流切割成有限流的一种方式,它会将有限的流分发到有限大小的桶里(bucket)进行处理

2 window api 类型

 2.1 时间窗口(Time Window)

    (1)滚动的时间窗口

  • 将数据依据固定的时间窗口长度对数据进行切分
  • 时间固定,窗口长度相同,没有重叠 
.window(TumblingProcessingTimeWindows.of(Time.seconds(15)))//滚动窗口
.timeWindow(Time.seconds(15))//滚动窗口简写

    (2)滑动的时间窗口

  • 滑动窗口是固定窗口更为广泛义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成
  • 窗口长度有固定可以有重叠
  • 使用场景,比如最近24小时内,最近1一个小时内的统计指标等等
.window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(10)))//滑动窗口
.timeWindow(Time.seconds(15),Time.seconds(10))//滑动窗口简写

  (3) 会话窗口

  • 有一些列时间组合一个指定的时间长度的timeout间隙组成,也就是一段时间没有接收到数据就会生成新的窗口
  • 特点,时间无对其 
.window(EventTimeSessionWindows.withGap(Time.seconds(20)))//会话窗口

2.2 计数窗口

   滚动计数窗口

.countWindow(10)//计数滚动窗口

  滑动计数窗口

.countWindow(10,5)//计数滑动窗口

3 窗口函数(window function)

window function 定义了对窗口中收集的数据做的计算操作

可分为两类:

① 增量聚合函数(incremental aggregation function)

    每条数据到来就进行计算,保持一个简单的状态

    ReduceFunction,Aggregation

② 全窗口函数(full window functions)

   先把窗口所有数据收集起来,等到计算的时候会遍历所有数据

  ProcessWindowFunction,WindowFunction

 4 window api 结合代码

 需求,求最近15秒内收到的数据里温度最小的温度,且最后一次温度采集的时间戳是什么?

 ① 读有界数据无法输出

package com.study.liucf.unbounded.window

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * @Author liucf
 * @Date 2021/9/22
 *      需求,求最近15秒内收到的数据里温度最小的温度,且最后一次温度采集的时间戳是什么?
 */
object LiucfWindowApi {
  def main(args: Array[String]): Unit = {
    //创建flink执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //读取数据
    val inputStream: DataStream[String] = env.readTextFile("src\\\\main\\\\resources\\\\sensor.txt")
    //转换数据类型 string 类型转换成LiucfSensorReding,求最小值
    val ds = inputStream.map(r => {
      val arr = r.split(",")
      LiucfSensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
    }).map(d=>(d.id,d.temperature,d.timestamp))
      .keyBy(_._1) // 按照传感器id分组
//      .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))//滚动窗口
//      .window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(10)))//滑动窗口
//      .window(EventTimeSessionWindows.withGap(Time.seconds(20)))//会话窗口
//
//      .timeWindow(Time.seconds(15),Time.seconds(10))//滑动窗口简写
//      .countWindow(10)//计数滚动窗口
//      .countWindow(10,5)//计数滑动窗口
      .timeWindow(Time.seconds(15))//滚动窗口简写
//        .reduce((currentData,newData)=>{
//          (currentData._1,currentData._2.min(newData._2),currentData._3.max(currentData._3))
//        })
        .reduce(new LiucfReduceFunction())
    //输出到控制台
    ds.print()
    env.execute("flink window api:liucf window api test ")
  }
}

/**
 * 自定义reduceFunction处理
 */
class LiucfReduceFunction extends ReduceFunction[(String,Double,Long)]{
  override def reduce(value1: (String, Double, Long), value2: (String, Double, Long)): (String, Double, Long) = {

    (value1._1,value1._2.min(value2._2),value1._3.max(value2._3))
  }
}

可见没有任何输出,这是因为窗口关闭需要等到15秒,而读取文件数据的有界流很快就完成了还没等到15秒就结束,所以没有输出

解决策略是换成无界流进行数据读入就可以了,我下面代码使用kafka来进行测试

为了看到效果我把窗口改成60秒快速往kafka里生成几条数据

package com.study.liucf.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object SensorProduce2 {
  def main(args: Array[String]): Unit = {
    val kafkaProp = new Properties()
    kafkaProp.put("bootstrap.servers", "192.168.109.151:9092")
    kafkaProp.put("acks", "1")
    kafkaProp.put("retries", "3")
    //kafkaProp.put("batch.size", 16384)//16k
    kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("topic","sensor_input_csv")
    val producer = new KafkaProducer[String, String](kafkaProp)
    val sensor = "sensor_1,1617505481,30.6"
    send(sensor,producer)
    producer.close()
  }


  def send(str:String,producer: KafkaProducer[String, String]): Unit ={
    val record = new ProducerRe cord[String, String]("sensor_input_csv", str )
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (metadata != null) {
          println("发送成功")
        }
        if (exception != null) {
          println("消息发送失败")
        }
      }
    })
  }
}

 

可见可以达到预期的。

以上是关于11-flink-1.10.1- Flink window API的主要内容,如果未能解决你的问题,请参考以下文章

Flink SQL 窗口表值函数 Window TVF 实战

Flink学习笔记:搭建Flink on Yarn环境并运行Flink应用

Flink学习笔记01:初探Flink

var wi = 0; wi < arr.length; wi++

flink 读取mysql并使用flink sql

Flink关于Flink:Flink-SortShuffle-实现简介