对 pyspark 数据框的多列应用不同的函数
Posted
技术标签:
【中文标题】对 pyspark 数据框的多列应用不同的函数【英文标题】:Apply different functions to many columns of a pyspark dataframe 【发布时间】:2020-07-29 12:32:34 【问题描述】:我有一个包含几列的 pyspark 数据框
col1 col2 col3
---------------------
1. 2.1. 3.2
3.2. 4.2. 5.1
我想将三个函数f1(x), f2(x), f3(x)
每一个应用到数据框的对应列,这样我就得到了
col1 col2 col3
-------------------------------
f1(1.) f2(2.1.) f3(3.2)
f1(3.2.) f2(4.2.) f3(5.1)
我试图避免为每一列定义一个 udf,所以我的想法是从应用函数的每一列构建一个 rdd(可能是带有索引的 zip,我也可以在原始数据集中定义它),然后加入原始数据框。
这是一个可行的解决方案,还是有更好的方法?
更新:按照@Andre' Perez 的建议,我可以为每列定义一个 udf 并使用 spark sql 来应用它,或者
import numpy as np
import pyspark.sql.functions as F
f1_udf = F.udf(lambda x: float(np.sin(x)), FloatType())
f2_udf = F.udf(lambda x: float(np.cos(x)), FloatType())
f3_udf = F.udf(lambda x: float(np.tan(x)), FloatType())
df = df.withColumn("col1", f1_udf("col1"))
df = df.withColumn("col2", f2_udf("col2"))
df = df.withColumn("col3", f3_udf("col3"))
【问题讨论】:
不需要使用rdd和joins。您可以将整行传递给 udf 并返回:Filter Pyspark Dataframe with udf on entire row 这些函数是用户定义的还是标准的pyspark函数? 这些是用户定义的函数,不是标准的内置函数 感谢@cronoik 的建议,但我想我需要将一整行传递给多个 udfs 您根本不想使用 udf 还是只使用一个 udf 而不是 3?目前我不清楚你想要实现什么。如果您不想使用 udf,您应该解释您的函数当前正在做什么(最好直接发布它们)。如果您只想使用一个 udf 而不是 3,可以按照我分享的链接中的说明进行操作。目前无法回答您的问题。 【参考方案1】:也许将这些函数注册为 UDF 会更好(即使您说您不想遵循这种方法)。
spark.udf.register("func1", f1)
spark.udf.register("func2", f2)
spark.udf.register("func3", f3)
然后,我会将 DataFrame 注册为 temporary view,并使用已注册的函数对其运行 Spark SQL 查询。
df.createOrReplaceTempView("dataframe")
df2 = spark.sql("select func1(col1), func2(col2), func3(col3) from dataframe")
【讨论】:
谢谢。我已编辑问题以包含您的建议,但我试图避免定义 udfs以上是关于对 pyspark 数据框的多列应用不同的函数的主要内容,如果未能解决你的问题,请参考以下文章