PySpark UDF 返回可变大小的元组

Posted

技术标签:

【中文标题】PySpark UDF 返回可变大小的元组【英文标题】:PySpark UDF to return tuples of variable sizes 【发布时间】:2018-01-09 23:15:34 【问题描述】:

我使用一个现有的 Dataframe 并创建一个新的 Dataframe,其中包含一个包含元组的字段。 UDF 用于生成此字段。例如,在这里,我获取一个源元组并修改其元素以生成一个新元组:

udf( lambda x: tuple([2*e for e in x], ...)

挑战在于元组的长度是事先不知道的,并且可以逐行更改。

根据我阅读相关讨论的理解,要返回一个元组,UDF 的返回类型必须声明为 StructType。但是,由于返回的元组中的元素数量是未知的,所以我不能只写如下内容:

StructType([
    StructField("w1", IntegerType(), False),
    StructField("w2", IntegerType(), False),
    StructField("w3", IntegerType(), False)])

似乎可以返回列表,但列表对我不起作用,因为我需要在输出 Dataframe 中有一个可散列的对象。

我有什么选择?

提前致谢

【问题讨论】:

【参考方案1】:

StructType / Row 表示固定大小的product type 对象,不能用于表示可变大小的对象。

为了表示同构集合,使用list 作为外部类型,ArrayType 作为 SQL 类型:

udf(lambda x: [2*e for e in x], ArrayType(IntegerType()))

或(Spark 2.2 或更高版本):

udf(lambda x: [2*e for e in x], "array<integer>")

在 Spark 2.4 或更高版本中,您可以使用transform

from pyspark.sql.functions import expr

expr("tranform(input_column, x -> 2 * x)")

【讨论】:

感谢您的快速回复。 ArrayType() 有效。与常规的 Python 数组/列表不同,ArrayType 似乎是可散列的。将这种类型用作 join() 和其他 Dataframe 和 SQL 操作的键时,是否需要注意任何问题(主要是速度)? 一般复杂的结构很难处理,只能用原生函数来表达一些操作。其余的将需要 udf,这很昂贵,尤其是在 Python 中。【参考方案2】:

每个 Databricks (Spark) 的新语法一次一行(语法更符合 Pandas UDF,这似乎是 udfs 在 python https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html 中的用途):

一次一行:

@udf(ArrayType(IntegerType()))
def new_tuple(x):
    return [2*e for e in x]

【讨论】:

以上是关于PySpark UDF 返回可变大小的元组的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:如何将现有非空列的元组列表作为数据框中的列值之一返回

在 PySpark 中,有没有办法使用运行时给出的 Python 类的函数来动态注册 UDF? [复制]

pyspark udf 的可变参数数量

如何在pyspark中解析csv格式的元组数据?

扁平化包和元组的元组

PySpark Dataframe 将两列转换为基于第三列值的元组新列