pyspark 在 udf 中获取结构数据类型的字段名称

Posted

技术标签:

【中文标题】pyspark 在 udf 中获取结构数据类型的字段名称【英文标题】:pyspark getting the field names of a a struct datatype inside a udf 【发布时间】:2019-09-04 10:05:24 【问题描述】:

我正在尝试将多个列作为StructType 传递给udf(使用pyspark.sql.functions.struct())。

在这个udf 中,我想获取我作为list 传递的结构列的字段,以便我可以遍历每一行传递的列。

基本上,我正在寻找此答案中提供的 scala 代码的 pyspark 版本 - Spark - pass full row to a udf and then get column name inside udf

【问题讨论】:

【参考方案1】:

您可以使用与您链接的帖子相同的方法,即使用pyspark.sql.Row。但是您可以使用.asDict()Row 转换为字典,而不是.schema.fieldNames

例如,这是一种同时迭代列名值的方法:

from pyspark.sql.functions import col, struct, udf

df = spark.createDataFrame([(1, 2, 3)], ["a", "b", "c"])
f = udf(lambda row: "; ".join(["=".join(map(str, [k,v])) for k, v in row.asDict().items()]))
df.select(f(struct(*df.columns)).alias("myUdfOutput")).show()
#+-------------+
#|  myUdfOutput|
#+-------------+
#|a=1; c=3; b=2|
#+-------------+

另一种方法是构建一个列名到值的MapType(),并将其传递给您的udf

from itertools import chain
from pyspark.sql.functions import create_map, lit

f2 = udf(lambda row: "; ".join(["=".join(map(str, [k,v])) for k, v in row.items()]))
df.select(
    f2(
        create_map(
            *chain.from_iterable([(lit(c), col(c)) for c in df.columns])
        )
    ).alias("myNewUdfOutput")
).show()
#+--------------+
#|myNewUdfOutput|
#+--------------+
#| a=1; c=3; b=2|
#+--------------+

第二种方法可以说是不必要的复杂,所以第一种方法是推荐的方法。

【讨论】:

谢谢!我尝试了 .asDict() 选项并且它有效。我也会尝试替代解决方案

以上是关于pyspark 在 udf 中获取结构数据类型的字段名称的主要内容,如果未能解决你的问题,请参考以下文章

所有列的 Pyspark 数据框数据类型由 UDF 更改为 String

PySpark 中的 Groupby 和 UDF/UDAF,同时保持 DataFrame 结构

如何在 PySpark 的 UDF 中返回“元组类型”?

PySpark - 从 UDF 获取行索引

UDF 的性能改进 - 在 pyspark 中获取每行最小值的列名

PySpark 结构化流将 udf 应用于窗口