带有 Pandas 矢量化 UDF 的 Spark 3
Posted
技术标签:
【中文标题】带有 Pandas 矢量化 UDF 的 Spark 3【英文标题】:Spark 3 with Pandas Vectorised UDF's 【发布时间】:2020-06-30 10:58:15 【问题描述】:我正在研究在 PySpark (v3) 中使用 Pandas UDF。出于多种原因,我理解迭代和 UDF 通常是不好的,并且我理解我在这里展示的简单示例可以使用 SQL 函数完成 PySpark - 所有这些都不是重点!
我一直在关注本指南:https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html
我有一个来自文档的简单示例:
import pandas as pd
from typing import Iterator, Tuple
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
spark = SparkSession.builder.getOrCreate()
pdf = pd.DataFrame(([1, 2, 3], [4, 5, 6], [8, 9, 0]), columns=["x", "y", "z"])
df = spark.createDataFrame(pdf)
@pandas_udf('long')
def test1(x: pd.Series, y: pd.Series) -> pd.Series:
return x + y
df.select(test1(col("x"), col("y"))).show()
这对于执行基本算术非常有效——如果我想加法、乘法等,这很简单(但在没有函数的 PySpark 中也很简单)。
我想对这些值进行比较,例如:
@pandas_udf('long')
def test2(x: pd.Series, y: pd.Series) -> pd.Series:
return x if x > y else y
df.select(test2(col("x"), col("y"))).show()
ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
会出错。我知道它正在评估系列而不是行值。
所以有一个迭代器的例子。这同样适用于他们提供的基本算术示例。但如果我尝试应用逻辑:
@pandas_udf("long")
def test3(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for x, y in batch_iter:
yield x if x > y else y
df.select(test3(col("x"), col("y"))).show()
我得到了和以前一样的 ValueError。
所以我的问题是我应该如何执行这样的逐行比较?在矢量化函数中是否可能?如果不是,那么它们的用例是什么?
【问题讨论】:
【参考方案1】:我想通了。在你写下来并将问题发布给全世界之后就很简单了。
只需返回一个数组,然后转换为 Pandas 系列:
@pandas_udf('long')
def test4(x: pd.Series, y: pd.Series) -> pd.Series:
return pd.Series([a if a > b else b for a, b in zip(x, y)])
df.select(test4(col("x"),col("y"))).show()
【讨论】:
【参考方案2】:过去两天我一直在寻找这个答案,谢谢 simon_dmorias!
我需要在这里稍微修改一下示例。我将单个 pandas_udf 分解为多个组件以便于管理。以下是我供其他人参考的示例:
xdf = pd.DataFrame(([1, 2, 3,'Fixed'], [4, 5, 6,'Variable'], [8, 9, 0,'Adjustable']), columns=["x", "y", "z", "Description"])
df = spark.createDataFrame(xdf)
def fnRate(x):
return pd.Series(['Fixed' if 'Fixed' in str(v) else 'Variable' if 'Variable' in str(v) else 'Other' for v in zip(x)])
@pandas_udf('string')
def fnRateRecommended(Description: pd.Series) -> pd.Series:
varProduct = fnRate(Description)
return varProduct
# call function
df.withColumn("Recommendation", fnRateRecommended(sf.col("Description"))).show()
【讨论】:
以上是关于带有 Pandas 矢量化 UDF 的 Spark 3的主要内容,如果未能解决你的问题,请参考以下文章
使用 pandas_udf 将 Spark Structured DataFrame 转换为 Pandas
如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?
使用 spark.sparkContext.addPyFile 导入 Pandas UDF