如何扫描列以在Pyspark DataFrame中获取新列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何扫描列以在Pyspark DataFrame中获取新列相关的知识,希望对你有一定的参考价值。

我有一个Pyspark DataFrame有两列:sendtime和charge_state,如果charge_state从“off”变为“on”,则新的充电周期开始。

现在我想标记每个充电周期以给我输出。

输入:

+-------------------+------------+
|           sendtime|charge_state|
+-------------------+------------+
|2018-03-02 08:00:00|          on|
...
|2018-03-02 09:42:32|          on|
|2018-03-02 09:42:33|          on|
|2018-03-02 09:42:34|          on|
|2018-03-02 09:42:35|         off|
|2018-03-02 09:42:36|         off|
...
|2018-03-02 10:11:12|         off|
|2018-03-02 10:11:13|          on|
|2018-03-02 10:11:14|          on|
...

输出:

+-------------------+------------+---------------+
|           sendtime|charge_state|charge_cycle_ID|
+-------------------+------------+---------------+
|2018-03-02 08:00:00|          on|             c1|
...
|2018-03-02 09:42:32|          on|             c1|
|2018-03-02 09:42:33|          on|             c1|
|2018-03-02 09:42:34|          on|             c1|
|2018-03-02 09:42:35|         off|             c1|
|2018-03-02 09:42:36|         off|             c1|
...
|2018-03-02 10:11:12|         off|             c1|
|2018-03-02 10:11:13|          on|             c2|
|2018-03-02 10:11:14|          on|             c2|
...
答案

您可以使用Window函数执行此任务:

from pyspark.sql import functions as F
from pyspark.sql import Window

df.withColumn(
    'charge_state_lag', 
    F.lag('charge_state').over(Window.partitionBy().orderBy('sendtime'))
).withColumn(
    'fg', 
    F.when((F.col("charge_state")=="on")&(F.col("charge_state_lag")=="off"),1).otherwise(0)
).select(
    'sendtime',
    'charge_state',
    F.concat(
        F.lit('C'),
        (F.sum('fg').over(Window.partitionBy().orderBy('sendtime'))+1).cast('string')
    ).alias("charge_cycle_ID")
).show()

以上是关于如何扫描列以在Pyspark DataFrame中获取新列的主要内容,如果未能解决你的问题,请参考以下文章

如何从多索引中提取总年份行和列以在绘图中创建直方图

您如何保留原始列以在 r 中两个数据库的 full_join() 中进行比较

提取 csv 文件特定列以在 Python 中列出

实体框架中的代码首先设置列以在 sql server 中键入 datetime2

如何安装 pyspark 以在独立脚本中使用?

在 PySpark 中,如何根据另一个 DataFrame 中的查找来填充新列?