在 pyspark 中,基于变量字段进行分组,并为特定值添加一个计数器(当变量更改时重置)

Posted

技术标签:

【中文标题】在 pyspark 中,基于变量字段进行分组,并为特定值添加一个计数器(当变量更改时重置)【英文标题】:In pyspark, group based on a variable field, and add a counter for particular values (which resets when variable changes) 【发布时间】:2019-07-10 14:53:16 【问题描述】:

从 pandas 数据帧创建 spark 数据帧

import pandas as pd
df = pd.DataFrame("b": ['A','A','A','A','B', 'B','B','C','C','D','D', 'D','D','D','D','D','D','D','D','D'],"Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],"a": [3,-4,2, -1, -3, -1,-7,-6, 1, 1, -1, 1,4,5,-3,2,3,4, -1, -2])

df2=spark.createDataFrame(df) 

接下来我在字段“b”上使用窗口分区

from pyspark.sql import window
win_spec = (window.Window.partitionBy(['b']).orderBy("Sno").rowsBetween(window.Window.unboundedPreceding, 0))

根据值添加一个字段 positive ,negative 并创建一个 lambda 函数

df2 = df2.withColumn("pos_neg",col("a") <0)
pos_neg_func =udf(lambda x: ((x) & (x != x.shift())).cumsum())

尝试创建一个新列(这是一个负值计数器,但在变量“b”内。所以当“b”中的字段发生变化时计数器会重新启动。此外,如果有连续的 -ve 值,它们应该被视为单个值,计数器变化 1

df3 = (df2.select('pos_neg',pos_neg_func('pos_neg').alias('val')))

我希望输出为,

      b  Sno  a    val  val_2
0   A    1  3  False      0
1   A    2 -4   True      1
2   A    3  2  False      1
3   A    4 -1   True      2
4   B    5 -3   True      1
5   B    6 -1   True      1
6   B    7 -7   True      1
7   C    8 -6   True      1
8   C    9  1  False      1
9   D   10  1  False      0
10  D   11 -1   True      1
11  D   12  1  False      1
12  D   13  4  False      1
13  D   14  5  False      1
14  D   15 -3   True      2
15  D   16  2  False      2
16  D   17  3  False      2
17  D   18  4  False      2
18  D   19 -1   True      3
19  D   20 -2   True      3

在 python 中,一个简单的函数如下所示:

df['val'] = df.groupby('b')['pos_neg'].transform(lambda x: ((x) & (x != x.shift())).cumsum())

josh-friedlander 在上述代码中提供了支持

【问题讨论】:

为什么val 不增加B 在列b 中的值? 你会遇到问题,因为你没有任何订单条款... @cronoik,如果有连续的负值,我将它们视为单个负值。希望能回答你的问题 @Steven 也添加了 orderby 子句,仍然存在一些问题 "win_spec = window.Window.partitionBy(['b']).orderBy("b").rowsBetween(window.Window .unboundedPreceding, 0))" @DevarshiMandal partitionBy(['b']).orderBy("b") 不是有效的 orderBy 子句 .... 结果不会是幂等的,我不确定你是否想要那样。 洗牌后的行顺序可能是不确定的 【参考方案1】:

Pyspark 没有移位功能,但您可以使用lag 窗口功能,它为您提供当前行之前的行。如果pos_neg 列的值为True 并且前一个pos_neg 的值为False,则第一个窗口(称为w)将val 列的值设置为1,否则设置为0。 通过第二个窗口(称为 w2),我们计算累积总和以获得您想要的

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window

df = pd.DataFrame("b": ['A','A','A','A','B', 'B','B','C','C','D','D', 'D','D','D','D','D','D','D','D','D'],"Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],"a": [3,-4,2, -1, -3, -1,-7,-6, 1, 1, -1, 1,4,5,-3,2,3,4, -1, -2])

df2=spark.createDataFrame(df) 

w = Window.partitionBy('b').orderBy('Sno')
w2 = Window.partitionBy('b').rowsBetween(Window.unboundedPreceding, 0).orderBy('Sno')

df2 = df2.withColumn("pos_neg",col("a") <0)

df2 = df2.withColumn('val', F.when((df2.pos_neg == True) & (F.lag('pos_neg', default=False).over(w) == False), 1).otherwise(0))
df2 = df2.withColumn('val',  F.sum('val').over(w2))

df2.show()

输出:

+---+---+---+-------+---+ 
|Sno|  a|  b|pos_neg|val| 
+---+---+---+-------+---+ 
|  5| -3|  B|   true|  1| 
|  6| -1|  B|   true|  1| 
|  7| -7|  B|   true|  1| 
| 10|  1|  D|  false|  0| 
| 11| -1|  D|   true|  1| 
| 12|  1|  D|  false|  1| 
| 13|  4|  D|  false|  1| 
| 14|  5|  D|  false|  1| 
| 15| -3|  D|   true|  2| 
| 16|  2|  D|  false|  2| 
| 17|  3|  D|  false|  2| 
| 18|  4|  D|  false|  2| 
| 19| -1|  D|   true|  3| 
| 20| -2|  D|   true|  3| 
|  8| -6|  C|   true|  1| 
|  9|  1|  C|  false|  1| 
|  1|  3|  A|  false|  0| 
|  2| -4|  A|   true|  1| 
|  3|  2|  A|  false|  1| 
|  4| -1|  A|   true|  2| 
+---+---+---+-------+---+

您可能想知道为什么需要有一列允许我们对数据集进行排序。让我试着用一个例子来解释这一点。以下数据由 pandas 读取并分配了索引(左列)。您想计算Truepos_neg 中的出现次数,并且您不想计算连续的True。这个逻辑导致val2列如下图:

    b  Sno  a   pos_neg  val_2
0   A    1  3  False      0
1   A    2 -4   True      1
2   A    3  2  False      1
3   A    4 -1   True      2
4   A    5 -5   True      2

...但这取决于它从 pandas 获得的索引(行顺序)。当您更改行的顺序(以及相应的 pandas 索引)时,将逻辑应用于相同的行时,您将得到不同的结果,因为顺序不同:

    b  Sno  a   pos_neg  val_2
0   A    1  3  False      0
1   A    3  2  False      0
2   A    2 -4   True      1
3   A    4 -1   True      1
4   A    5 -5   True      1

您会发现行的顺序很重要。您现在可能想知道为什么 pyspark 不像 pandas 那样创建索引。这是因为 spark 将您的数据保存在多个分区中,这些分区分布在您的集群上,并且取决于您的数据源,甚至能够分布式读取数据。因此,在读取数据期间不能添加索引。您可以在使用 monotonically_increasing_id 函数读取数据后添加一个,但由于读取过程,您的数据与数据源相比可能已经具有不同的顺序。

您的sno 列避免了这个问题,并保证您将始终获得相同数据的相同结果(确定性)。

【讨论】:

这确实很棒。非常感谢您在这么短的时间内分享答案。 'Sno' 顺序改变的任何原因? 我已经添加了一个解释。我希望它说清楚。

以上是关于在 pyspark 中,基于变量字段进行分组,并为特定值添加一个计数器(当变量更改时重置)的主要内容,如果未能解决你的问题,请参考以下文章

pandas使用groupby函数基于指定分组变量对dataframe数据进行分组使用min函数计算所有分组中指定数值变量的聚合最小值即字段在指定分组的最小值([]方括号指定需要计算的数值字段)

pandas使用groupby函数基于指定分组变量对dataframe数据进行分组使用mean函数计算所有分组中指定数值变量的聚合平均值即字段在指定分组的平均值([]方括号指定需要计算的数值字段)

pandas使用groupby函数基于指定分组变量对dataframe数据进行分组使用max函数计算所有分组中指定数值变量的聚合最大值即字段在指定分组的最大值([]方括号指定需要计算的数值字段)

pandas使用groupby函数基于指定分组变量对dataframe数据进行分组使用sum函数计算所有分组中指定数值变量的聚合加和值即字段在指定分组的加和值([]方括号指定需要计算的数值字段)

PySpark:如何在列中使用 Or 进行分组

Pyspark 最近使用的一些有趣姿势的梳理