PySpark 结构化流将 udf 应用于窗口

Posted

技术标签:

【中文标题】PySpark 结构化流将 udf 应用于窗口【英文标题】:PySpark structured streaming apply udf to window 【发布时间】:2019-10-22 16:05:16 【问题描述】:

我正在尝试将 pandas udf 应用于 pyspark 结构化流的窗口。问题是,一旦流赶上了当前状态,所有新窗口就只包含一个值。

正如您在屏幕截图中看到的,2019-10-22T15:34:08.730+0000 之后的所有窗口仅包含一个值。用来生成这个的代码是这样的:

@pandas_udf("Count long, Resampled long, Start timestamp, End timestamp", PandasUDFType.GROUPED_MAP)
def myudf(df):
  df = df.dropna()
  df = df.set_index("Timestamp")
  df.sort_index(inplace=True)

  # resample the dataframe
  resampled = pd.DataFrame()
  oidx = df.index
  nidx = pd.date_range(oidx.min(), oidx.max(), freq="30S")
  resampled["Value"] = df.Value.reindex(oidx.union(nidx)).interpolate('index').reindex(nidx)
  return pd.DataFrame([[len(df.index), len(resampled.index), df.index.min(), df.index.max()]], columns=["Count", "Resampled", "Start", "End"])

predictionStream = sensorStream.withWatermark("Timestamp", "90 minutes").groupBy(col("Name"), window(col("Timestamp"), "70 minutes", "5 minutes"))

predictionStream.apply(myudf).writeStream \
    .queryName("aggregates") \
    .format("memory") \
    .start()

流确实每 5 分钟获取一次新值。只是窗口不知何故只从最后一批中获取值,即使水印不应该过期。

我做错了什么吗?我已经尝试过使用水印;这对结果没有影响。我需要 udf 中窗口的所有值。

我在设置为 5.5 LTS ML(包括 Apache Spark 2.4.3、Scala 2.11)的集群上的数据块中运行它

【问题讨论】:

【参考方案1】:

看起来你可以为你的 writeStream 指定你想要的输出模式

See documentation at Output Modes

默认使用追加模式:

这是默认模式,只有自上次触发后添加到结果表中的新行才会输出到接收器。

尝试使用:

predictionStream.apply(myudf).writeStream \
.queryName("aggregates") \
.format("memory") \
.outputMode(OutputMode.Complete) \
.start()

【讨论】:

也尝试过:org.apache.spark.sql.AnalysisException:当流数据帧/数据集上没有流聚合时,不支持完整输出模式。同样,不完整的窗口也没有问题,只要它们包含截至该时间点的所有数据。 我正在使用 readStream 从增量表中读取数据。 sensorStream = spark.readStream.format("delta").table(silver_path) 然后我会尝试使用窗口函数:databricks.com/blog/2017/05/08/… 我不完全知道您要对流数据执行什么操作,但也许您也应该更改逻辑,因为索引和东西看起来非常程序化 我正在使用窗口函数:O。 udf 中的当前代码仅用于调试目的。最终,udf 将使用 keras 模型对窗口数据进行预测。整个问题是窗口函数实际上并没有返回整个窗口,而只返回该窗口内最后一批的值。 正是我滚动的不够!我对 Delta 源上的 Spark Streming 不熟悉,文档也不清楚如何将 Delta 用作流式源,特别是对于结构化流。我认为声明源时缺少参数,您应该在 databricks 论坛上询问【参考方案2】:

我找到了一个关于这个问题的Spark JIRA issue,但它在没有解决的情况下被关闭了。该错误似乎是,我在 Spark 版本 3.1.1 上独立证实了这一点,即 Pandas UDF 在每个触发器上仅使用自上次触发器以来的数据执行。因此,您可能只处理要在每个触发器上考虑的数据的子集。 Grouped Map Pandas UDF 似乎不适用于具有增量表源的结构化流。如果您之前找到了解决方案,请继续跟进,否则我将把它留在这里给同样找到此线程的人。

编辑:Databricks 论坛中有一些讨论关于首先进行流式聚合,然后使用 Pandas UDF(可能需要包含数组的列的单个记录),如下所示。我尝试过这个。有用。但是,我的批处理持续时间很长,我不确定这些额外的工作对它有多大贡献。

agg_exprs = [f.collect_list('col_of_interest_1'),
             f.collect_list('col_of_interest_2'),
             f.collect_list('col_of_interest_3')]
intermediate_sdf = source_sdf.groupBy('time_window', ...).agg(agg_exprs)

final_sdf = intermediate_sdf.groupBy('time_window', ...).applyInPandas(func, schema)

【讨论】:

请在您的答案中添加 JIRA 参考。 关于我在上面的回复中发布的为解决 Spark 中的错误而进行的编辑,该方法在运行并产生正确结果的意义上是有效的。我不确定延迟。我使用这种方法的数据处理流目前需要 5 到 10 秒,我还没有以任何有意义的方式分析它们。

以上是关于PySpark 结构化流将 udf 应用于窗口的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 中的 Groupby 和 UDF/UDAF,同时保持 DataFrame 结构

PySpark 将算法转换为 UDF 并将其应用于 DataFrame

Pyspark:UDF 将正则表达式应用于数据帧中的每一行

Pyspark:在窗口内使用 udf

哪个选项使用 pyspark 提供最佳性能?使用地图进行 UDF 或 RDD 处理?

无法在 pyspark 中应用标量 pandas udf