pyspark:在同一列(使用数组)上使用多个 UDF 函数时出现意外行为

Posted

技术标签:

【中文标题】pyspark:在同一列(使用数组)上使用多个 UDF 函数时出现意外行为【英文标题】:pyspark: unexpected behaviour when using multiple UDF functions on the same column (with arrays) 【发布时间】:2020-06-17 11:19:52 【问题描述】:

有人知道你使用多个 udf 函数时发生了什么吗:

我创建了一个测试数据框和两个示例 udf 函数:

from pyspark.sql.functions import udf

mylist = [
    [[1,2,3]],
    [[4,5,6]]
]

def f1(tlist):
    tlist[0]=111
    return 'result f1 is: '.format(tlist)
f1_udf = udf(f1, )

def f2(tlist):
    tlist[1]=222
    return 'result f2 is: '.format(tlist)
f2_udf = udf(f2, )

df = spark().createDataFrame(mylist).toDF('arr')
df.show()

给出以下结果:

+---------+
|      arr|
+---------+
|[1, 2, 3]|
|[4, 5, 6]|
+---------+

然后,我分别应用每个函数:

df.withColumn('f1', f1_udf('arr')).show(10, False)

给予

+---------+-------------------------+
|arr      |f1                       |
+---------+-------------------------+
|[1, 2, 3]|result f1 is: [111, 2, 3]|
|[4, 5, 6]|result f1 is: [111, 5, 6]|
+---------+-------------------------+

df.withColumn('f2', f2_udf('arr')).show(10,False)

给予

+---------+-------------------------+
|arr      |f2                       |
+---------+-------------------------+
|[1, 2, 3]|result f2 is: [1, 222, 3]|
|[4, 5, 6]|result f2 is: [4, 222, 6]|
+---------+-------------------------+

但是,这里开始了意想不到的行为

(df
 .withColumn('f1', f1_udf('arr'))
 .withColumn('f2', f2_udf('arr'))
).show(10, False)

给出意外结果,在第二个函数调用中混合两个函数的结果

+---------+-------------------------+---------------------------+
|arr      |f1                       |f2                         |
+---------+-------------------------+---------------------------+
|[1, 2, 3]|result f1 is: [111, 2, 3]|result f2 is: [111, 222, 3]|
|[4, 5, 6]|result f1 is: [111, 5, 6]|result f2 is: [111, 222, 6]|
+---------+-------------------------+---------------------------+
                                                    ^^^ : unexpected result

并且在改变调用函数的顺序时,

(df
 .withColumn('f2', f2_udf('arr'))
 .withColumn('f1', f1_udf('arr'))
).show(10, False)

给出了一个不同的,也是意想不到的结果:

+---------+-------------------------+---------------------------+
|arr      |f2                       |f1                         |
+---------+-------------------------+---------------------------+
|[1, 2, 3]|result f2 is: [1, 222, 3]|result f1 is: [111, 222, 3]|
|[4, 5, 6]|result f2 is: [4, 222, 6]|result f1 is: [111, 222, 6]|
+---------+-------------------------+---------------------------+
                                                         ^^^ : different unexpected result

似乎在固定的、未更改的 spark 列上调用函数不是彼此独立的,这意味着如果我们调用两个函数(即使中间有很多其他代码)混合了第一个函数调用的结果下一个... 或者,我错过了什么?

【问题讨论】:

【参考方案1】:

Spark 将相同的数组传递给两个函数 f1f2。由于第一个函数改变了数组的内容,第二个函数也看到了这些变化。你可以看到,如果你添加行

print("f1: id  of array is , content is ".format(id(tlist), tlist))

到第一个函数和

print("f2: id  of array is , content is ".format(id(tlist), tlist))

到第二个函数。

打印出来

f1: id  of array is 139782923179912, content is [1, 2, 3]
f2: id  of array is 139782923179912, content is [111, 2, 3]
f1: id  of array is 139782923180040, content is [4, 5, 6]
f2: id  of array is 139782923180040, content is [111, 5, 6]

(可能打印有点乱)

所以第二个函数看到的是第一个函数中已经改变的数组。

为了解决这个问题,函数应该创建自己的数组副本并只更改这些副本:

def f1(tlist):
    print("f1: id  of array is , content is ".format(id(tlist), tlist))
    newlist = tlist.copy()
    newlist[0]=111
    return 'result f1 is: '.format(newlist)

对应f2

获得预期行为的另一种方法是将两个 udfs 声明为 non-deterministic:

f1_udf = F.udf(f1, ).asNondeterministic()
f2_udf = F.udf(f2, ).asNondeterministic()

但是,我无法解释为什么这会有所帮助。

【讨论】:

以上是关于pyspark:在同一列(使用数组)上使用多个 UDF 函数时出现意外行为的主要内容,如果未能解决你的问题,请参考以下文章

在同一会话上从 pyspark 运行多个配置单元查询

sql或pyspark中同一列的多个条件

在 Pyspark 中的多个列上使用相同的函数重复调用 withColumn()

pyspark中同一列的多个AND条件没有连接操作

Pyspark:将多个数组列拆分为行

Pyspark:将多个数组列拆分为行