通过将日期列与两个参考列进行比较来计算从日期开始的年期
Posted
技术标签:
【中文标题】通过将日期列与两个参考列进行比较来计算从日期开始的年期【英文标题】:Computing yearperiod from date by comparing date column with two reference columns 【发布时间】:2021-07-13 15:13:50 【问题描述】:我正在为我参与的项目做一些数据准备工作。我们在 Databricks 中完成大部分工作,使用底层 Apache Spark 对大型数据集进行计算。一切都在 PySpark 中完成。
我的目标是将日期变量转换为变量yearperiod
,它将一年分为 4 周的 13 个时段(有一些例外)。该值是年份和期间的串联,例如yearperiod = 201513
将是 2015 年,第 13 期。
我有两个表:yp_table
,其中包含年份的开始日期和结束日期(编辑:输入 DateType()
)(从 2012 年到现在,编辑:~120 行):
+----------+----------+----------+
| start| end|yearperiod|
+----------+----------+----------+
|2012-01-16|2012-01-29| 201201|
|2012-01-30|2012-02-26| 201202|
|2012-02-27|2012-03-25| 201203|
|2012-03-26|2012-04-22| 201204|
|2012-04-23|2012-05-20| 201205|
|2012-05-21|2012-06-17| 201206|
....
我有实际的data
表,其中包含一个日期列(编辑:输入StringType()
):
+--------+--------+--------+-----+
| Var1| Var2| Date| Var3|
+--------+--------+--------+-----+
| xxxxxx| xxxx|20191231| x,xx|
| xxxxxx| xxxx|20191231| x,xx|
| xxxxxx| xxxx|20191231| x,xx|
| xxxxxx| xxxx|20200101| x,xx|
| xxxxxx| xxxx|20200101| x,xx|
| xxxxxx| xxxx|20200101| x,xx|
| xxxxxx| xxxx|20200101| x,xx|
...
我的问题:如何通过将data.Date
与yp_table.start
和yp_table.end
进行比较来计算data
表的yearperiod
列?
到目前为止,我已经能够使其与常规 Python(具有列表推导的解决方案)一起使用,但事实证明它对于大型数据集来说太慢了。非常感谢任何帮助!
编辑:出于隐私原因,我无法提供数据帧的实际架构。我在上面进行了编辑以包含相关列的类型。
【问题讨论】:
yp_table
很小,不是吗?大约 100 行
表的架构是什么?你能显示data.printSchema()
和yp_table.printSchema()
吗?
以后请提供一个最小的、完整的、可验证的示例。我下面的解决方案说明了您如何以有人可以复制粘贴并运行示例的方式发布此内容。
@Steven,是的,大约 120 行。我宁愿不提供 dfs 的完整模式。不过,我已经为相关列提供了类型!
【参考方案1】:
在data
df 中添加一列,其中包含与yp_table
匹配格式的日期,然后将它们按日期间隔过滤。由于yp_table
很小,您可以使用广播连接来加快速度。
import pyspark.sql.functions as fun
# Date lookup
start_dates = ["2012-01-16", "2012-01-30", "2012-02-27", "2012-03-26", "2012-04-23", "2012-05-21"]
end_dates = ["2012-01-29", "2012-02-26", "2012-03-25", "2012-04-22", "2012-05-20", "2012-06-17"]
yearperiod = ["201201", "201202", "201203", "201204", "201205", "201206"]
yp_table = spark.createDataFrame(pd.DataFrame('start': start_dates, 'end': end_dates, 'yearperiod': yearperiod))
# Data df
dates = ["20120116", "20120130", "20120228", "20120301", "20200101", "20200101", "20200101"]
vals = range(0, len(dates))
data = spark.createDataFrame(pd.DataFrame('Dates':dates, 'vals': vals))
# Add formatted data_str column for joining
data = data.withColumn("date_str", fun.concat_ws("-", data.Dates.substr(0,4), data.Dates.substr(5,2), data.Dates.substr(7,2))) # + "-" + data.Dates.substr(6,8))
# Broadcase join small yp_table into the data table using conditional
joined = data.join(fun.broadcast(yp_table), (data.date_str >= yp_table.start) & (data.date_str < yp_table.end))
yp_table.show()
data.show()
joined.show()
+----------+----------+----------+
| start| end|yearperiod|
+----------+----------+----------+
|2012-01-16|2012-01-29| 201201|
|2012-01-30|2012-02-26| 201202|
|2012-02-27|2012-03-25| 201203|
|2012-03-26|2012-04-22| 201204|
|2012-04-23|2012-05-20| 201205|
|2012-05-21|2012-06-17| 201206|
+----------+----------+----------+
+--------+----+----------+
| Dates|vals| date_str|
+--------+----+----------+
|20120116| 0|2012-01-16|
|20120130| 1|2012-01-30|
|20120228| 2|2012-02-28|
|20120301| 3|2012-03-01|
|20200101| 4|2020-01-01|
|20200101| 5|2020-01-01|
|20200101| 6|2020-01-01|
+--------+----+----------+
+--------+----+----------+----------+----------+----------+
| Dates|vals| date_str| start| end|yearperiod|
+--------+----+----------+----------+----------+----------+
|20120116| 0|2012-01-16|2012-01-16|2012-01-29| 201201|
|20120130| 1|2012-01-30|2012-01-30|2012-02-26| 201202|
|20120228| 2|2012-02-28|2012-02-27|2012-03-25| 201203|
|20120301| 3|2012-03-01|2012-02-27|2012-03-25| 201203|
+--------+----+----------+----------+----------+----------+
【讨论】:
谢谢,这正是我想要完成的。我意识到我没有足够清楚地指定,但start
和end
列的类型是DateType()
。通过使用to_date
将data.Date
列转换为类型DateType()
,此解决方案有效!标记为已接受。
是的,这就是 MCVE 如此强大的原因。它迫使您仔细思考并详细说明问题的所有细节。老实说,我最终在大约 80% 的时间里通过创建 MCVE 的过程来回答我自己的问题。对 SO 点不利,但对个人代理有利,哈哈以上是关于通过将日期列与两个参考列进行比较来计算从日期开始的年期的主要内容,如果未能解决你的问题,请参考以下文章