Spark Structured Streaming windowing and grouping operations

Posted: 2017-12-21 22:43:38

I have successfully performed a grouping operation on my streaming DataFrame to calculate the average number of passengers per trip.

import org.apache.spark.sql.types._

val carSchema =
    new StructType()
    .add("trackId", StringType)
    .add("carId", StringType)
    .add("peopleCount", StringType)
    .add("time", StringType)

In this problem there are several race tracks (3 in my case). Each of them has its own unique "trackId". On these tracks there may be several cars driving, each with its own "carId". We also track how many people are in a car with "peopleCount". The "time" field corresponds to the race start time for a given car.

Since we want to calculate the average number of people in the cars, we cast "peopleCount" from string to integer:

val dataFrame =
    inputStream.selectExpr("CAST(content AS STRING) AS json")
    .select(from_json($"json", schema = carSchema)
    .as("carData"))
    .select("carData.*")
    .withColumn("peopleCount", $"peopleCount".cast(IntegerType))

dataFrame.printSchema
root
 |-- trackId: string (nullable = true)
 |-- carId: string (nullable = true)
 |-- peopleCount: integer (nullable = true)
 |-- time: string (nullable = true)
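As an aside, a sketch of an alternative (not from the original post): declare the numeric and time fields with their real types in the schema, so that `from_json` parses them directly and no later cast is needed. The schema name here is hypothetical.

```scala
import org.apache.spark.sql.types._

// Hypothetical variant of the schema above: peopleCount and time are
// parsed as IntegerType/TimestampType straight out of the JSON.
// Depending on the Spark version, the ISO-8601 strings with seven
// fractional digits may need an explicit timestampFormat option on the
// JSON parser to be recognized.
val typedCarSchema = new StructType()
  .add("trackId", StringType)
  .add("carId", StringType)
  .add("peopleCount", IntegerType)
  .add("time", TimestampType)
```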

For reference, the data looks like this:

+------------------------------------+------------------------------------+------------+----------------------------+
|trackId                             |carId                               |peopleCount |time                        |
+------------------------------------+------------------------------------+------------+----------------------------+
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|a85f22a3-5f57-4bde-ad00-5eeb303a9859|2           |2017-12-20T23:04:14.7900000Z|
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|a85f22a3-5f57-4bde-ad00-5eeb303a9859|1           |2017-12-20T23:23:34.5510000Z|
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|984ec5d7-f4a6-422b-aeb6-d130efaf0001|2           |2017-12-20T19:27:57.7710000Z|
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|984ec5d7-f4a6-422b-aeb6-d130efaf0001|3           |2017-12-19T19:29:32.9790000Z|
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|984ec5d7-f4a6-422b-aeb6-d130efaf0001|4           |2017-12-19T19:31:12.6600000Z|
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|984ec5d7-f4a6-422b-aeb6-d130efaf0001|1           |2017-12-19T19:32:52.7190000Z|
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|a85f22a3-5f57-4bde-ad00-5eeb303a9859|2           |2017-12-19T23:45:06.4140000Z|
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|a85f22a3-5f57-4bde-ad00-5eeb303a9859|3           |2017-12-20T21:09:03.7440000Z|
|52f4c09c-7b9d-45d9-96ac-e0fe49458962|2f16b0f9-164c-4e3d-a5c9-f672bcf87197|3           |2017-12-19T21:25:06.2340000Z|
|52f4c09c-7b9d-45d9-96ac-e0fe49458962|2f16b0f9-164c-4e3d-a5c9-f672bcf87197|3           |2017-12-20T18:10:03.6540000Z|
<...more data...>

Now, since we want to find the average number of people per track:

val avgPeopleInCars = dataFrame.groupBy("trackId").avg("peopleCount")

This returns correct averages. There are 3 tracks, and I receive 3 rows back, one with the average in-car people count for each of those 3 tracks:

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------+------------------+
|trackId                             |avg(peopleCount)  |
+------------------------------------+------------------+
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|3.5               |
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|2.0               |
|52f4c09c-7b9d-45d9-96ac-e0fe49458962|1.0               |
+------------------------------------+------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------+------------------+
|trackId                             |avg(peopleCount)  |
+------------------------------------+------------------+
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.5               |
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|2.2               |
|52f4c09c-7b9d-45d9-96ac-e0fe49458962|3.0               |
+------------------------------------+------------------+
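For context, batch listings like the ones above come from a console sink; since this aggregation has no watermark, it requires "complete" output mode, which re-emits the full result table on every trigger. A minimal sketch (the sink, trigger, and query name are assumptions, not from the original post):

```scala
import org.apache.spark.sql.streaming.Trigger

// Sketch: stream the running per-track averages to the console.
// "complete" mode re-prints every track's current average each batch.
val query = avgPeopleInCars.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", "false")       // keep the full UUIDs visible
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

query.awaitTermination()
```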

Currently, I am trying to work out how to adjust the output to use a window of 3 minutes with a sliding interval of 1 minute, while still doing the same calculation: the average number of people per track. My initial attempt was:

val windowedData =
    dataFrame
    // window() requires a timestamp column, so cast the string "time" first
    .groupBy(window($"time".cast("timestamp"), "3 minutes", "1 minute"), $"trackId")
    .avg("peopleCount")

windowedData.printSchema
root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- trackId: string (nullable = true)
 |-- avg(peopleCount): double (nullable = true)
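If the nested `window` struct is awkward downstream, its `start` and `end` fields can be pulled out into flat columns. A sketch against the `windowedData` frame above (the output column names are my own):

```scala
// Flatten the window struct into plain timestamp columns and give the
// aggregate a friendlier name.
val flattened = windowedData.select(
  $"window.start".as("windowStart"),
  $"window.end".as("windowEnd"),
  $"trackId",
  $"avg(peopleCount)".as("avgPeopleCount")
)
```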

However, this doesn't look right. I was expecting the same kind of output as in the previous step - each window's output should include 3 rows, one per track. Instead, I get:

-------------------------------------------
Batch: 0
-------------------------------------------
+---------------------------------------------+------------------------------------+------------------+
|window                                       |trackId                             |avg(peopleCount)  |
+---------------------------------------------+------------------------------------+------------------+
|[2017-12-18 23:02:00.0,2017-12-18 23:05:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0               |
|[2017-12-18 23:03:00.0,2017-12-18 23:06:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0               |
|[2017-12-18 23:04:00.0,2017-12-18 23:07:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0               |
+---------------------------------------------+------------------------------------+------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------------------------------------------+------------------------------------+------------------+
|window                                       |trackId                             |avg(peopleCount)  |
+---------------------------------------------+------------------------------------+------------------+
|[2017-12-18 23:02:00.0,2017-12-18 23:05:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0               |
|[2017-12-18 23:03:00.0,2017-12-18 23:06:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0               |
|[2017-12-18 23:04:00.0,2017-12-18 23:07:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0               |
|[2017-12-21 18:55:00.0,2017-12-21 18:58:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|2.0               |
|[2017-12-21 18:56:00.0,2017-12-21 18:59:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|2.0               |
|[2017-12-21 18:57:00.0,2017-12-21 19:00:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|2.0               |
|[2017-12-21 18:59:00.0,2017-12-21 19:02:00.0]|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|3.0               |
|[2017-12-21 19:00:00.0,2017-12-21 19:03:00.0]|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|2.0               |
|[2017-12-21 19:01:00.0,2017-12-21 19:04:00.0]|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|2.0               |
|[2017-12-21 19:02:00.0,2017-12-21 19:05:00.0]|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|2.5               |
+---------------------------------------------+------------------------------------+------------------+

Comments:

@JacekLaskowski Thank you very much for looking into it. I've updated the question. It looks like the windowing operation works fine, but there's something wrong with the way the grouped data is queried. What I want is 3 entries per time window, one for each of the 3 tracks, with the average in-car people count. I'm already grouping by "trackId", yet it still returns multiple entries... Any ideas?

Answer 1:

If you look closely, you'll see that it is not returning multiple entries per track - rather, it provides one entry per window.

I'm not a windowing expert, but I imagine that if you put an appropriate watermark in place, you will only keep the latest windows.
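A sketch of what that watermark might look like (the 10-minute lateness threshold is an arbitrary assumption, and the `time` column must be a timestamp for both `withWatermark` and `window`):

```scala
// Sketch: with a watermark, Spark can discard state for windows older
// than the threshold and, in "append" output mode, emit each window
// exactly once after it is finalized.
val windowedWithWatermark = dataFrame
  .withColumn("eventTime", $"time".cast("timestamp"))
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window($"eventTime", "3 minutes", "1 minute"), $"trackId")
  .avg("peopleCount")
```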

