Apache Spark - 将 UDF 的结果分配给多个数据框列

Posted

技术标签:

【中文标题】Apache Spark - 将 UDF 的结果分配给多个数据框列【英文标题】:Apache Spark -- Assign the result of UDF to multiple dataframe columns 【发布时间】:2016-02-10 18:08:36 【问题描述】:

我正在使用 pyspark,使用 spark-csv 将大型 csv 文件加载到数据帧中,作为预处理步骤,我需要对其中一列中可用的数据应用各种操作(包含json 字符串)。这将返回 X 值,每个值都需要存储在各自单独的列中。

该功能将在 UDF 中实现。但是,我不确定如何从该 UDF 返回值列表并将这些值提供给各个列。下面是一个简单的例子:

(...)
from pyspark.sql.functions import udf
def udf_test(n):
    return [n/2, n%2]

test_udf=udf(udf_test)


df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)

这会产生以下内容:

+------+----------+--------------------+
|amount|trans_date|                test|
+------+----------+--------------------+
|  28.0|2016-02-07|         [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows

将 udf 返回的两个(在此示例中)值存储在不同的列上的最佳方法是什么?现在它们被输入为字符串:

df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()

root
 |-- amount: float (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- test: string (nullable = true)

【问题讨论】:

【参考方案1】:

无法从单个 UDF 调用创建多个***列,但您可以创建一个新的 struct。它需要一个带有指定 returnType 的 UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, FloatType

schema = StructType([
    StructField("foo", FloatType(), False),
    StructField("bar", FloatType(), False)
])

def udf_test(n):
    return (n / 2, n % 2) if n and n != 0.0 else (float('nan'), float('nan'))

test_udf = udf(udf_test, schema)
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"])

foobars = df.select(test_udf("y").alias("foobar"))
foobars.printSchema()
## root
##  |-- foobar: struct (nullable = true)
##  |    |-- foo: float (nullable = false)
##  |    |-- bar: float (nullable = false)

您使用简单的select 进一步扁平化架构:

foobars.select("foobar.foo", "foobar.bar").show()
## +---+---+
## |foo|bar|
## +---+---+
## |1.0|0.0|
## |1.5|1.0|
## +---+---+

另见Derive multiple columns from a single column in a Spark DataFrame

【讨论】:

太棒了!这非常适合我需要的东西。我大部分时间都在那里,但是将 StructType 模式错误地提供给 udf,这导致我的新列最终改为 StringType。非常感谢! 谢谢!!这正是我想要的。 :) 您也可以使用foobars.select("foobar.*") 而不是单独命名每一列。 您还可以通过两步过程“混合”原始列和 UDF 中的这些列:df.select("x", test_udf("y").alias("foobar")).select("x", "foobar.*") from pyspark.sql.types import StructType, StructField, FloatType【参考方案2】:

您可以使用 flatMap 一次性获取所需数据框的列

df=df.withColumn('udf_results',udf)  
df4=df.select('udf_results').rdd.flatMap(lambda x:x).toDF(schema=your_new_schema)

【讨论】:

以上是关于Apache Spark - 将 UDF 的结果分配给多个数据框列的主要内容,如果未能解决你的问题,请参考以下文章

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]

Apache Spark - UDF 似乎不适用于 spark-submit

Apache Spark - 注册 UDF - 返回数据帧

Apache Spark SQL StructType 和 UDF

Spark SQL UDF示例

Apache Spark Python UDF 失败