Pyspark:迭代数据框中的组
Posted
技术标签:
【中文标题】Pyspark:迭代数据框中的组【英文标题】:Pyspark: Iterate over groups in a dataframe 【发布时间】:2019-07-24 15:10:22 【问题描述】:我有以下按两列排序的数据框:id
和 Updated_date
。
初始数据帧:
|id|date |Updated_date |
|a |2019-02-14|2018-10-30 10:25:45|
|a |2019-02-14|2018-11-28 10:51:34|
|a |2019-01-11|2018-11-29 10:46:07|
|a |2019-01-14|2018-11-30 10:42:56|
|a |2019-01-16|2018-12-01 10:28:46|
|a |2019-01-22|2018-12-02 10:22:06|
|b |2019-01-25|2018-11-15 10:36:59|
|b |2019-02-10|2018-11-16 10:58:01|
|b |2019-02-04|2018-11-17 10:42:12|
|b |2019-02-10|2018-11-24 10:24:56|
|b |2019-02-02|2018-12-01 10:28:46|
我想创建两个新列LB
和UB
,这样:
对于每个id
,LB
和UB
的第一个值是(日期+/- 10天)间隔的值,对于具有相同id
的下一个值,我们验证@987654330是否@ 在上一行的 LB
和 UB
之间,如果是,我们使用相同的值,否则我们重新计算一个新的间隔(+/- 10 天)。
我的预期输出:
|id|date |Updated_date |LB |UB |
|a |2019-02-14|2018-10-30 10:25:45|2019-02-04|2019-02-24|
|a |2019-02-14|2018-11-28 10:51:34|2019-02-04|2019-02-24|
|a |2019-01-11|2018-11-29 10:46:07|2019-01-01|2019-01-21|
|a |2019-01-14|2018-11-30 10:42:56|2019-01-01|2019-01-21|
|a |2019-01-16|2018-12-01 10:28:46|2019-01-01|2019-01-21|
|a |2019-01-22|2018-12-02 10:22:06|2019-01-12|2019-02-01|
|b |2019-01-25|2018-11-15 10:36:59|2019-01-15|2019-02-04|
|b |2019-02-10|2018-11-16 10:58:01|2019-02-01|2019-02-20|
|b |2019-02-04|2018-11-17 10:42:12|2019-02-01|2019-02-20|
|b |2019-02-10|2018-11-24 10:24:56|2019-02-01|2019-02-20|
|b |2019-02-02|2018-12-01 10:28:46|2019-02-01|2019-02-20|
如何遍历每个组内的行?
【问题讨论】:
有人在两天前问过这个问题。在“PySpark”下搜索它,您应该能够找到它。 【参考方案1】:如果你对 udf 没问题。
from datetime import timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType
SS = SparkSession.builder.getOrCreate()
data = ["id": "a", "date": "2019-02-14", "updated_date": "2018-10-30 10:25:45",
"id": "a", "date": "2019-02-14", "updated_date": "2018-11-28 10:51:34",
"id": "a", "date": "2019-01-11", "updated_date": "2018-11-29 10:46:07",
"id": "a", "date": "2019-01-14", "updated_date": "2018-11-30 10:42:56",
"id": "a", "date": "2019-01-16", "updated_date": "2018-12-01 10:28:46",
"id": "a", "date": "2019-01-22", "updated_date": "2018-12-02 10:22:06",
"id": "b", "date": "2019-01-25", "updated_date": "2018-11-15 10:36:59",
"id": "b", "date": "2019-02-10", "updated_date": "2018-11-16 10:58:01",
"id": "b", "date": "2019-02-04", "updated_date": "2018-11-17 10:42:12",
"id": "b", "date": "2019-02-10", "updated_date": "2018-11-24 10:24:56",
"id": "b", "date": "2019-02-02", "updated_date": "2018-12-01 10:28:46"]
schema =
"fields": [
"metadata": ,
"name": "date",
"nullable": False,
"type": "date"
,
"metadata": ,
"name": "id",
"nullable": False,
"type": "string"
,
"metadata": ,
"name": "updated_date",
"nullable": False,
"type": "timestamp"
,
]
@udf("date")
def increment(cell):
return cell+timedelta(days=10)
@udf("date")
def decrease(cell):
return cell+timedelta(days=-10)
df = SS.createDataFrame(data)
for field in schema["fields"]:
df = df.withColumn(field["name"], df[field["name"]].cast(field["type"]))
df.show()
df.printSchema()
df = df.withColumn("UB", increment("date"))
df = df.withColumn("LB", decrease("date"))
df.show()
df.printSchema()
我的输出:
# After aplying schema
+----------+---+-------------------+
| date| id| updated_date|
+----------+---+-------------------+
|2019-02-14| a|2018-10-30 10:25:45|
|2019-02-14| a|2018-11-28 10:51:34|
|2019-01-11| a|2018-11-29 10:46:07|
|2019-01-14| a|2018-11-30 10:42:56|
|2019-01-16| a|2018-12-01 10:28:46|
|2019-01-22| a|2018-12-02 10:22:06|
|2019-01-25| b|2018-11-15 10:36:59|
|2019-02-10| b|2018-11-16 10:58:01|
|2019-02-04| b|2018-11-17 10:42:12|
|2019-02-10| b|2018-11-24 10:24:56|
|2019-02-02| b|2018-12-01 10:28:46|
+----------+---+-------------------+
root
|-- date: date (nullable = true)
|-- id: string (nullable = true)
|-- updated_date: timestamp (nullable = true)
+----------+---+-------------------+----------+----------+
| date| id| updated_date| UB| LB|
+----------+---+-------------------+----------+----------+
|2019-02-14| a|2018-10-30 10:25:45|2019-02-24|2019-02-04|
|2019-02-14| a|2018-11-28 10:51:34|2019-02-24|2019-02-04|
|2019-01-11| a|2018-11-29 10:46:07|2019-01-21|2019-01-01|
|2019-01-14| a|2018-11-30 10:42:56|2019-01-24|2019-01-04|
|2019-01-16| a|2018-12-01 10:28:46|2019-01-26|2019-01-06|
|2019-01-22| a|2018-12-02 10:22:06|2019-02-01|2019-01-12|
|2019-01-25| b|2018-11-15 10:36:59|2019-02-04|2019-01-15|
|2019-02-10| b|2018-11-16 10:58:01|2019-02-20|2019-01-31|
|2019-02-04| b|2018-11-17 10:42:12|2019-02-14|2019-01-25|
|2019-02-10| b|2018-11-24 10:24:56|2019-02-20|2019-01-31|
|2019-02-02| b|2018-12-01 10:28:46|2019-02-12|2019-01-23|
+----------+---+-------------------+----------+----------+
root
|-- date: date (nullable = true)
|-- id: string (nullable = true)
|-- updated_date: timestamp (nullable = true)
|-- UB: date (nullable = true)
【讨论】:
以上是关于Pyspark:迭代数据框中的组的主要内容,如果未能解决你的问题,请参考以下文章
PYSPARK:如何在 pyspark 数据框中找到两列的余弦相似度?