Pyspark udf 在接受多列作为输入的条件定义上返回一列

Posted

技术标签:

【中文标题】Pyspark udf 在接受多列作为输入的条件定义上返回一列【英文标题】:Pyspark udf returning one column on condition definitions accepting several columns as input 【发布时间】:2017-08-23 19:24:35 【问题描述】:

我使用的是 spark 2.1,用法是 pyscripting

问题陈述:有一个场景需要传递多列作为输入并返回一列,因为下面的输出是我的 3 列输入数据框

a b c

S S S

S NS NS

S NS S

S S NS

NS S NS

我的输出必须如下

a b c d

S S S S

S NS NS NS

S NS S S

S S NS NS

NS S NS NS

我正在尝试注册一个 UDF 以传递这 3 列 [a,b,c] 作为输入并返回 d 列作为输出这里 a,b,c,d 是列名

我发现很难得到下面的输出是使用的语法

def return_string(x):
      if [x.a=='s' & x.b=='S' & x.c=='s']
          return 'S'
      else if[x.a=='s' & x.b=='NS' & x.c=='s']
          return 'S'
      else if[x.a=='s' & x.b=='S' & x.c=='NS']
          return 'NS;

func= udf(returnstring,types.StringType())

谁能帮我完成这个逻辑。

【问题讨论】:

三列都很重要吗?对于这个示例输出,它似乎只依赖于 C。 Pyspark: Pass multiple columns in UDF的可能重复 是的,这三个都很重要,因为如果 x.a=='NS' & x.b=='S' | 又错过了一个逻辑 | x.c=='NS' return 'NS' 但您提到的内容适合此示例输出,可以单独考虑其他列 【参考方案1】:

我正在尝试使用内置的 withColumnwhen 函数:

from pyspark.sql.functions import col, when, lit

df.withColumn('d', when(
     ((col('A') == 'S') & (col('B') == 'S') & (col('C')=='S'))
   | ((col('A') == 'S') & (col('B') == 'NS') & (col('C')=='S'))
 , lit('S')
 ).otherwise(lit('NS'))
).show()

这也是假设这两个值是互斥的(因此otherwise

【讨论】:

【参考方案2】:

应该是:

@udf
def return_string(a, b, c):
    if a == 's' and b == 'S' and c == 's':
        return 'S'
    if a == 's' and b == 'NS' and c == 's':
        return 'S'
    if a == 's' and b == 'S' and c == 'NS':
        return 'NS'

df = sc.parallelize([('s', 'S', 'NS'), ('?', '?', '?')]).toDF(['a', 'b', 'c'])

df.withColumn('result', return_string('a', 'b', 'c')).show()
## +---+---+---+------+
## |  a|  b|  c|result|
## +---+---+---+------+
## |  s|  S| NS|    NS|
## |  ?|  ?|  ?|  null|
## +---+---+---+------+
应列出所有参数(除非您将数据作为struct 传递)。 您应该使用 and 而不是 &(您评估的是逻辑表达式而不是 SQL 表达式)。 条件应该是表达式而不是列表(非空列表总是真实的)。

我个人会跳过所有ifs 并使用简单的dict

@udf
def return_string(a, b, c):
    mapping = 
        ('s', 'S', 's'): 'S',
        ('s', 'NS' 's'): 'S',
        ('s', 'S', 'NS'): 'NS',
    
    return mapping.get((a, b, c))

根据您的要求调整条件。

总体而言,您应该更喜欢 Steven Laan 提供的 the excellent answer 中所示的 SQL 表达式(您可以使用 when(..., ...).when(..., ...) 链接多个条件)。

【讨论】:

以上是关于Pyspark udf 在接受多列作为输入的条件定义上返回一列的主要内容,如果未能解决你的问题,请参考以下文章

多列上的pyspark条件并返回新列

PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列

在 PySpark Pandas UDF 中指定用户定义函数的正确方法

PySpark UDF 无法识别参数数量

Hive UDF 返回多列输出

以整齐的方式将多列作为分组变量传递给 UDF