Pyspark:迭代数据框中的组

Posted

技术标签:

【中文标题】Pyspark:迭代数据框中的组【英文标题】:Pyspark: Iterate over groups in a dataframe 【发布时间】:2019-07-24 15:10:22 【问题描述】:

我有以下按两列排序的数据框:idUpdated_date

初始数据帧:

       |id|date      |Updated_date       |
       |a |2019-02-14|2018-10-30 10:25:45|
       |a |2019-02-14|2018-11-28 10:51:34|
       |a |2019-01-11|2018-11-29 10:46:07|
       |a |2019-01-14|2018-11-30 10:42:56|
       |a |2019-01-16|2018-12-01 10:28:46|
       |a |2019-01-22|2018-12-02 10:22:06|
       |b |2019-01-25|2018-11-15 10:36:59|
       |b |2019-02-10|2018-11-16 10:58:01| 
       |b |2019-02-04|2018-11-17 10:42:12|
       |b |2019-02-10|2018-11-24 10:24:56|
       |b |2019-02-02|2018-12-01 10:28:46|

我想创建两个新列LBUB,这样: 对于每个idLBUB的第一个值是(日期+/- 10天)间隔的值,对于具有相同id的下一个值,我们验证@987654330是否@ 在上一行的 LBUB 之间,如果是,我们使用相同的值,否则我们重新计算一个新的间隔(+/- 10 天)。

我的预期输出:

       |id|date      |Updated_date       |LB        |UB        |
       |a |2019-02-14|2018-10-30 10:25:45|2019-02-04|2019-02-24|
       |a |2019-02-14|2018-11-28 10:51:34|2019-02-04|2019-02-24|
       |a |2019-01-11|2018-11-29 10:46:07|2019-01-01|2019-01-21|
       |a |2019-01-14|2018-11-30 10:42:56|2019-01-01|2019-01-21|
       |a |2019-01-16|2018-12-01 10:28:46|2019-01-01|2019-01-21|
       |a |2019-01-22|2018-12-02 10:22:06|2019-01-12|2019-02-01|
       |b |2019-01-25|2018-11-15 10:36:59|2019-01-15|2019-02-04|
       |b |2019-02-10|2018-11-16 10:58:01|2019-02-01|2019-02-20|
       |b |2019-02-04|2018-11-17 10:42:12|2019-02-01|2019-02-20|
       |b |2019-02-10|2018-11-24 10:24:56|2019-02-01|2019-02-20|
       |b |2019-02-02|2018-12-01 10:28:46|2019-02-01|2019-02-20|

如何遍历每个组内的行?

【问题讨论】:

有人在两天前问过这个问题。在“PySpark”下搜索它,您应该能够找到它。 【参考方案1】:

如果你对 udf 没问题。

from datetime import timedelta

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType

SS = SparkSession.builder.getOrCreate()


data = ["id": "a", "date": "2019-02-14", "updated_date": "2018-10-30 10:25:45",
        "id": "a", "date": "2019-02-14", "updated_date": "2018-11-28 10:51:34",
        "id": "a", "date": "2019-01-11", "updated_date": "2018-11-29 10:46:07",
        "id": "a", "date": "2019-01-14", "updated_date": "2018-11-30 10:42:56",
        "id": "a", "date": "2019-01-16", "updated_date": "2018-12-01 10:28:46",
        "id": "a", "date": "2019-01-22", "updated_date": "2018-12-02 10:22:06",
        "id": "b", "date": "2019-01-25", "updated_date": "2018-11-15 10:36:59",
        "id": "b", "date": "2019-02-10", "updated_date": "2018-11-16 10:58:01",
        "id": "b", "date": "2019-02-04", "updated_date": "2018-11-17 10:42:12",
        "id": "b", "date": "2019-02-10", "updated_date": "2018-11-24 10:24:56",
        "id": "b", "date": "2019-02-02", "updated_date": "2018-12-01 10:28:46"]


schema = 
    "fields": [
        
            "metadata": ,
            "name": "date",
            "nullable": False,
            "type": "date"
        ,
        
            "metadata": ,
            "name": "id",
            "nullable": False,
            "type": "string"
        ,
        
            "metadata": ,
            "name": "updated_date",
            "nullable": False,
            "type": "timestamp"
        ,

    ]


@udf("date")
def increment(cell):
    return cell+timedelta(days=10)


@udf("date")
def decrease(cell):
    return cell+timedelta(days=-10)

df = SS.createDataFrame(data)

for field in schema["fields"]:
    df = df.withColumn(field["name"], df[field["name"]].cast(field["type"]))

df.show()
df.printSchema()

df = df.withColumn("UB", increment("date"))
df = df.withColumn("LB", decrease("date"))

df.show()
df.printSchema()

我的输出:

# After aplying schema
+----------+---+-------------------+
|      date| id|       updated_date|
+----------+---+-------------------+
|2019-02-14|  a|2018-10-30 10:25:45|
|2019-02-14|  a|2018-11-28 10:51:34|
|2019-01-11|  a|2018-11-29 10:46:07|
|2019-01-14|  a|2018-11-30 10:42:56|
|2019-01-16|  a|2018-12-01 10:28:46|
|2019-01-22|  a|2018-12-02 10:22:06|
|2019-01-25|  b|2018-11-15 10:36:59|
|2019-02-10|  b|2018-11-16 10:58:01|
|2019-02-04|  b|2018-11-17 10:42:12|
|2019-02-10|  b|2018-11-24 10:24:56|
|2019-02-02|  b|2018-12-01 10:28:46|
+----------+---+-------------------+

root
 |-- date: date (nullable = true)
 |-- id: string (nullable = true)
 |-- updated_date: timestamp (nullable = true)


+----------+---+-------------------+----------+----------+
|      date| id|       updated_date|        UB|        LB|
+----------+---+-------------------+----------+----------+
|2019-02-14|  a|2018-10-30 10:25:45|2019-02-24|2019-02-04|
|2019-02-14|  a|2018-11-28 10:51:34|2019-02-24|2019-02-04|
|2019-01-11|  a|2018-11-29 10:46:07|2019-01-21|2019-01-01|
|2019-01-14|  a|2018-11-30 10:42:56|2019-01-24|2019-01-04|
|2019-01-16|  a|2018-12-01 10:28:46|2019-01-26|2019-01-06|
|2019-01-22|  a|2018-12-02 10:22:06|2019-02-01|2019-01-12|
|2019-01-25|  b|2018-11-15 10:36:59|2019-02-04|2019-01-15|
|2019-02-10|  b|2018-11-16 10:58:01|2019-02-20|2019-01-31|
|2019-02-04|  b|2018-11-17 10:42:12|2019-02-14|2019-01-25|
|2019-02-10|  b|2018-11-24 10:24:56|2019-02-20|2019-01-31|
|2019-02-02|  b|2018-12-01 10:28:46|2019-02-12|2019-01-23|
+----------+---+-------------------+----------+----------+

root
 |-- date: date (nullable = true)
 |-- id: string (nullable = true)
 |-- updated_date: timestamp (nullable = true)
 |-- UB: date (nullable = true)

【讨论】:

以上是关于Pyspark:迭代数据框中的组的主要内容,如果未能解决你的问题,请参考以下文章

如何有效地将 PySpark 数据框中的行相乘?

计算 pyspark 数据框中的出现次数

PYSPARK:如何在 pyspark 数据框中找到两列的余弦相似度?

如何将数据框中的连接值插入到 Pyspark 中的另一个数据框中?

pyspark 数据框中的自定义排序

应用 StringIndexer 更改 PySpark 数据框中的列