PySpark 将算法转换为 UDF 并将其应用于 DataFrame
Posted
技术标签:
【中文标题】PySpark 将算法转换为 UDF 并将其应用于 DataFrame【英文标题】:PySpark Transform an algorithm to UDF and apply it on a DataFrame 【发布时间】:2020-01-15 05:43:57 【问题描述】:我写了一个算法来做一些事情并打印输出。我的算法的输入是一个包含一些整数的列表。 这是作为列表的示例输入。
`mylist = [5,6,14,15,16,17,18,19,20,28,40,41,42,43,44,55]`
这是我的算法
```
tduration = 0
duration = 0
avg = 0
bottleneck = 0
y = 0
x = 0
while x<len(mylist)-4 and y<len(mylist)-1 :
if mylist[x+4] == mylist[x]+4:
y = x + 4
print("MY LIST X = ",mylist[x])
print("X = ", x)
print ("Y = ", y)
while True:
if y==len(mylist)-1 or mylist[y+1] > mylist[y]+10:
bottleneck = bottleneck + 1
duration = mylist[y] - mylist[x] + 1
tduration = tduration + duration
avg = tduration/bottleneck
x = y + 1
print("MY LIST Y = " , mylist[y])
print("Duration = " , duration)
break
else:
y = y + 1
else:
x = x + 1
print("BottleneckCount = ", bottleneck, "\nAverageDuration = ", avg)
```
现在我想将此“算法”转换为 PySpark 中的用户定义函数 (UDF),然后将此 UDF 应用于具有一列的 DataFrame。此 DataFrame 的每一行中都有一个列表。示例 DataFrame 有 1 列和 2 行。 row1 是[10,11,19,20,21,22,23,24,25,33,45]
的列表,row2 是[55,56,57,58,59,60,80,81,82,83,84,85,92,115]
的列表,因此UDF 应分别应用于DataFrame 的每一行,并为另一列中的每一行提供结果。
提前感谢您的时间和帮助。我会支持你的答案
【问题讨论】:
你希望你的函数返回什么? 我希望它返回在算法的最后一行打印的 2 个数字(“bottleneck”和“avg”)。 print("BottleneckCount = ",bottleneck, "\nAverageDuration = ", avg) 非常感谢您的帮助和支持。我真的需要答案。这是我为期 6 个月的项目的最后一部分。 【参考方案1】:你可以这样做:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, ArrayType
def calculate(mylist):
tduration = 0
duration = 0
avg = 0
bottleneck = 0
y = 0
x = 0
while x<len(mylist)-4 and y<len(mylist)-1 :
if mylist[x+4] == mylist[x]+4:
y = x + 4
print("MY LIST X = ",mylist[x])
print("X = ", x)
print ("Y = ", y)
while True:
if y==len(mylist)-1 or mylist[y+1] > mylist[y]+10:
bottleneck = bottleneck + 1
duration = mylist[y] - mylist[x] + 1
tduration = tduration + duration
avg = tduration/bottleneck
x = y + 1
print("MY LIST Y = " , mylist[y])
print("Duration = " , duration)
break
else:
y = y + 1
else:
x = x + 1
return bottleneck, avg
# sample data frame to use
df = spark.createDataFrame(
[
[[10,11,19,20,21,22,23,24,25,33,45]],
[[55,56,57,58,59,60,80,81,82,83,84,85,92,115]],
],
['col1',]
)
df.show()
+--------------------+
| col1|
+--------------------+
|[10, 11, 19, 20, ...|
|[55, 56, 57, 58, ...|
+--------------------+
# convert values to int --- edit
f_to_int = F.udf(lambda x: list(map(int, x)))
df = df.withColumn('col1', f_to_int('col1'))
# create udf
func = F.udf(lambda x: calculate(x), ArrayType(IntegerType()))
# apply udf
df = df.withColumn('vals', func('col1'))
# create new cols
df = df.select("col1", df.vals[0].alias('bottleneck'), df.vals[1].alias('avg'))
df.show()
+--------------------+----------+----+
| col1|bottleneck| avg|
+--------------------+----------+----+
|[10, 11, 19, 20, ...| 1|null|
|[55, 56, 57, 58, ...| 2|null|
+--------------------+----------+----+
【讨论】:
非常感谢您是否可以为我的问题投票,以便我可以为您的答案投票。 完成,不知道这是一个要求。也请接受它给未来的读者。 我意识到最后一列“avg”正在获取 NULL 值。原因是“平均”值不是整数。他们是“双打”。请问你也解决这个问题吗?!非常感谢您的帮助和支持。 如果type
引起了麻烦,请检查编辑,在应用calculation
函数之前我们不会先转换为整数。
谢谢。我不需要将任何内容转换为 int。最后一列(在上面的代码中)“avg”显示为 NULL,原因是“avg”值是双倍的。我希望它将“avg”列显示为双精度值。也许如果你运行一次代码,你会更好地理解我的意思。【参考方案2】:
YOLO 回答了这个问题,这是一个完整的答案。唯一的问题是,在“avg”的最后一列中,我们得到的是 NULL 值。 我意识到我可以通过在 YOLO 的回答中使用这个“func”而不是那个“func”来解决这个问题。
import pyspark.sql.types as T
func = F.udf(lambda x: calculate(x), T.StructType(
[T.StructField("val1", T.IntegerType(), True),
T.StructField("val2", T.FloatType(), True)]))
【讨论】:
以上是关于PySpark 将算法转换为 UDF 并将其应用于 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:从 Struct 中识别 arrayType 列并调用 udf 将数组转换为字符串
更改 DataFrame 中的列数据类型并将其传递到 UDF - PySpark
如何将标量 Pyspark UDF 转换为 Pandas UDF?