Pyspark 基于其他列和运行计数器添加列

Posted

技术标签:

【中文标题】Pyspark 基于其他列和运行计数器添加列【英文标题】:Pyspark add column based on other column and a running counter 【发布时间】:2020-05-15 10:39:19 【问题描述】:

我在 pyspark 数据框中有数据(这是一个非常大的表,有 900M 行)

数据框包含具有以下值的列:

+---------------+
|prev_display_id|
+---------------+
|           null|
|           null|
|           1062|
|           null|
|           null|
|           null|
|           null|
|       18882624|
|       11381128|
|           null|
|           null|
|           null|
|           null|
|           2779|
|           null|
|           null|
|           null|
|           null|
+---------------+

我正在尝试基于此列生成一个新列,如下所示:

+---------------+------+
|prev_display_id|result|
+---------------+------+
|           null|     0|
|           null|     1|
|           1062|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
|       18882624|     0|
|       11381128|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
|           2779|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
+---------------+------+

新列的功能类似于:

new_col = 0 if (prev_display_id!=null) else col = col+1

col 就像一个运行中的计数器,当遇到非空值时会重置为零。

如何在 pyspark 中有效地做到这一点?

更新

我尝试了下面@anki 建议的解决方案。我适用于小型数据集,但它会产生此错误:

WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

不幸的是,对于我的大数据集,它似乎杀死了集群。 在具有 2 个 rd5.2xlarge 数据节点的大数据集上运行时的错误见下图:

知道如何解决这个问题吗?

【问题讨论】:

这里的问题是优化。我考虑过分几个步骤做。首先,使用 lambda 函数在列上映射,该函数将 0 关联到非空值,将 1 关联到空值。然后再次映射到新列(使用 0 和 1)并相应地增加值。您可以使用每行的索引来执行此操作。我将尝试看看是否没有预定义的函数来执行此操作。 【参考方案1】:

据我了解,您可以使用 monotonically_increasing_id 创建一个 id 列,然后在 prev_display_id 不为 null 的情况下对窗口进行求和,然后取由该列分区的行号减去 1:

w = Window.orderBy(F.monotonically_increasing_id())
w1 = F.sum((F.col("prev_display_id").isNotNull()).cast("integer")).over(w)

(df.withColumn("result",F.row_number()
 .over(Window.partitionBy(w1).orderBy(w1))-1).drop("idx")).show()

+---------------+------+
|prev_display_id|result|
+---------------+------+
|           null|     0|
|           null|     1|
|           1062|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
|       18882624|     0|
|       11381128|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
|           2779|     0|
|           null|     1|
|           null|     2|
|           null|     3|
|           null|     4|
+---------------+------+

【讨论】:

在新创建的窗口上进行分区的好方法..我在回答中错过了这一点【参考方案2】:

您可以通过运行以下命令来获取此信息:

window = Window.orderBy(f.monotonically_increasing_id())
df.withColumn('row',f.row_number().over(window))\
.withColumn('ne',f.when(f.col('consumer_id').isNotNull(),f.col('row')))\
.withColumn('result',f.when(f.col('ne').isNull(),f.col('row')-f.when(f.last('ne',ignorenulls=True)\
.over(window).isNull(),1).otherwise(f.last('ne',ignorenulls=True).over(window))).otherwise(0))\
.drop('row','ne').show()

+-----------+------+
|consumer_id|result|
+-----------+------+
|       null|     0|
|       null|     1|
|       null|     2|
|         11|     0|
|         11|     0|
|       null|     1|
|       null|     2|
|         12|     0|
|         12|     0|
+-----------+------+

【讨论】:

以上是关于Pyspark 基于其他列和运行计数器添加列的主要内容,如果未能解决你的问题,请参考以下文章

删除列和列计数的冗余函数调用

pyspark中基于条件对多列进行分组的累积和函数

具有列标题和计数的列名/基于其他表中列标签的数据透视

基于Oracle SQL中其他列的订单计数

基于 R 中的字段的运行计数

根据某些共享列创建标识符/计数器,并根据其他列进行分隔