Spark Structured Streaming windowing and grouping operations
Posted: 2017-12-21 22:43:38

Question: I have successfully run a grouping operation over my streaming DataFrame to compute the average passenger count per trip.
import org.apache.spark.sql.types.{StringType, StructType}

val carSchema =
  new StructType()
    .add("trackId", StringType)
    .add("carId", StringType)
    .add("peopleCount", StringType)
    .add("time", StringType)
In this problem there are several race tracks (3 in my case), each with its own unique "trackId". On each track several cars may be driving, each with its own "carId". We also track how many people are inside a car with "peopleCount". The "time" field is the race start time for the given car.
Since we want to compute the average number of people in the cars, we convert "peopleCount" from String to Integer:
import org.apache.spark.sql.functions.{from_json, udf}

// assumption: toInt is a small String-to-Int UDF defined earlier, roughly like this
val toInt = udf((s: String) => s.toInt)

val dataFrame =
  inputStream
    .selectExpr("CAST(content AS STRING) AS json")
    .select(from_json($"json", schema = carSchema).as("carData"))
    .select("carData.*")
    .withColumn("peopleCount", toInt($"peopleCount"))
dataFrame.printSchema
root
|-- trackId: string (nullable = true)
|-- carId: string (nullable = true)
|-- peopleCount: integer (nullable = true)
|-- time: string (nullable = true)
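For reference, the same String-to-Integer conversion could also be done with the built-in cast instead of a UDF; a minimal sketch (the dataFrameViaCast name is made up here, and note that cast turns unparsable strings into null rather than throwing):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.IntegerType

val dataFrameViaCast =
  inputStream
    .selectExpr("CAST(content AS STRING) AS json")
    .select(from_json($"json", schema = carSchema).as("carData"))
    .select("carData.*")
    .withColumn("peopleCount", $"peopleCount".cast(IntegerType)) // built-in cast instead of the UDF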
For reference, the data looks like this:
+------------------------------------+------------------------------------+-----------+----------------------------+
|trackId                             |carId                               |peopleCount|time                        |
+------------------------------------+------------------------------------+-----------+----------------------------+
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|a85f22a3-5f57-4bde-ad00-5eeb303a9859|2          |2017-12-20T23:04:14.7900000Z|
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|a85f22a3-5f57-4bde-ad00-5eeb303a9859|1          |2017-12-20T23:23:34.5510000Z|
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|984ec5d7-f4a6-422b-aeb6-d130efaf0001|2          |2017-12-20T19:27:57.7710000Z|
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|984ec5d7-f4a6-422b-aeb6-d130efaf0001|3          |2017-12-19T19:29:32.9790000Z|
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|984ec5d7-f4a6-422b-aeb6-d130efaf0001|4          |2017-12-19T19:31:12.6600000Z|
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|984ec5d7-f4a6-422b-aeb6-d130efaf0001|1          |2017-12-19T19:32:52.7190000Z|
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|a85f22a3-5f57-4bde-ad00-5eeb303a9859|2          |2017-12-19T23:45:06.4140000Z|
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|a85f22a3-5f57-4bde-ad00-5eeb303a9859|3          |2017-12-20T21:09:03.7440000Z|
|52f4c09c-7b9d-45d9-96ac-e0fe49458962|2f16b0f9-164c-4e3d-a5c9-f672bcf87197|3          |2017-12-19T21:25:06.2340000Z|
|52f4c09c-7b9d-45d9-96ac-e0fe49458962|2f16b0f9-164c-4e3d-a5c9-f672bcf87197|3          |2017-12-20T18:10:03.6540000Z|
+------------------------------------+------------------------------------+-----------+----------------------------+
<...more data...>
Now, since we want to find the average number of people per track:
val avgPeopleInCars = dataFrame.groupBy("trackId").avg("peopleCount")
This returns the correct averages. There are 3 tracks, and I get 3 rows back, one per track, each holding that track's average in-car people count:
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------+-------------------+
|trackId                             |avg(peopleCount)   |
+------------------------------------+-------------------+
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|3.5                |
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|2.0                |
|52f4c09c-7b9d-45d9-96ac-e0fe49458962|1.0                |
+------------------------------------+-------------------+
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------+-------------------+
|trackId                             |avg(peopleCount)   |
+------------------------------------+-------------------+
|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.5                |
|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|2.2                |
|52f4c09c-7b9d-45d9-96ac-e0fe49458962|3.0                |
+------------------------------------+-------------------+
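For reference, console output like the above can be produced with a sketch along these lines (the console sink and complete output mode are assumptions; complete or update mode is required for a streaming aggregation without a watermark):

val query = avgPeopleInCars.writeStream
  .outputMode("complete")      // streaming aggregations need complete or update mode
  .format("console")
  .option("truncate", "false") // print full UUIDs
  .start()

query.awaitTermination()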
Currently I'm trying to figure out how to adjust the output to use a window size of 3 minutes with a sliding interval of 1 minute, while still doing the same computation: the average number of people per trip. My initial attempt was:
import org.apache.spark.sql.functions.window

val windowedData =
  dataFrame
    .groupBy(window($"time", "3 minutes", "1 minute"), $"trackId")
    .avg("peopleCount")
windowedData.printSchema
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- trackId: string (nullable = true)
|-- avg(peopleCount): double (nullable = true)
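Since window is a struct of two timestamps, its fields can be pulled out as flat columns when that is more convenient; a small sketch:

// optional: flatten the window struct into plain start/end columns
windowedData.select(
  $"window.start".as("windowStart"),
  $"window.end".as("windowEnd"),
  $"trackId",
  $"avg(peopleCount)")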
However, this doesn't look right. I expected to get the same kind of output as in the previous step: each windowed result should contain 3 rows, one per track.
-------------------------------------------
Batch: 0
-------------------------------------------
+---------------------------------------------+------------------------------------+-------------------+
|window                                       |trackId                             |avg(peopleCount)   |
+---------------------------------------------+------------------------------------+-------------------+
|[2017-12-18 23:02:00.0,2017-12-18 23:05:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0                |
|[2017-12-18 23:03:00.0,2017-12-18 23:06:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0                |
|[2017-12-18 23:04:00.0,2017-12-18 23:07:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0                |
+---------------------------------------------+------------------------------------+-------------------+
-------------------------------------------
Batch: 1
-------------------------------------------
+---------------------------------------------+------------------------------------+-------------------+
|window                                       |trackId                             |avg(peopleCount)   |
+---------------------------------------------+------------------------------------+-------------------+
|[2017-12-18 23:02:00.0,2017-12-18 23:05:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0                |
|[2017-12-18 23:03:00.0,2017-12-18 23:06:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0                |
|[2017-12-18 23:04:00.0,2017-12-18 23:07:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|1.0                |
|[2017-12-21 18:55:00.0,2017-12-21 18:58:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|2.0                |
|[2017-12-21 18:56:00.0,2017-12-21 18:59:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|2.0                |
|[2017-12-21 18:57:00.0,2017-12-21 19:00:00.0]|4ccfeb47-c76f-43f4-87bd-7a5777f78e7a|2.0                |
|[2017-12-21 18:59:00.0,2017-12-21 19:02:00.0]|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|3.0                |
|[2017-12-21 19:00:00.0,2017-12-21 19:03:00.0]|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|2.0                |
|[2017-12-21 19:01:00.0,2017-12-21 19:04:00.0]|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|2.0                |
|[2017-12-21 19:02:00.0,2017-12-21 19:05:00.0]|f261a42d-a7ac-4a2d-81b4-c5c7189a2b66|2.5                |
+---------------------------------------------+------------------------------------+-------------------+
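To make the windowed rows easier to compare, the result can be sorted by window start before writing it out; a sketch (sorting a streaming aggregation is only allowed in complete output mode):

windowedData
  .orderBy($"window.start", $"trackId") // sorting requires complete mode in streaming
  .writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", "false")
  .start()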
Comments:

@JacekLaskowski Thanks a lot for looking at this. I've updated the question. It looks like the windowing itself works, but something is wrong with how I query the grouped data. What I want is 3 entries per time window, one for each of the 3 tracks, with the average in-car people count. I'm already grouping by "trackId", yet it still returns multiple entries.. any ideas?

Answer 1:

If you look closely, you'll see that it isn't returning multiple entries; it gives one entry per "window".
I'm no expert on windows, but I suspect that if you put a proper watermark in place, you will keep only the latest windows.
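A sketch of what that could look like (the 10-minute lateness threshold is an assumption, and since withWatermark and window need a timestamp column, the String "time" is cast first):

import org.apache.spark.sql.functions.window
import org.apache.spark.sql.types.TimestampType

val windowedWithWatermark =
  dataFrame
    .withColumn("time", $"time".cast(TimestampType)) // event-time column must be a timestamp
    .withWatermark("time", "10 minutes")             // state for windows older than 10 minutes is dropped
    .groupBy(window($"time", "3 minutes", "1 minute"), $"trackId")
    .avg("peopleCount")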