带有 iloc 的 Pandas_UDF 连接循环

Posted

技术标签:

【中文标题】带有 iloc 的 Pandas_UDF 连接循环【英文标题】:Pandas_UDF Concatenation loop with iloc 【发布时间】:2020-07-09 14:28:50 【问题描述】:

我在 Pandas 中相对较新,最近坚持使用用户定义的函数。

我的数据集是这样的:


|header|planned|
|  a   |   1   |
|  a   |   2   |
|  a   |   3   |
|  a   |   4   |
|  a   |   5   |
|  b   |   1   |
|  b   |   2   |
|  b   |   3   |
|  b   |   4   |
|  b   |   5   |

我必须将列 planned 中的值按两行组连接,以获得类似这样的结果:


|header|planned|p_cat|
|  a   |   1   | 1_2 | 
|  a   |   2   | 2_3 |
|  a   |   3   | 3_4 |
|  a   |   4   | 4_5 |
|  a   |   5   |     |
|  b   |   1   | 1_2 |
|  b   |   2   | 2_3 |
|  b   |   3   | 3_4 |
|  b   |   4   | 4_5 |
|  b   |   5   |     |

planned 列中的数字不按此特定顺序排列,但始终为整数。

我的 UDF:


schema = ds_adh.schema

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def concat_operations(ds_op):

        s = ds_op['planned']

        for index in range(ds_op['planned'].count()-1):
        
            # clearly working only for the last index
            couple = str([s.iloc[index]]) + '_' + str([s.iloc[index+1]])

        ds_op_new = ds_op

        ds_op_new ['p_cat'] = couple

        return ds_op_new

ds_adh = ds_adh.orderBy("time")
ds_adh = ds_adh.groupBy("header").apply(concat_operations)

我的问题:

串联本身不起作用 我不知道如何在 couple 中为循环的所有迭代推送连接

我也尝试过 pandaSeries,但没有成功。

这是我使用此代码时遇到的错误:


IndexError: single positional indexer is out-of-bounds

【问题讨论】:

这是一个虚拟问题还是实际问题?基于用户可以尝试使用 pyspark 内置函数进行复制,可能不需要 udf。 由于我的数据集包含 700 万行,我认为 UDF 将是提高性能的解决方案。我花了几个小时尝试多种语法但没有成功,确实这个错误可能隐藏了主要错误。 【参考方案1】:

如果这是一个实际问题,您可以使用 pyspark 内置函数,如下所示:

import pyspark.sql.functions as F
w = Window.partitionBy("header").orderBy("idx")

(df.withColumn("idx",F.monotonically_increasing_id())
   .withColumn("Lead",F.lead("planned").over(w))
   .withColumn("p_cat",F.when(F.col("Lead").isNull(),'')
   .otherwise(F.concat_ws("_","planned","Lead")))
   .orderBy("idx").drop("idx","Lead")).show()

+------+-------+-----+
|header|planned|p_cat|
+------+-------+-----+
|     a|      1|  1_2|
|     a|      2|  2_3|
|     a|      3|  3_4|
|     a|      4|  4_5|
|     a|      5|     |
|     b|      1|  1_2|
|     b|      2|  2_3|
|     b|      3|  3_4|
|     b|      4|  4_5|
|     b|      5|     |
+------+-------+-----+

【讨论】:

【参考方案2】:

使用内置窗口 lead 功能和 partitionBy 在标题和 orderBy 在计划列上,因为 udf 将减少性能。

from pyspark.sql import *
from pyspark.sql.functions import *
w=Window.partitionBy("header").orderBy("planned")
df.withColumn("p_cat", when(lead(col("planned"),1).over(w).isNull(),lit("")).otherwise(concat_ws("_",col("planned"),lead(col("planned"),1).over(w)))).show()
#+------+-------+-----+
#|header|planned|p_cat|
#+------+-------+-----+
#|     a|      1|  1_2|
#|     a|      2|  2_3|
#|     a|      3|  3_4|
#|     a|      4|  4_5|
#|     a|      5|     |
#|     b|      1|  1_2|
#|     b|      2|  2_3|
#|     b|      3|  3_4|
#|     b|      4|  4_5|
#|     b|      5|     |
#+------+-------+-----+

【讨论】:

看来我这次使用 udf 是错误的(尽管它会提高性能,因为我的数据集中有 7+ 百万行)。我试试,谢谢。

以上是关于带有 iloc 的 Pandas_UDF 连接循环的主要内容,如果未能解决你的问题,请参考以下文章

为啥我的应用程序不能以 pandas_udf 和 PySpark+Flask 开头?

Pyspark - 调用 pandas_udf 时出错,结果返回 Series.interpolate()

Pandas for 循环没有用 iloc 正确更新行?

带有 PySpark 2.4 的 Pandas UDF [重复]

带有迭代的 Pandas DataFrame 切片

熊猫 iloc 和 loc & multiindex