使用 udf 以编程方式从数据框中选择列

Posted

技术标签:

【中文标题】使用 udf 以编程方式从数据框中选择列【英文标题】:Programatically select columns from a dataframe with udf 【发布时间】:2019-06-18 04:17:03 【问题描述】:

我是 pyspark 的新手。 我正在尝试使用包含 UDF 的配置文件来提取数据框的列。 如果我将选择列定义为客户端上的列表,它可以工作,但如果我从配置文件导入列表,则列列表的类型为字符串。 有没有替代方法。

使用 pyspark 打开 spark-shell。

*******************************************************************
version 2.2.0
Using Python version 2.7.16 (default, Mar 18 2019 18:38:44)
SparkSession available as 'spark'

*******************************************************************


jsonDF = spark.read.json("/tmp/people.json")
jsonDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

jsonDF.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


jsonCurDF = jsonDF.filter(jsonDF.age.isNotNull()).cache()

# Define the UDF

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s


# Selecting the columns from a list.

colSelList = ['age', 'name', squared_udf('age')]
jsonCurDF.select(colSelList).show()

+---+------+----------------+
|age|  name|squared_udf(age)|
+---+------+----------------+
| 30|  Andy|             900|
| 19|Justin|             361|
+---+------+----------------+

# If I use an external config file 

colSelListStr = ["age", "name" , "squared_udf('age')"]
jsonCurDF.select(colSelListStr).show()

上述命令失败“无法解析'`squared_udf('age')'

尝试注册函数,尝试 selectExpr 并使用列函数。

在 colSelList 中,udf 调用被转换为列类型。

print colSelList[2]
Column<squared_udf(age)

print colSelListStr[2]
squared_udf('age')

print column(colSelListStr[2])
Column<squared_udf('age')

我在这里做错了什么?还是有其他解决方案?

【问题讨论】:

【参考方案1】:

这是因为当您从列表中传递 squared_age 时,它​​被视为字符串而不是函数。 有一种方法可以做到这一点,并且不需要为此导入 UDF。 假设这是您需要选择的列表

直接传递此列表会导致错误,因为 squared_age 不包含在此数据框中

所以首先你将现有 df 的所有列放入一个列表中

existing_cols = df.columns

你的这些是你需要的列

现在取两个列表的交集 它会给你一个共同的元素列表

intersection = list(set(existing_cols) & set(col_list)) 

现在试试这样

newDF= df.select(intersection).rdd.map(lambda x: (x["age"], x["name"], x["age"]*x["age"])).toDF(col_list)

这会给你这个

希望这会有所帮助。

【讨论】:

谢谢基兰。平方函数只是一个例子。在我的情况下,编写 lambda 函数不是通用的。我的最终目标是拥有一个 udf 列表并根据配置文件从数据框中提取数据。

以上是关于使用 udf 以编程方式从数据框中选择列的主要内容,如果未能解决你的问题,请参考以下文章

将 UDF 动态应用于数据框中 N 列中的 1 到 N 列

如何使用 selectInput 从 R 中的数据框中选择特定列?

计算一次UDF

如何使用 uniroot 解决数据框中的用户定义函数 (UDF)?

使用 iloc 从数据框中切片多个列范围

以编程方式选择scala spark中的多个列[重复]