如何扫描列以在Pyspark DataFrame中获取新列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何扫描列以在Pyspark DataFrame中获取新列相关的知识,希望对你有一定的参考价值。
我有一个Pyspark DataFrame有两列:sendtime和charge_state,如果charge_state从“off”变为“on”,则新的充电周期开始。
现在我想标记每个充电周期以给我输出。
输入:
+-------------------+------------+
| sendtime|charge_state|
+-------------------+------------+
|2018-03-02 08:00:00| on|
...
|2018-03-02 09:42:32| on|
|2018-03-02 09:42:33| on|
|2018-03-02 09:42:34| on|
|2018-03-02 09:42:35| off|
|2018-03-02 09:42:36| off|
...
|2018-03-02 10:11:12| off|
|2018-03-02 10:11:13| on|
|2018-03-02 10:11:14| on|
...
输出:
+-------------------+------------+---------------+
| sendtime|charge_state|charge_cycle_ID|
+-------------------+------------+---------------+
|2018-03-02 08:00:00| on| c1|
...
|2018-03-02 09:42:32| on| c1|
|2018-03-02 09:42:33| on| c1|
|2018-03-02 09:42:34| on| c1|
|2018-03-02 09:42:35| off| c1|
|2018-03-02 09:42:36| off| c1|
...
|2018-03-02 10:11:12| off| c1|
|2018-03-02 10:11:13| on| c2|
|2018-03-02 10:11:14| on| c2|
...
答案
您可以使用Window函数执行此任务:
from pyspark.sql import functions as F
from pyspark.sql import Window
df.withColumn(
'charge_state_lag',
F.lag('charge_state').over(Window.partitionBy().orderBy('sendtime'))
).withColumn(
'fg',
F.when((F.col("charge_state")=="on")&(F.col("charge_state_lag")=="off"),1).otherwise(0)
).select(
'sendtime',
'charge_state',
F.concat(
F.lit('C'),
(F.sum('fg').over(Window.partitionBy().orderBy('sendtime'))+1).cast('string')
).alias("charge_cycle_ID")
).show()
以上是关于如何扫描列以在Pyspark DataFrame中获取新列的主要内容,如果未能解决你的问题,请参考以下文章
您如何保留原始列以在 r 中两个数据库的 full_join() 中进行比较