结构化流是如何执行 pandas_udf 的?

Posted

技术标签:

【中文标题】结构化流是如何执行 pandas_udf 的?【英文标题】:How does Structured Streaming execute pandas_udf? 【发布时间】:2019-12-06 15:17:44 【问题描述】:

我想了解结构化流式处理如何处理即将到来的新数据。

如果更多行同时到达,spark 将它们附加到输入流数据帧,对吗?

如果我有一个 withColumn 并应用一个 pandas_udf,该函数每行调用一次,还是只调用一次,然后将行传递给 pandas_udf?

让我们这样说:

dfInt = spark \
    .readStream \
    .load() \
    .withColumn("prediction", predict( (F.struct([col(x) for x in (features)]))))

如果更多行同时到达,它们是一起处理还是每个处理一次?= 有机会将其限制为每次仅一行吗?

【问题讨论】:

【参考方案1】:

如果更多行同时到达,spark 将它们附加到输入流数据帧,对吗?

我们只谈谈微批处理执行引擎,对吧?这是您最有可能在流式查询中使用的内容。

结构化流式处理使用Source.getBatch(DataSource API V1)在流式查询中查询流式传输源:

getBatch(start: Option[Offset], end: Offset): DataFrame

返回偏移量之间的数据(startend]。当startNone 时,批处理应从第一条记录开始。

DataFrame 中源返回的任何内容都是要在微批处理中处理的数据。

如果我有一个 withColumn 并应用一个 pandas_udf,则该函数每行调用一次

总是。这就是用户定义函数在 Spark SQL 中的工作方式。

还是只传递一次,然后将行传递给 pandas_udf?

This 说:

Pandas UDF 是用户定义的函数,由 Spark 执行,使用 Arrow 传输数据,使用 Pandas 处理数据。

Python 函数应将pandas.Series 作为输入并返回相同长度的pandas.Series。在内部,Spark 将执行 Pandas UDF,方法是将列拆分为批次,并将每个批次的函数作为数据的子集调用,然后将结果连接在一起。

如果更多行同时到达,它们是一起处理还是每个处理一次?

如果“到达”表示“单个 DataFrame 的一部分”,则“它们一起处理”,但一次一行(根据 UDF 合同)。

是否有机会将其限制为每次仅一行?

您不必这样做。它是这样设计的。一次只能一行。

【讨论】:

非常感谢您的详细回答。我解决了我的问题,在 pandas udf 中对 pandas 系列中的所有行进行分类。你能告诉我我们是否可以在 scala 中使用 pandas_udf 吗?我们在使用 pandas_udf 时遇到了一些性能和内存错误。谢谢 @xcsob pandas_udf 在 scala 中?不可能。而你实际上并不需要它。纯 UDF 应该可以正常工作。如果它对你有用,你能接受答案吗?谢谢。 我需要 pandas_udf 因为我使用 scikit 模型对流数据进行分类。我如何使用纯 udf 来实现这一点? 不认为这是可能的,但我建议提出一个单独的问题。感谢您接受答案!

以上是关于结构化流是如何执行 pandas_udf 的?的主要内容,如果未能解决你的问题,请参考以下文章

将多行结构化流式传输到 pandas udf

如何在结构化查询中使用 scikit-learn 模型?

PySpark 结构化流将 udf 应用于窗口

脱离文档流是啥意思

如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?

如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?