PySpark 结构化流将 udf 应用于窗口
Posted
技术标签:
【中文标题】PySpark 结构化流将 udf 应用于窗口【英文标题】:PySpark structured streaming apply udf to window 【发布时间】:2019-10-22 16:05:16 【问题描述】:我正在尝试将 pandas udf 应用于 pyspark 结构化流的窗口。问题是,一旦流赶上了当前状态,所有新窗口就只包含一个值。
正如您在屏幕截图中看到的,2019-10-22T15:34:08.730+0000 之后的所有窗口仅包含一个值。用来生成这个的代码是这样的:
@pandas_udf("Count long, Resampled long, Start timestamp, End timestamp", PandasUDFType.GROUPED_MAP)
def myudf(df):
df = df.dropna()
df = df.set_index("Timestamp")
df.sort_index(inplace=True)
# resample the dataframe
resampled = pd.DataFrame()
oidx = df.index
nidx = pd.date_range(oidx.min(), oidx.max(), freq="30S")
resampled["Value"] = df.Value.reindex(oidx.union(nidx)).interpolate('index').reindex(nidx)
return pd.DataFrame([[len(df.index), len(resampled.index), df.index.min(), df.index.max()]], columns=["Count", "Resampled", "Start", "End"])
predictionStream = sensorStream.withWatermark("Timestamp", "90 minutes").groupBy(col("Name"), window(col("Timestamp"), "70 minutes", "5 minutes"))
predictionStream.apply(myudf).writeStream \
.queryName("aggregates") \
.format("memory") \
.start()
流确实每 5 分钟获取一次新值。只是窗口不知何故只从最后一批中获取值,即使水印不应该过期。
我做错了什么吗?我已经尝试过使用水印;这对结果没有影响。我需要 udf 中窗口的所有值。
我在设置为 5.5 LTS ML(包括 Apache Spark 2.4.3、Scala 2.11)的集群上的数据块中运行它
【问题讨论】:
【参考方案1】:看起来你可以为你的 writeStream 指定你想要的输出模式
See documentation at Output Modes
默认使用追加模式:
这是默认模式,只有自上次触发后添加到结果表中的新行才会输出到接收器。
尝试使用:
predictionStream.apply(myudf).writeStream \
.queryName("aggregates") \
.format("memory") \
.outputMode(OutputMode.Complete) \
.start()
【讨论】:
也尝试过:org.apache.spark.sql.AnalysisException:当流数据帧/数据集上没有流聚合时,不支持完整输出模式。同样,不完整的窗口也没有问题,只要它们包含截至该时间点的所有数据。 我正在使用 readStream 从增量表中读取数据。sensorStream = spark.readStream.format("delta").table(silver_path)
然后我会尝试使用窗口函数:databricks.com/blog/2017/05/08/… 我不完全知道您要对流数据执行什么操作,但也许您也应该更改逻辑,因为索引和东西看起来非常程序化
我正在使用窗口函数:O。 udf 中的当前代码仅用于调试目的。最终,udf 将使用 keras 模型对窗口数据进行预测。整个问题是窗口函数实际上并没有返回整个窗口,而只返回该窗口内最后一批的值。
正是我滚动的不够!我对 Delta 源上的 Spark Streming 不熟悉,文档也不清楚如何将 Delta 用作流式源,特别是对于结构化流。我认为声明源时缺少参数,您应该在 databricks 论坛上询问【参考方案2】:
我找到了一个关于这个问题的Spark JIRA issue,但它在没有解决的情况下被关闭了。该错误似乎是,我在 Spark 版本 3.1.1 上独立证实了这一点,即 Pandas UDF 在每个触发器上仅使用自上次触发器以来的数据执行。因此,您可能只处理要在每个触发器上考虑的数据的子集。 Grouped Map Pandas UDF 似乎不适用于具有增量表源的结构化流。如果您之前找到了解决方案,请继续跟进,否则我将把它留在这里给同样找到此线程的人。
编辑:Databricks 论坛中有一些讨论关于首先进行流式聚合,然后使用 Pandas UDF(可能需要包含数组的列的单个记录),如下所示。我尝试过这个。有用。但是,我的批处理持续时间很长,我不确定这些额外的工作对它有多大贡献。
agg_exprs = [f.collect_list('col_of_interest_1'),
f.collect_list('col_of_interest_2'),
f.collect_list('col_of_interest_3')]
intermediate_sdf = source_sdf.groupBy('time_window', ...).agg(agg_exprs)
final_sdf = intermediate_sdf.groupBy('time_window', ...).applyInPandas(func, schema)
【讨论】:
请在您的答案中添加 JIRA 参考。 关于我在上面的回复中发布的为解决 Spark 中的错误而进行的编辑,该方法在运行并产生正确结果的意义上是有效的。我不确定延迟。我使用这种方法的数据处理流目前需要 5 到 10 秒,我还没有以任何有意义的方式分析它们。以上是关于PySpark 结构化流将 udf 应用于窗口的主要内容,如果未能解决你的问题,请参考以下文章
PySpark 中的 Groupby 和 UDF/UDAF,同时保持 DataFrame 结构
PySpark 将算法转换为 UDF 并将其应用于 DataFrame