Pyspark udf 在接受多列作为输入的条件定义上返回一列
Posted
技术标签:
【中文标题】Pyspark udf 在接受多列作为输入的条件定义上返回一列【英文标题】:Pyspark udf returning one column on condition definitions accepting several columns as input 【发布时间】:2017-08-23 19:24:35 【问题描述】:我使用的是 spark 2.1,用法是 pyscripting
问题陈述:有一个场景需要传递多列作为输入并返回一列,因为下面的输出是我的 3 列输入数据框
a b c
S S S
S NS NS
S NS S
S S NS
NS S NS
我的输出必须如下
a b c d
S S S S
S NS NS NS
S NS S S
S S NS NS
NS S NS NS
我正在尝试注册一个 UDF 以传递这 3 列 [a,b,c] 作为输入并返回 d 列作为输出这里 a,b,c,d 是列名
我发现很难得到下面的输出是使用的语法
def return_string(x):
if [x.a=='s' & x.b=='S' & x.c=='s']
return 'S'
else if[x.a=='s' & x.b=='NS' & x.c=='s']
return 'S'
else if[x.a=='s' & x.b=='S' & x.c=='NS']
return 'NS;
func= udf(returnstring,types.StringType())
谁能帮我完成这个逻辑。
【问题讨论】:
三列都很重要吗?对于这个示例输出,它似乎只依赖于 C。 Pyspark: Pass multiple columns in UDF的可能重复 是的,这三个都很重要,因为如果 x.a=='NS' & x.b=='S' | 又错过了一个逻辑 | x.c=='NS' return 'NS' 但您提到的内容适合此示例输出,可以单独考虑其他列 【参考方案1】:我正在尝试使用内置的 withColumn
和 when
函数:
from pyspark.sql.functions import col, when, lit
df.withColumn('d', when(
((col('A') == 'S') & (col('B') == 'S') & (col('C')=='S'))
| ((col('A') == 'S') & (col('B') == 'NS') & (col('C')=='S'))
, lit('S')
).otherwise(lit('NS'))
).show()
这也是假设这两个值是互斥的(因此otherwise
)
【讨论】:
【参考方案2】:应该是:
@udf
def return_string(a, b, c):
if a == 's' and b == 'S' and c == 's':
return 'S'
if a == 's' and b == 'NS' and c == 's':
return 'S'
if a == 's' and b == 'S' and c == 'NS':
return 'NS'
df = sc.parallelize([('s', 'S', 'NS'), ('?', '?', '?')]).toDF(['a', 'b', 'c'])
df.withColumn('result', return_string('a', 'b', 'c')).show()
## +---+---+---+------+
## | a| b| c|result|
## +---+---+---+------+
## | s| S| NS| NS|
## | ?| ?| ?| null|
## +---+---+---+------+
应列出所有参数(除非您将数据作为struct
传递)。
您应该使用 and
而不是 &
(您评估的是逻辑表达式而不是 SQL 表达式)。
条件应该是表达式而不是列表(非空列表总是真实的)。
我个人会跳过所有ifs
并使用简单的dict
:
@udf
def return_string(a, b, c):
mapping =
('s', 'S', 's'): 'S',
('s', 'NS' 's'): 'S',
('s', 'S', 'NS'): 'NS',
return mapping.get((a, b, c))
根据您的要求调整条件。
总体而言,您应该更喜欢 Steven Laan 提供的 the excellent answer 中所示的 SQL 表达式(您可以使用 when(..., ...).when(..., ...)
链接多个条件)。
【讨论】:
以上是关于Pyspark udf 在接受多列作为输入的条件定义上返回一列的主要内容,如果未能解决你的问题,请参考以下文章
PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列