Pyspark:将UDF的结果迭代地写回数据框不会产生预期的结果

Posted

技术标签:

【中文标题】Pyspark:将UDF的结果迭代地写回数据框不会产生预期的结果【英文标题】:Pyspark: writing results of UDF iteratively back to dataframe does not produce expected results 【发布时间】:2018-03-19 15:17:28 【问题描述】:

我还是 pyspark 的新手,我正在尝试评估一个函数并在 UDF 的帮助下迭代地创建列。以下是函数:

def get_temp(df):
    l=['temp1','temp2','temp3']
    s=[0]
    pt = [0]
    start = [0]
    end = [0]
    cummulative_stat = [0]
    for p in xrange(1,4):
        def func(p):
            if p==1:
                pass
            elif p >1:
                start[0] = end[0]
                s[0]=2
                pt[0] =4
            end[0] = start[0] + pt[0] - s[0]
            return end[0]
        func_udf=udf(func,IntegerType())
        df=df.withColumn(l[p-1],func_udf(lit(p)))
    return df
df=get_temp(df)
df.show()

以上产生的结果:

+---+---+---+-----+-----+-----+
|  a|  b|  c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
|  2| 12|  5|    0|    2|    2|
|  8|  5|  7|    0|    4|    4|
|  9|  4|  3|    0|    2|    2|
|  3|  8|  2|    0|    4|    4|
+---+---+---+-----+-----+-----+

预期结果是:

+---+---+---+-----+-----+-----+
|  a|  b|  c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
|  2| 12|  5|    0|    2|    4|
|  8|  5|  7|    0|    2|    4|
|  9|  4|  3|    0|    2|    4|
|  3|  8|  2|    0|    2|    4|
+---+---+---+-----+-----+-----+

如果我只看内部函数的输出,结果与预期的一样,即:

s=[0]
pt = [0]
start = [0]
end = [0]
cummulative_stat = [0]
for p in xrange(1,4):
    def func():
        if p==1:
            pass
        elif p >1:
            start[0] = end[0]
            s[0]=2
            pt[0] =4
        end[0] = start[0] + pt[0] - s[0]
        return end[0]
    e=func()
    print e

output:
0
2
4

不确定将这些结果从 UDF 写回 df 的正确方法是什么。发布的数据框只是一个示例数据框,我需要使用 for 循环,因为在我的原始代码中,我在 for 循环中调用了其他函数(谁的输出取决于迭代器的值)。例如参考下面:

def get_temp(df):
    l=['temp1','temp2','temp3']
    s=[0]
    pt = [0]
    start = [0]
    end = [0]
    q=[]
    cummulative_stat = [0]
    for p in xrange(1,4):
        def func(p):
            if p < a:
                cummulative_stat[0]=cummulative_stat[0]+52
                pass
            elif p >=a:

                if p==1:
                    pass
                elif p >1:
                    start[0] = end[0]
                    s[0]=2
                    pt[0] =4
                if cummulative_stat and p >1:
                    var1=func2(p,3000)
                    var2=func3(var1)
                    cummulative_stat=np.nan
                else:
                    var1=func2(p,3000)
                    var2=func3(var1)         
                end[0] = start[0] + pt[0] - s[0]
            q.append(end[0],var1,var2)
            return q
        func_udf=udf(func,ArrayType(ArrayType(IntegerType())))
        df=df.withColumn(l[p-1],func_udf(lit(p)))
    return df
df=get_temp(df)
df.show()

我正在使用 pyspark 2.2。任何帮助深表感谢。 要创建此数据框:

rdd =  sc.parallelize([(2,12,5),(8,5,7),
                 (9,4,3),
                  (3,8,2)])
df = sqlContext.createDataFrame(rdd, ('a', 'b','c'))
df.show()

【问题讨论】:

为什么要用udf? 这可能是XY Problem 吗?你想做什么?可能有更简单的方法。 @pault 我用示例代码更新了问题。我想使用 udf 因为我确实在其中进行了其他函数调用,并最终对调用的函数进行了一些数学运算并返回输出。上面显示的函数: func 对其他函数调用进行数学运算 由于不同列的函数不同,我建议您为每列编写不同的函数并分别调用它们。 您能否进一步说明这一点?不确定我将如何解耦成单独的函数,因为 cummulative_stat 是根据其在前一次迭代中获得的值进行评估的。谢谢! 【参考方案1】:

据我了解,查看您的代码是 您的下一列值取决于前一列。如果我的理解是正确的,那么我可以说 您的 udf 函数定义放置在错误的位置您需要对代码进行细微的更改才能使其正常工作。

让我们一步一步来

你已经有了

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  2| 12|  5|
|  8|  5|  7|
|  9|  4|  3|
|  3|  8|  2|
+---+---+---+

我们需要一个初始化列,我看到它是 0

from pyspark.sql import functions as F
from pyspark.sql import types as T

df=df.withColumn('temp0', F.lit(0))

应该是

+---+---+---+-----+
|  a|  b|  c|temp0|
+---+---+---+-----+
|  2| 12|  5|    0|
|  8|  5|  7|    0|
|  9|  4|  3|    0|
|  3|  8|  2|    0|
+---+---+---+-----+

我们应该udf函数移出循环作为

def func(p, end):
    start = 0
    s = 0
    pt = 0
    if p==1:
        pass
    elif p >1:
        start = end
        s=2
        pt =4
    end = start + pt - s
    return end

func_udf=F.udf(func, T.IntegerType())

并且在循环中调用udf函数

def get_temp(df):
    l=['temp1','temp2','temp3']
    for p in xrange(1,4):
        df=df.withColumn(l[p-1],func_udf(F.lit(p), F.col('temp'+str(p-1))))
    return df

df=get_temp(df)

最后删除初始化列

df=df.drop('temp0')

这应该会给你你想要的输出

+---+---+---+-----+-----+-----+
|  a|  b|  c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
|  2| 12|  5|    0|    2|    4|
|  8|  5|  7|    0|    2|    4|
|  9|  4|  3|    0|    2|    4|
|  3|  8|  2|    0|    2|    4|
+---+---+---+-----+-----+-----+

希望回答对你有帮助

【讨论】:

以上是关于Pyspark:将UDF的结果迭代地写回数据框不会产生预期的结果的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 中的 UDF 能否返回与列不同的对象?

pyspark:将多个数据框字段传递给 udf

Apache Spark - 将 UDF 的结果分配给多个数据框列

带有 UDF 的 PySpark 数据框

pyspark 数据框 UDF 异常处理

过滤 pyspark 数据框中的行并创建一个包含结果的新列