Pyspark:如何编写复杂的 Dataframe 算法问题(带条件求和)
Posted
技术标签:
【中文标题】Pyspark:如何编写复杂的 Dataframe 算法问题(带条件求和)【英文标题】:Pyspark: How to code Complicated Dataframe algorithm problem (summing with condition) 【发布时间】:2020-08-12 20:24:07 【问题描述】:我有一个如下所示的数据框:
TEST_schema = StructType([StructField("date", StringType(), True),\
StructField("Trigger", StringType(), True),\
StructField("value", FloatType(), True),\
StructField("col1", IntegerType(), True),
StructField("col2", IntegerType(), True),
StructField("want", FloatType(), True)])
TEST_data = [('2020-08-01','T',0.0,3,5,0.5),('2020-08-02','T',0.0,-1,4,0.0),('2020-08-03','T',0.0,-1,3,0.0),('2020-08-04','F',0.2,3,3,0.7),('2020-08-05','T',0.3,1,4,0.9),\
('2020-08-06','F',0.2,-1,3,0.0),('2020-08-07','T',0.2,-1,4,0.0),('2020-08-08','T',0.5,-1,5,0.0),('2020-08-09','T',0.0,-1,5,0.0)]
rdd3 = sc.parallelize(TEST_data)
TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
TEST_df = TEST_df.withColumn("date",to_date("date", 'yyyy-MM-dd'))
TEST_df.show()
+----------+-------+-----+----+----+
| date|Trigger|value|col1|col2|
+----------+-------+-----+----+----+
|2020-08-01| T| 0.0| 3| 5|
|2020-08-02| T| 0.0| -1| 4|
|2020-08-03| T| 0.0| -1| 3|
|2020-08-04| F| 0.2| 3| 3|
|2020-08-05| T| 0.3| 1| 4|
|2020-08-06| F| 0.2| -1| 3|
|2020-08-07| T| 0.2| -1| 4|
|2020-08-08| T| 0.5| -1| 5|
|2020-08-09| T| 0.0| -1| 5|
+----------+-------+-----+----+----+
date
: 排序很好
Trigger
:只有 T 或 F
value
:任意随机十进制(浮点)值
col1
:代表天数,不能小于-1。** -1
col2
:表示天数,不能为负数。 col2 >= 0
**计算逻辑**
如果col1 == -1, then return 0
,否则如果Trigger == T
,下图有助于理解逻辑。
如果我们看“红色”,+3 来自 col1 即 col1==3
at 2020-08-01,意思是我们跳了 3 行,同时还取了差 @987654335 @ (at 2020-08-01) 1 表示将下一个值相加,即0.2 + 0.3 = 0.5
。同样的逻辑也适用于“蓝色”
“绿色”用于trigger == "F"
时取(col2 -1)=3-1 =2
(2020-08-04),2 表示接下来两个值的总和。这是0.2+0.3+0.2 = 0.7
编辑:
如果我根本不想要条件怎么办,假设我们有这个 df
TEST_schema = StructType([StructField("date", StringType(), True),\
StructField("value", FloatType(), True),\
StructField("col2", IntegerType(), True)])
TEST_data = [('2020-08-01',0.0,5),('2020-08-02',0.0,4),('2020-08-03',0.0,3),('2020-08-04',0.2,3),('2020-08-05',0.3,4),\
('2020-08-06',0.2,3),('2020-08-07',0.2,4),('2020-08-08',0.5,5),('2020-08-09',0.0,5)]
rdd3 = sc.parallelize(TEST_data)
TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
TEST_df = TEST_df.withColumn("date",to_date("date", 'yyyy-MM-dd'))
TEST_df.show()
+----------+-----+----+
| date|value|col2|
+----------+-----+----+
|2020-08-01| 0.0| 5|
|2020-08-02| 0.0| 4|
|2020-08-03| 0.0| 3|
|2020-08-04| 0.2| 3|
|2020-08-05| 0.3| 4|
|2020-08-06| 0.2| 3|
|2020-08-07| 0.2| 4|
|2020-08-08| 0.5| 5|
|2020-08-09| 0.0| 5|
+----------+-----+----+
当我们有 Trigger == "F" 条件时,同样的逻辑适用,所以 col2 -1
但在这种情况下没有条件。
【问题讨论】:
【参考方案1】:IIUC,我们可以使用 Windows 函数 collect_list
获取所有相关行,将结构数组按 date
排序,然后根据该数组的 slice 进行聚合。每个slice的start_idx和span可以基于以下定义:
-
如果 col1 = -1,start_idx = 1 和 span = 0,因此没有汇总任何内容
否则,如果 Trigger = 'F',则 start_idx = 1 和 span em> = col2
否则 start_idx = col1+1 和 span = col2-col1
请注意,函数切片的索引是基于1的。
代码:
from pyspark.sql.functions import to_date, sort_array, collect_list, struct, expr
from pyspark.sql import Window
w1 = Window.orderBy('date').rowsBetween(0, Window.unboundedFollowing)
# columns used to do calculations, date must be the first field for sorting purpose
cols = ["date", "value", "start_idx", "span"]
df_new = (TEST_df
.withColumn('start_idx', expr("IF(col1 = -1 OR Trigger = 'F', 1, col1+1)"))
.withColumn('span', expr("IF(col1 = -1, 0, IF(Trigger = 'F', col2, col2-col1))"))
.withColumn('dta', sort_array(collect_list(struct(*cols)).over(w1)))
.withColumn("want1", expr("aggregate(slice(dta,start_idx,span), 0D, (acc,x) -> acc+x.value)"))
)
结果:
df_new.show()
+----------+-------+-----+----+----+----+---------+----+--------------------+------------------+
| date|Trigger|value|col1|col2|want|start_idx|span| dta| want1|
+----------+-------+-----+----+----+----+---------+----+--------------------+------------------+
|2020-08-01| T| 0.0| 3| 5| 0.5| 4| 2|[[2020-08-01, T, ...|0.5000000149011612|
|2020-08-02| T| 0.0| -1| 4| 0.0| 1| 0|[[2020-08-02, T, ...| 0.0|
|2020-08-03| T| 0.0| -1| 3| 0.0| 1| 0|[[2020-08-03, T, ...| 0.0|
|2020-08-04| F| 0.2| 3| 3| 0.7| 1| 3|[[2020-08-04, F, ...|0.7000000178813934|
|2020-08-05| T| 0.3| 1| 4| 0.9| 2| 3|[[2020-08-05, T, ...|0.9000000059604645|
|2020-08-06| F| 0.2| -1| 3| 0.0| 1| 0|[[2020-08-06, F, ...| 0.0|
|2020-08-07| T| 0.2| -1| 4| 0.0| 1| 0|[[2020-08-07, T, ...| 0.0|
|2020-08-08| T| 0.5| -1| 5| 0.0| 1| 0|[[2020-08-08, T, ...| 0.0|
|2020-08-09| T| 0.0| -1| 5| 0.0| 1| 0|[[2020-08-09, T, ...| 0.0|
+----------+-------+-----+----+----+----+---------+----+--------------------+------------------+
一些解释:
slice 函数除了目标数组外还需要两个参数。在我们的代码中,start_idx
是起始索引,span
是切片的长度。在代码中,我使用 IF 语句根据您原始帖子中的图表规范计算 start_idx 和 span。
collect_list + sort_array 在窗口w1
上生成的数组覆盖从当前行到窗口末尾的行(参见w1
分配)。然后我们在 aggregate 函数中使用 slice 函数来仅检索必要的数组项。
SparkSQL 内置函数aggregate 采用以下形式:
aggregate(expr, start, merge, finish)
可以跳过第四个参数finish
。在我们的例子中,它可以被重新格式化为(您可以复制以下代码来替换 expr .withColumn('want1', expr(""" .... """)
中的代码):
aggregate(
/* targeting array, use slice function to take only part of the array `dta` */
slice(dta,start_idx,span),
/* start, zero_value used for reduce */
0D,
/* merge, similar to reduce function */
(acc,x) -> acc+x.value,
/* finish, skipped in the post, but you can do some post-processing here, for example, round-up the result from merge */
acc -> round(acc, 2)
)
aggregate 函数的作用类似于 Python 中的 reduce 函数,第二个参数是零值(0D
是 double(0)
的快捷方式,用于对聚合变量的数据类型acc
)。
如 cmets 中所述,如果 col2 where Trigger = 'T' and col1 ! = -1 存在,它会在当前代码中产生一个负的span。在这种情况下,我们应该使用全尺寸的 Window 规格:
w1 = Window.orderBy('date').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
并使用array_position找到当前行的位置(refer to one of my recent posts),然后根据该位置计算start_idx。
【讨论】:
如果可能有Trigger='T' AND col1 != -1 AND col2 < col1
,我们需要设置Window Spec覆盖所有相关行rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
,通过array_position找到当前行,然后计算start_idx。
干得好..但我不太明白aggregate(slice(dta,start_idx,length_idx), 0D, (acc,x) -> acc+x.value)
你能提供我第1行的一个例子吗?
@hellotherebj,我只是添加了一些解释,还将一列从length_idx
重命名为span
。如果您需要更多示例,请告诉我。
据我所知,设置.withColumn('span', expr("IF(col1 = -1, 0, IF(Trigger = 'F', col3, col2-col1))"))
我想你可以试试:.withColumn("want1", expr("aggregate(slice(dta,1,col2), 0D, (acc,x) -> acc+x.value)"))
。我今天很忙,无法专注于此。我可能需要明天或今晚晚些时候检查您的新问题。@hellotherebj以上是关于Pyspark:如何编写复杂的 Dataframe 算法问题(带条件求和)的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame
通过 pyspark.sql.dataframe 将 XML 数据转换为 pandas 数据帧
用列表 Pyspark Dataframe 中的值替换 NA