Spark2.3(三十四):Spark Structured Streaming之withWaterMark和windows窗口是否可以实现最近一小时统计

Posted yy3b2007com

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark2.3(三十四):Spark Structured Streaming之withWaterMark和windows窗口是否可以实现最近一小时统计相关的知识,希望对你有一定的参考价值。

WaterMark除了可以限定来迟数据范围,是否可以实现最近一小时统计?

WaterMark目的用来限定参数计算数据的范围:比如当前计算数据内max timestamp是12::00,waterMark限定数据分为是60 minutes,那么如果此时输入11:00之前的数据就会被舍弃不参与统计,视为来迟范围超出了60minutes限定范围。

那么,是否可以借助它实现最近一小时的数据统计呢?

代码示例:

package com.dx.streaming

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.log4j.{Level, Logger}

case class MyEntity(id: String, timestamp: Timestamp, value: Integer)

object Main {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
  Logger.getLogger("akka").setLevel(Level.ERROR);
  Logger.getLogger("kafka").setLevel(Level.ERROR);

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    val lines = spark.readStream.format("socket").option("host", "192.168.0.141").option("port", 19999).load()

    var sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    import spark.implicits._
    lines.as(Encoders.STRING)
      .map(row => {
        val fields = row.split(",")
        MyEntity(fields(0), new Timestamp(sdf.parse(fields(1)).getTime), Integer.valueOf(fields(2)))
      })
      .createOrReplaceTempView("tv_entity")

    spark.sql("select id,timestamp,value from tv_entity")
      .withWatermark("timestamp", "60 minutes")
      .createOrReplaceTempView("tv_entity_watermark")

    val resultDf = spark.sql(
      s"""
         |select id,sum(value) as sum_value
         |from  tv_entity_watermark
         |group id
         |""".stripMargin)

    val query = resultDf.writeStream.format("console").outputMode(OutputMode.Update()).start()

    query.awaitTermination()
    query.stop()
  }
}

当通过nc -lk 19999中依次(每组输入间隔几秒时间即可)输入如下数据时:

1,2018-12-01 12:00:01,100
2,2018-12-01 12:00:01,100

1,2018-12-01 12:05:01,100
2,2018-12-01 12:05:01,100

1,2018-12-01 12:15:01,100
2,2018-12-01 12:15:01,100

1,2018-12-01 12:25:01,100
2,2018-12-01 12:25:01,100

1,2018-12-01 12:35:01,100
2,2018-12-01 12:35:01,100

1,2018-12-01 12:45:01,100
2,2018-12-01 12:45:01,100

1,2018-12-01 12:55:01,100
2,2018-12-01 12:55:01,100

1,2018-12-01 13:05:02,100
2,2018-12-01 13:05:02,100

1,2018-12-01 13:15:01,100
2,2018-12-01 13:15:01,100

发现最终统计结果为:

id  , sum_value
1   ,  900
2   ,  900

而不是期望的

id  , sum_value
1   ,  600
2   ,  600

既然是不能限定数据统计范围是60minutes,是否需要借助于窗口函数window就可以实现呢?

是否需要借助于watermark和窗口函数window就可以实现最近1小时数据统计呢?

    spark.sql("select id,timestamp,value from tv_entity")
      .withWatermark("timestamp", "60 minutes")
      .createOrReplaceTempView("tv_entity_watermark")

    val resultDf = spark.sql(
      s"""
         |select id,sum(value) as sum_value
         |from  tv_entity_watermark
         |group window(timestamp,‘60 minutes‘,‘60 minutes‘),id
         |""".stripMargin)

    val query = resultDf.writeStream.format("console").outputMode(OutputMode.Update()).start()

依然输入上边的测试数据,会发现超过1小时候数据会重新开辟(归零后重新统计)一个统计结果,而不是滚动的一小时统计。

就是把上边的测试数据分为了两组来分别统计:

第一组(小时)参与统计数据:

1,2018-12-01 12:00:01,100
2,2018-12-01 12:00:01,100

1,2018-12-01 12:05:01,100
2,2018-12-01 12:05:01,100

1,2018-12-01 12:15:01,100
2,2018-12-01 12:15:01,100

1,2018-12-01 12:25:01,100
2,2018-12-01 12:25:01,100

1,2018-12-01 12:35:01,100
2,2018-12-01 12:35:01,100

1,2018-12-01 12:45:01,100
2,2018-12-01 12:45:01,100

1,2018-12-01 12:55:01,100
2,2018-12-01 12:55:01,100

第二组(小时)参与统计数据:

1,2018-12-01 13:05:02,100
2,2018-12-01 13:05:02,100

1,2018-12-01 13:15:01,100
2,2018-12-01 13:15:01,100

猜测总结:

根据上边测试结果可以推出一个猜测结论:

在spark structured streaming中是不存储参数统计的数据的,只是对数据进行了maxTimestamp.avgTimestamp,minTimestamp存储,同时只是对数据的统计结果进行存储,下次再次触发统计时只是在原有的统计结果之上进行累加等操作,而参与统计的数据应该是没有存储,否则这类需求应该是可以实现。

以上是关于Spark2.3(三十四):Spark Structured Streaming之withWaterMark和windows窗口是否可以实现最近一小时统计的主要内容,如果未能解决你的问题,请参考以下文章

第三十四课 Spark中任务处理的Stage划分和Task最佳位置算法

cdh5.15集群添加spark2.3服务(parcels安装)

如何让 Spark2.3 在 Jupyter Notebook 中工作

pyspark读取csv文件multiLine选项不适用于具有换行符spark2.3和spark2.2的记录

Spark学习(四)Spark2.3 HA集群的分布式安装

Spark学习之路 Spark2.3 HA集群的分布式安装