将多行结构化流式传输到 pandas udf

Posted

技术标签:

【中文标题】将多行结构化流式传输到 pandas udf【英文标题】:Structured streaming multiple row to pandas udf 【发布时间】:2019-12-06 12:35:51 【问题描述】:

我正在编写一个从 eventthubs 接收数据的结构化流作业。 经过一些准备后,我在每一行上应用 pandas_udf 函数来创建一个新列,其中包含来自泡菜模型的预测。

我遇到了一个严重的问题:有时 pandas_udf 的输入是一组行而不是单行(如预期的那样)。这导致我出错:

RuntimeError: Result vector from pandas_udf was not the required length: expected 2, got 1

发生这种情况是因为 pandas_udf 接收到多行(在本例中为 2)。

这怎么可能? .withColumn 不应该在每一行上逐行执行吗?

这是我的代码:

dfInt = spark \
    .readStream \
    .load() \
    .selectExpr("cast (body as string) as json") \
    .select(from_json("json",schema).alias("data")) \
    .withColumn("k", expr("uuid()")) \
    .select("key", explode("data.features").alias("feat")) \
    .select("feat.*", "key") \
    .groupBy("k") \
    .agg(*expressions) \
    .drop("k") \
    .na.drop() \
    .withColumn("prediction", predict( (F.struct([col(x) for x in (features)]))))

pandas_udf 如下:

@pandas_udf(FloatType())
def predict(x):
  return pd.Series(pickle_model.predict_proba(x)[0][1])

实际上问题似乎出现在使用 udf 的 withColumn 调用之前,因为更多行来自上一步。 groupBy 聚合返回一个单数行,因为我创建 group by 的键是唯一的。

你知道这是什么原因吗?

【问题讨论】:

【参考方案1】:

在这种情况下,您使用的是 SCALAR pandas_udf,它将 pandas 系列作为输入并返回相同大小的 pandas.Series。我不知道内部的确切细节,但我的理解是每个执行程序会将您的列 (F.struct([col(x) for x in (features)])) 转换为执行程序当前正在处理的 Dataframe 分区的pandas.Series 并将该函数应用于系列。一个分区由许多行组成,因此您不能假设该系列的长度仅为 1。您需要确保保留所有行的所有预测概率。您可能可以这样做(假设您确实只对保持第 1 类的概率感兴趣):

@pandas_udf(FloatType())
def predict(x):
    return pd.Series(pickle_model.predict_proba(x)[:,1])

【讨论】:

以上是关于将多行结构化流式传输到 pandas udf的主要内容,如果未能解决你的问题,请参考以下文章

附加模式下的 Spark 结构化流,每个时间窗口输出多行

pandas_udf结果无法写入表

结构化流式传输:具有流式传输源的查询必须使用 writeStream.start() 执行

将 JSON 流式传输到 Bigquery

PySpark 结构化流将 udf 应用于窗口

嵌套 json 中的结构化流式传输不同模式