PySpark 将算法转换为 UDF 并将其应用于 DataFrame

Posted

技术标签:

【中文标题】PySpark 将算法转换为 UDF 并将其应用于 DataFrame【英文标题】:PySpark Transform an algorithm to UDF and apply it on a DataFrame 【发布时间】:2020-01-15 05:43:57 【问题描述】:

我写了一个算法来做一些事情并打印输出。我的算法的输入是一个包含一些整数的列表。 这是作为列表的示例输入。

`mylist = [5,6,14,15,16,17,18,19,20,28,40,41,42,43,44,55]`

这是我的算法

    ```     

tduration = 0
duration = 0
avg = 0
bottleneck = 0
y = 0
x = 0
while x<len(mylist)-4 and y<len(mylist)-1 :
  if mylist[x+4] == mylist[x]+4:
    y = x + 4
    print("MY LIST X = ",mylist[x])
    print("X = ", x)
    print ("Y = ", y)
    while True:
      if y==len(mylist)-1 or mylist[y+1] > mylist[y]+10:
        bottleneck = bottleneck + 1
        duration = mylist[y] - mylist[x] + 1
        tduration = tduration + duration
        avg = tduration/bottleneck
        x = y + 1
        print("MY LIST Y = " , mylist[y])
        print("Duration = " , duration)
        break
      else: 
        y = y + 1
  else: 
    x = x + 1
print("BottleneckCount = ", bottleneck,  "\nAverageDuration = ", avg)

 ```

现在我想将此“算法”转换为 PySpark 中的用户定义函数 (UDF),然后将此 UDF 应用于具有一列的 DataFrame。此 DataFrame 的每一行中都有一个列表。示例 DataFrame 有 1 列和 2 行。 row1 是[10,11,19,20,21,22,23,24,25,33,45] 的列表,row2 是[55,56,57,58,59,60,80,81,82,83,84,85,92,115] 的列表,因此UDF 应分别应用于DataFrame 的每一行,并为另一列中的每一行提供结果。 提前感谢您的时间和帮助。我会支持你的答案

【问题讨论】:

你希望你的函数返回什么? 我希望它返回在算法的最后一行打印的 2 个数字(“bottleneck”和“avg”)。 print("BottleneckCount = ",bottleneck, "\nAverageDuration = ", avg) 非常感谢您的帮助和支持。我真的需要答案。这是我为期 6 个月的项目的最后一部分。 【参考方案1】:

你可以这样做:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, ArrayType

def calculate(mylist):

    tduration = 0
    duration = 0
    avg = 0
    bottleneck = 0
    y = 0
    x = 0
    while x<len(mylist)-4 and y<len(mylist)-1 :

        if mylist[x+4] == mylist[x]+4:
            y = x + 4
            print("MY LIST X = ",mylist[x])
            print("X = ", x)
            print ("Y = ", y)
            while True:
                if y==len(mylist)-1 or mylist[y+1] > mylist[y]+10:
                    bottleneck = bottleneck + 1
                    duration = mylist[y] - mylist[x] + 1
                    tduration = tduration + duration
                    avg = tduration/bottleneck
                    x = y + 1
                    print("MY LIST Y = " , mylist[y])
                    print("Duration = " , duration)
                    break
                else:
                    y = y + 1
        else:
            x = x + 1
    return bottleneck, avg

# sample data frame to use
df = spark.createDataFrame(
    [
        [[10,11,19,20,21,22,23,24,25,33,45]],
        [[55,56,57,58,59,60,80,81,82,83,84,85,92,115]],
    ],
    ['col1',]
)

df.show()

+--------------------+
|                col1|
+--------------------+
|[10, 11, 19, 20, ...|
|[55, 56, 57, 58, ...|
+--------------------+

# convert values to int  --- edit
f_to_int = F.udf(lambda x: list(map(int, x)))
df = df.withColumn('col1', f_to_int('col1'))

# create udf
func = F.udf(lambda x: calculate(x), ArrayType(IntegerType()))

# apply udf
df = df.withColumn('vals', func('col1'))

# create new cols
df = df.select("col1", df.vals[0].alias('bottleneck'), df.vals[1].alias('avg'))

df.show()

+--------------------+----------+----+
|                col1|bottleneck| avg|
+--------------------+----------+----+
|[10, 11, 19, 20, ...|         1|null|
|[55, 56, 57, 58, ...|         2|null|
+--------------------+----------+----+

【讨论】:

非常感谢您是否可以为我的问题投票,以便我可以为您的答案投票。 完成,不知道这是一个要求。也请接受它给未来的读者。 我意识到最后一列“avg”正在获取 NULL 值。原因是“平均”值不是整数。他们是“双打”。请问你也解决这个问题吗?!非常感谢您的帮助和支持。 如果type引起了麻烦,请检查编辑,在应用calculation函数之前我们不会先转换为整数。 谢谢。我不需要将任何内容转换为 int。最后一列(在上面的代码中)“avg”显示为 NULL,原因是“avg”值是双倍的。我希望它将“avg”列显示为双精度值。也许如果你运行一次代码,你会更好地理解我的意思。【参考方案2】:

YOLO 回答了这个问题,这是一个完整的答案。唯一的问题是,在“avg”的最后一列中,我们得到的是 NULL 值。 我意识到我可以通过在 YOLO 的回答中使用这个“func”而不是那个“func”来解决这个问题。

import pyspark.sql.types as T
func = F.udf(lambda x: calculate(x), T.StructType(
        [T.StructField("val1", T.IntegerType(), True),
         T.StructField("val2", T.FloatType(), True)]))

【讨论】:

以上是关于PySpark 将算法转换为 UDF 并将其应用于 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:从 Struct 中识别 arrayType 列并调用 udf 将数组转换为字符串

更改 DataFrame 中的列数据类型并将其传递到 UDF - PySpark

如何将标量 Pyspark UDF 转换为 Pandas UDF?

PySpark:Spark数据框-将ImageSchema列转换为nDArray作为新列

PySpark 结构化流将 udf 应用于窗口

Pyspark:UDF 将正则表达式应用于数据帧中的每一行