带有 Pandas 矢量化 UDF 的 Spark 3

Posted

技术标签:

【中文标题】带有 Pandas 矢量化 UDF 的 Spark 3【英文标题】:Spark 3 with Pandas Vectorised UDF's 【发布时间】:2020-06-30 10:58:15 【问题描述】:

我正在研究在 PySpark (v3) 中使用 Pandas UDF。出于多种原因,我理解迭代和 UDF 通常是不好的,并且我理解我在这里展示的简单示例可以使用 SQL 函数完成 PySpark - 所有这些都不是重点!

我一直在关注本指南:https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html

我有一个来自文档的简单示例:

import pandas as pd
from typing import Iterator, Tuple
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf

spark = SparkSession.builder.getOrCreate()

pdf = pd.DataFrame(([1, 2, 3], [4, 5, 6], [8, 9, 0]), columns=["x", "y", "z"])
df = spark.createDataFrame(pdf)

@pandas_udf('long')
def test1(x: pd.Series, y: pd.Series) -> pd.Series:
    return x + y

df.select(test1(col("x"), col("y"))).show()

这对于执行基本算术非常有效——如果我想加法、乘法等,这很简单(但在没有函数的 PySpark 中也很简单)。

我想对这些值进行比较,例如:

@pandas_udf('long')
def test2(x: pd.Series, y: pd.Series) -> pd.Series:
    return x if x > y else y

df.select(test2(col("x"), col("y"))).show()

ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all(). 会出错。我知道它正在评估系列而不是行值。

所以有一个迭代器的例子。这同样适用于他们提供的基本算术示例。但如果我尝试应用逻辑:

@pandas_udf("long")
def test3(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for x, y in batch_iter:
        yield x if x > y else y

df.select(test3(col("x"), col("y"))).show()

我得到了和以前一样的 ValueError。

所以我的问题是我应该如何执行这样的逐行比较?在矢量化函数中是否可能?如果不是,那么它们的用例是什么?

【问题讨论】:

【参考方案1】:

我想通了。在你写下来并将问题发布给全世界之后就很简单了。

只需返回一个数组,然后转换为 Pandas 系列:

@pandas_udf('long')
def test4(x: pd.Series, y: pd.Series) -> pd.Series:
    return pd.Series([a if a > b else b for a, b in zip(x, y)])

df.select(test4(col("x"),col("y"))).show()

【讨论】:

【参考方案2】:

过去两天我一直在寻找这个答案,谢谢 simon_dmorias!

我需要在这里稍微修改一下示例。我将单个 pandas_udf 分解为多个组件以便于管理。以下是我供其他人参考的示例:

xdf = pd.DataFrame(([1, 2, 3,'Fixed'], [4, 5, 6,'Variable'], [8, 9, 0,'Adjustable']), columns=["x", "y", "z", "Description"])
df = spark.createDataFrame(xdf)

def fnRate(x):
    return pd.Series(['Fixed' if 'Fixed' in str(v) else 'Variable' if 'Variable' in str(v) else 'Other' for v in zip(x)])


@pandas_udf('string')
def fnRateRecommended(Description: pd.Series) -> pd.Series:
    varProduct = fnRate(Description)

    return varProduct

# call function
df.withColumn("Recommendation", fnRateRecommended(sf.col("Description"))).show()

【讨论】:

以上是关于带有 Pandas 矢量化 UDF 的 Spark 3的主要内容,如果未能解决你的问题,请参考以下文章

将 pandas_udf 与 spark 2.2 一起使用

使用 pandas_udf 将 Spark Structured DataFrame 转换为 Pandas

如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?

使用 spark.sparkContext.addPyFile 导入 Pandas UDF

我可以将 spark 数据帧作为参数发送给 pandas UDF

Spark pandas_udf 并不快