附加模式下的 Spark 结构化流,每个时间窗口输出多行

Posted

技术标签:

【中文标题】附加模式下的 Spark 结构化流,每个时间窗口输出多行【英文标题】:Spark structured streaming in append mode outputting many rows per single time window 【发布时间】:2021-09-07 20:25:43 【问题描述】:

我正在使用 Apache Spark 编写一个连续的应用程序。在结构化流式传输案例中,我尝试从 Delta 表中读取数据,通过时间窗口对事件时间执行流式聚合,然后以追加模式将结果写入 Delta 表。我对文档的期望是,在附加模式下,只有一个时间窗口的最终聚合将被写入接收器。这不是我的经验。相反,我在目标 Delta 表中看到如下记录,与我尝试使用流的许多配置无关(windowDuration=5 分钟,slideDuration=20 秒)。

Example output from stream

从上图中可以看出,同一个时间窗口正在向 sink 贡献许多记录。我确认每个微批次最多输出一个时间窗口的单个记录,但是一个时间窗口可以贡献来自许多(数量上并不明显一致)微批次的输出记录。这里是流聚合代码的核心。

output_schema = create_trades_data_features_schema()
features_sdf = (trades_sdf.withWatermark("event_datetime", f"trades_stream_watermark_secs seconds")
                          .withColumn('time_window', f.window(timeColumn=f.col('event_datetime'),
                                                              windowDuration=f"analysis_window_length_secs seconds",
                                                              slideDuration=f"analysis_window_hop_size_secs seconds"))
                          .groupBy('time_window')
                          .applyInPandas(lambda pdf: generate_trades_data_features(pdf, output_schema, data_type_cast), output_schema))

Pandas UDF 创建一些保存标量值的变量,构造一个形状为 [1,N] 的 Pandas DataFrame,并将其作为结果输出。也就是说,它返回一行。我唯一分组的是时间窗口。我怎么能在同一个时间窗口获得多条记录?我以多种方式创建和关闭了流,每次都收到相同的结果(例如,根据Delta Lake docs、per the structured streaming guide,以及跨 read/load/table/toTable API 选项,尝试我能找到的每个选项配置...是的,数小时的蛮力)。我还尝试了水印持续时间和触发周期的各种值范围;没有任何影响。

这是追加模式下的预期行为(即同一时间窗口的多个记录)吗?

编辑:我使用的是 Databricks 运行时版本 8.3 ML。它具有 Spark 版本“3.1.1”。

编辑 2:我暂时考虑这个问题是否相关:https://issues.apache.org/jira/browse/SPARK-25756

【问题讨论】:

【参考方案1】:

为了避免这种情况加入大量未回答/跟进的问题,我将在下面记下我的初步结论,并在我了解更多信息时对其进行更新。这可能是错误的。请不要让这阻止其他答案/cmets。

总的来说,这不是预期的行为。每个微批次都被单独发送到 Pandas UDF(即,在每次触发当前微批次时,只有那个微批次被发送到 UDF)并导致结果表中的一条记录被发送到接收器,尽管处于附加模式.开发人员已注意到该问题,并创建了至少一个 JIRA 问题来解决该问题。此工作线程似乎处于非活动状态。

其他数据点和建议:

不同论坛(例如 Databricks)中的多个问题,以及上面链接的 JIRA 问题,请直接参考或提供 Spark 中此错误的明确示例。 该问题自 2018 年以来一直存在,似乎针对 3.1.2 版进行了修复,但 JIRA 问题已批量关闭,我认为没有继续讨论/工作。 目前,对于 Python 开发人员而言,Spark 结构化流式处理仅支持对流式聚合(即除了 apply 或 applyInPandas 之外,您可以在 GroupedData 对象上运行的函数)进行简单的数据转换。 如果您正在为重要的应用程序寻找流计算引擎,请不要指望 Python Spark API 提供支持,直到此问题得到解决。

非常有兴趣了解潜在的解决方法,或者如果我在上面得出了错误的结论。干杯。

【讨论】:

我已经得出结论,这不是预期的行为。我的初步结论是正确的。解决方法是,根据 Pandas UDF 中所需的每列,将列中的所有值收集到一个数组中,并将一条记录传递给带有正确时间戳的 UDF。

以上是关于附加模式下的 Spark 结构化流,每个时间窗口输出多行的主要内容,如果未能解决你的问题,请参考以下文章

Spark结构化流 - 使用模式从文件中读取时间戳

Spark 结构化流:Scala 中的模式推理

[Spark]-结构化流之初始篇

大数据(8s)Spark结构化流

有没有办法将生成的 groupby 流加入到 kafka-spark 结构化流中的原始流?

具有大窗口大小的火花结构化流:内存消耗