pyspark:在同一列(使用数组)上使用多个 UDF 函数时出现意外行为
Posted
技术标签:
【中文标题】pyspark:在同一列(使用数组)上使用多个 UDF 函数时出现意外行为【英文标题】:pyspark: unexpected behaviour when using multiple UDF functions on the same column (with arrays) 【发布时间】:2020-06-17 11:19:52 【问题描述】:有人知道你使用多个 udf 函数时发生了什么吗:
我创建了一个测试数据框和两个示例 udf 函数:
from pyspark.sql.functions import udf
mylist = [
[[1,2,3]],
[[4,5,6]]
]
def f1(tlist):
tlist[0]=111
return 'result f1 is: '.format(tlist)
f1_udf = udf(f1, )
def f2(tlist):
tlist[1]=222
return 'result f2 is: '.format(tlist)
f2_udf = udf(f2, )
df = spark().createDataFrame(mylist).toDF('arr')
df.show()
给出以下结果:
+---------+
| arr|
+---------+
|[1, 2, 3]|
|[4, 5, 6]|
+---------+
然后,我分别应用每个函数:
df.withColumn('f1', f1_udf('arr')).show(10, False)
给予
+---------+-------------------------+
|arr |f1 |
+---------+-------------------------+
|[1, 2, 3]|result f1 is: [111, 2, 3]|
|[4, 5, 6]|result f1 is: [111, 5, 6]|
+---------+-------------------------+
和
df.withColumn('f2', f2_udf('arr')).show(10,False)
给予
+---------+-------------------------+
|arr |f2 |
+---------+-------------------------+
|[1, 2, 3]|result f2 is: [1, 222, 3]|
|[4, 5, 6]|result f2 is: [4, 222, 6]|
+---------+-------------------------+
但是,这里开始了意想不到的行为
(df
.withColumn('f1', f1_udf('arr'))
.withColumn('f2', f2_udf('arr'))
).show(10, False)
给出意外结果,在第二个函数调用中混合两个函数的结果
+---------+-------------------------+---------------------------+
|arr |f1 |f2 |
+---------+-------------------------+---------------------------+
|[1, 2, 3]|result f1 is: [111, 2, 3]|result f2 is: [111, 222, 3]|
|[4, 5, 6]|result f1 is: [111, 5, 6]|result f2 is: [111, 222, 6]|
+---------+-------------------------+---------------------------+
^^^ : unexpected result
并且在改变调用函数的顺序时,
(df
.withColumn('f2', f2_udf('arr'))
.withColumn('f1', f1_udf('arr'))
).show(10, False)
给出了一个不同的,也是意想不到的结果:
+---------+-------------------------+---------------------------+
|arr |f2 |f1 |
+---------+-------------------------+---------------------------+
|[1, 2, 3]|result f2 is: [1, 222, 3]|result f1 is: [111, 222, 3]|
|[4, 5, 6]|result f2 is: [4, 222, 6]|result f1 is: [111, 222, 6]|
+---------+-------------------------+---------------------------+
^^^ : different unexpected result
似乎在固定的、未更改的 spark 列上调用函数不是彼此独立的,这意味着如果我们调用两个函数(即使中间有很多其他代码)混合了第一个函数调用的结果下一个... 或者,我错过了什么?
【问题讨论】:
【参考方案1】:Spark 将相同的数组传递给两个函数 f1
和 f2
。由于第一个函数改变了数组的内容,第二个函数也看到了这些变化。你可以看到,如果你添加行
print("f1: id of array is , content is ".format(id(tlist), tlist))
到第一个函数和
print("f2: id of array is , content is ".format(id(tlist), tlist))
到第二个函数。
打印出来
f1: id of array is 139782923179912, content is [1, 2, 3]
f2: id of array is 139782923179912, content is [111, 2, 3]
f1: id of array is 139782923180040, content is [4, 5, 6]
f2: id of array is 139782923180040, content is [111, 5, 6]
(可能打印有点乱)
所以第二个函数看到的是第一个函数中已经改变的数组。
为了解决这个问题,函数应该创建自己的数组副本并只更改这些副本:
def f1(tlist):
print("f1: id of array is , content is ".format(id(tlist), tlist))
newlist = tlist.copy()
newlist[0]=111
return 'result f1 is: '.format(newlist)
对应f2
。
获得预期行为的另一种方法是将两个 udfs 声明为 non-deterministic:
f1_udf = F.udf(f1, ).asNondeterministic()
f2_udf = F.udf(f2, ).asNondeterministic()
但是,我无法解释为什么这会有所帮助。
【讨论】:
以上是关于pyspark:在同一列(使用数组)上使用多个 UDF 函数时出现意外行为的主要内容,如果未能解决你的问题,请参考以下文章