带有 iloc 的 Pandas_UDF 连接循环
Posted
技术标签:
【中文标题】带有 iloc 的 Pandas_UDF 连接循环【英文标题】:Pandas_UDF Concatenation loop with iloc 【发布时间】:2020-07-09 14:28:50 【问题描述】:我在 Pandas 中相对较新,最近坚持使用用户定义的函数。
我的数据集是这样的:
|header|planned|
| a | 1 |
| a | 2 |
| a | 3 |
| a | 4 |
| a | 5 |
| b | 1 |
| b | 2 |
| b | 3 |
| b | 4 |
| b | 5 |
我必须将列 planned
中的值按两行组连接,以获得类似这样的结果:
|header|planned|p_cat|
| a | 1 | 1_2 |
| a | 2 | 2_3 |
| a | 3 | 3_4 |
| a | 4 | 4_5 |
| a | 5 | |
| b | 1 | 1_2 |
| b | 2 | 2_3 |
| b | 3 | 3_4 |
| b | 4 | 4_5 |
| b | 5 | |
planned
列中的数字不按此特定顺序排列,但始终为整数。
我的 UDF:
schema = ds_adh.schema
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def concat_operations(ds_op):
s = ds_op['planned']
for index in range(ds_op['planned'].count()-1):
# clearly working only for the last index
couple = str([s.iloc[index]]) + '_' + str([s.iloc[index+1]])
ds_op_new = ds_op
ds_op_new ['p_cat'] = couple
return ds_op_new
ds_adh = ds_adh.orderBy("time")
ds_adh = ds_adh.groupBy("header").apply(concat_operations)
我的问题:
串联本身不起作用 我不知道如何在couple
中为循环的所有迭代推送连接
我也尝试过 pandaSeries,但没有成功。
这是我使用此代码时遇到的错误:
IndexError: single positional indexer is out-of-bounds
【问题讨论】:
这是一个虚拟问题还是实际问题?基于用户可以尝试使用 pyspark 内置函数进行复制,可能不需要 udf。 由于我的数据集包含 700 万行,我认为 UDF 将是提高性能的解决方案。我花了几个小时尝试多种语法但没有成功,确实这个错误可能隐藏了主要错误。 【参考方案1】:如果这是一个实际问题,您可以使用 pyspark 内置函数,如下所示:
import pyspark.sql.functions as F
w = Window.partitionBy("header").orderBy("idx")
(df.withColumn("idx",F.monotonically_increasing_id())
.withColumn("Lead",F.lead("planned").over(w))
.withColumn("p_cat",F.when(F.col("Lead").isNull(),'')
.otherwise(F.concat_ws("_","planned","Lead")))
.orderBy("idx").drop("idx","Lead")).show()
+------+-------+-----+
|header|planned|p_cat|
+------+-------+-----+
| a| 1| 1_2|
| a| 2| 2_3|
| a| 3| 3_4|
| a| 4| 4_5|
| a| 5| |
| b| 1| 1_2|
| b| 2| 2_3|
| b| 3| 3_4|
| b| 4| 4_5|
| b| 5| |
+------+-------+-----+
【讨论】:
【参考方案2】:使用内置窗口 lead
功能和 partitionBy
在标题和 orderBy 在计划列上,因为 udf
将减少性能。
from pyspark.sql import *
from pyspark.sql.functions import *
w=Window.partitionBy("header").orderBy("planned")
df.withColumn("p_cat", when(lead(col("planned"),1).over(w).isNull(),lit("")).otherwise(concat_ws("_",col("planned"),lead(col("planned"),1).over(w)))).show()
#+------+-------+-----+
#|header|planned|p_cat|
#+------+-------+-----+
#| a| 1| 1_2|
#| a| 2| 2_3|
#| a| 3| 3_4|
#| a| 4| 4_5|
#| a| 5| |
#| b| 1| 1_2|
#| b| 2| 2_3|
#| b| 3| 3_4|
#| b| 4| 4_5|
#| b| 5| |
#+------+-------+-----+
【讨论】:
看来我这次使用 udf 是错误的(尽管它会提高性能,因为我的数据集中有 7+ 百万行)。我试试,谢谢。以上是关于带有 iloc 的 Pandas_UDF 连接循环的主要内容,如果未能解决你的问题,请参考以下文章
为啥我的应用程序不能以 pandas_udf 和 PySpark+Flask 开头?
Pyspark - 调用 pandas_udf 时出错,结果返回 Series.interpolate()