PySpark UDF 返回可变大小的元组
Posted
技术标签:
【中文标题】PySpark UDF 返回可变大小的元组【英文标题】:PySpark UDF to return tuples of variable sizes 【发布时间】:2018-01-09 23:15:34 【问题描述】:我使用一个现有的 Dataframe 并创建一个新的 Dataframe,其中包含一个包含元组的字段。 UDF 用于生成此字段。例如,在这里,我获取一个源元组并修改其元素以生成一个新元组:
udf( lambda x: tuple([2*e for e in x], ...)
挑战在于元组的长度是事先不知道的,并且可以逐行更改。
根据我阅读相关讨论的理解,要返回一个元组,UDF 的返回类型必须声明为 StructType。但是,由于返回的元组中的元素数量是未知的,所以我不能只写如下内容:
StructType([
StructField("w1", IntegerType(), False),
StructField("w2", IntegerType(), False),
StructField("w3", IntegerType(), False)])
似乎可以返回列表,但列表对我不起作用,因为我需要在输出 Dataframe 中有一个可散列的对象。
我有什么选择?
提前致谢
【问题讨论】:
【参考方案1】:StructType
/ Row
表示固定大小的product type 对象,不能用于表示可变大小的对象。
为了表示同构集合,使用list
作为外部类型,ArrayType
作为 SQL 类型:
udf(lambda x: [2*e for e in x], ArrayType(IntegerType()))
或(Spark 2.2 或更高版本):
udf(lambda x: [2*e for e in x], "array<integer>")
在 Spark 2.4 或更高版本中,您可以使用transform
from pyspark.sql.functions import expr
expr("tranform(input_column, x -> 2 * x)")
【讨论】:
感谢您的快速回复。 ArrayType() 有效。与常规的 Python 数组/列表不同,ArrayType 似乎是可散列的。将这种类型用作 join() 和其他 Dataframe 和 SQL 操作的键时,是否需要注意任何问题(主要是速度)? 一般复杂的结构很难处理,只能用原生函数来表达一些操作。其余的将需要udf
,这很昂贵,尤其是在 Python 中。【参考方案2】:
每个 Databricks (Spark) 的新语法一次一行(语法更符合 Pandas UDF,这似乎是 udfs 在 python https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html 中的用途):
一次一行:
@udf(ArrayType(IntegerType()))
def new_tuple(x):
return [2*e for e in x]
【讨论】:
以上是关于PySpark UDF 返回可变大小的元组的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:如何将现有非空列的元组列表作为数据框中的列值之一返回