Generating values for a column until the next value arrives, in a Hive table
Posted: 2021-03-16 04:35:54

Question: I have two datasets as below.

Dataset 1:
cust_id pt_dt
985XFT82Y4 20200824
985XFT82Y4 20200826
985XFT82Y4 20200902
985XFT82Y4 20200918
985XFT82Y4 20200930
985XFT82Y4 20201016
985XFT82Y4 20201021
985XFT82Y4 20201102
985XFT82Y4 20201111
985XFT82Y4 20201112
985XFT82Y4 20201208
985XFT82Y4 20210111
985XFT82Y4 20210202
985XFT82Y4 20210303
985XFT82Y4 20210309
985XFT82Y4 20210311
And a second one, Dataset 2:
cust_id chg_date ins_status
985XFT82Y4 2020-08-24 22:12:34.332000 subscribed
985XFT82Y4 2020-11-11 14:45:31.152000 installed
985XFT82Y4 2021-02-02 01:26:34.500000 migration
985XFT82Y4 2021-03-09 08:11:57.790000 setup done
Now I need to join these two datasets and generate a result dataset, dataset_result, containing the fields cust_id, pt_dt, and ins_status. The join should be on cust_id and pt_dt/chg_date, with each pt_dt taking the status of the latest chg_date at or before it. The result should look like this:
cust_id pt_dt ins_status
985XFT82Y4 20200824 subscribed
985XFT82Y4 20200826 subscribed
985XFT82Y4 20200902 subscribed
985XFT82Y4 20200918 subscribed
985XFT82Y4 20200930 subscribed
985XFT82Y4 20201016 subscribed
985XFT82Y4 20201021 subscribed
985XFT82Y4 20201102 subscribed
985XFT82Y4 20201111 installed
985XFT82Y4 20201112 installed
985XFT82Y4 20201208 installed
985XFT82Y4 20210111 installed
985XFT82Y4 20210202 migration
985XFT82Y4 20210303 migration
985XFT82Y4 20210309 setup done
985XFT82Y4 20210311 setup done
I have tried joining the two datasets as below, but it does not give the expected result: the equality join only fills ins_status for the rows where pt_dt exactly matches a chg_date.
select a.cust_id, a.pt_dt, b.ins_status
from dataset1 a
left join dataset2 b
on (a.cust_id = b.cust_id)
and (a.pt_dt = regexp_replace(substr(b.chg_date,1,10), '-', ''))
Can someone suggest the best way to do this in PySpark or Hive?

Thanks!!
Solution 1: The steps are as follows:

1. Convert the date columns: string ---> timestamp ---> to_date
2. Apply the lead() function over a window spec partitioned by id and ordered by date
3. Handle the None value that lead() leaves in the last row by filling it with today's date, then perform the join and select the relevant columns from each dataframe
Code online at https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8963851468310921/415303719801042/5846184720595634/latest.html
import pyspark.sql.functions as F
from pyspark.sql import Window
from datetime import datetime
data = [("985XFT82Y4", "20200824"),
("985XFT82Y4", "20200826"),
("985XFT82Y4", "20200902"),
("985XFT82Y4", "20200918"),
("985XFT82Y4", "20200930"),
("985XFT82Y4", "20201016"),
("985XFT82Y4", "20201021"),
("985XFT82Y4", "20201102"),
("985XFT82Y4", "20201111"),
("985XFT82Y4", "20201112"),
("985XFT82Y4", "20201208"),
("985XFT82Y4", "20210111"),
("985XFT82Y4", "20210202"),
("985XFT82Y4", "20210303"),
("985XFT82Y4", "20210309"),
("985XFT82Y4", "20210311")]
df1 = (spark.createDataFrame(data, ["cust_id", "pt_dt"])
       # parse the yyyyMMdd string to a timestamp, then keep a yyyy-MM-dd date string
       .withColumn("pt_dt", F.to_timestamp("pt_dt", "yyyyMMdd"))
       .withColumn("pt_dt", F.date_format(F.col("pt_dt"), "yyyy-MM-dd")))
df1.show()
data1 = [("985XFT82Y4", "2020-08-24 22:12:34.332000", "subscribed"),
("985XFT82Y4", "2020-11-11 14:45:31.152000", "installed"),
("985XFT82Y4", "2021-02-02 01:26:34.500000", "migration"),
("985XFT82Y4", "2021-03-09 08:11:57.790000", "setup done")]
ts_pattern = "yyyy-MM-dd HH:mm:ss.SSSSSS"
df2 = (spark.createDataFrame(data1, ["cust_id", "chg_date", "ins_status"])
       # parse the full timestamp, then keep only the yyyy-MM-dd date part
       .withColumn("chg_date", F.to_timestamp("chg_date", ts_pattern))
       .withColumn("chg_date", F.date_format(F.col("chg_date"), "yyyy-MM-dd")))
df2.show()
# for each cust_id, a status is valid from its chg_date until the next change
window_spec = Window.partitionBy("cust_id").orderBy("chg_date")
df2 = df2.withColumn("end_chg_date", F.lead("chg_date").over(window_spec))
# the last row per cust_id has no next change; close its range with today's date
df2 = df2.withColumn("end_chg_date", F.when(F.col("end_chg_date").isNull(), F.lit(datetime.now().strftime("%Y-%m-%d"))).otherwise(F.col("end_chg_date")))
df2.show()
+----------+----------+----------+------------+
| cust_id| chg_date|ins_status|end_chg_date|
+----------+----------+----------+------------+
|985XFT82Y4|2020-08-24|subscribed| 2021-03-16|
|985XFT82Y4|2020-11-11| installed| 2021-02-02|
|985XFT82Y4|2021-02-02| migration| 2021-03-09|
|985XFT82Y4|2021-03-09|setup done| 2021-03-16|
+----------+----------+----------+------------+
cond = [df1["cust_id"] == df2["cust_id"], df1["pt_dt"] >= df2["chg_date"], df1["pt_dt"] < df2["end_chg_date"]]
# qualify the columns with df1 in the select to resolve the same-column-name conflict
df3 = df1.join(df2, cond, "left").select(df1["cust_id"], df1["pt_dt"], "ins_status").orderBy("pt_dt")
df3.show()
+----------+----------+----------+
| cust_id| pt_dt|ins_status|
+----------+----------+----------+
|985XFT82Y4|2020-08-24|subscribed|
|985XFT82Y4|2020-08-26|subscribed|
|985XFT82Y4|2020-09-02|subscribed|
|985XFT82Y4|2020-09-18|subscribed|
|985XFT82Y4|2020-09-30|subscribed|
|985XFT82Y4|2020-10-16|subscribed|
|985XFT82Y4|2020-10-21|subscribed|
|985XFT82Y4|2020-11-02|subscribed|
|985XFT82Y4|2020-11-11| installed|
|985XFT82Y4|2020-11-12| installed|
|985XFT82Y4|2020-12-08| installed|
|985XFT82Y4|2021-01-11| installed|
|985XFT82Y4|2021-02-02| migration|
|985XFT82Y4|2021-03-03| migration|
|985XFT82Y4|2021-03-09|setup done|
|985XFT82Y4|2021-03-11|setup done|
+----------+----------+----------+
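Note that this result shows pt_dt as yyyy-MM-dd, whereas the expected output in the question keeps the original yyyyMMdd strings. A minimal sketch of restoring that format, assuming the df3 built above:

# strip the dashes to get back the original yyyyMMdd form
df3 = df3.withColumn("pt_dt", F.regexp_replace("pt_dt", "-", ""))
df3.show()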
Comments:
Hi @Mageswaran - thanks for the reply. If you look at the result the code produces, a cross join is happening. The expected output should have ins_status subscribed from chg_dt 2020-08-24 up to chg_dt 2020-11-11, installed from 2020-11-11 to 2021-02-02, migration from 20210202 to 20210309, and so on...

Interesting! Got it... let me try.

Is it possible to set a start and end range for each chg_date? Then we could use a range join. Ref: docs.microsoft.com/en-us/azure/databricks/delta/…

Can you check the notebook? I came up with the following logic, which may not fit your business needs: forward-fill the dates for each cust_id into a new column that serves as the end date; for null rows in the new column, fill in the current date; then join with multiple conditions: id, start_date > value
Solution 2: You can add a column to df2 (using lead) holding the next row's date, which makes a range-based join straightforward:
from pyspark.sql import functions as F, Window

# convert the yyyyMMdd value to a proper date for range comparisons
df3 = df1.withColumn('pt_date', F.to_date(df1.pt_dt.cast('string'), 'yyyyMMdd'))
# next status change per customer; null for the latest status
df4 = df2.withColumn('next_date', F.lead('chg_date').over(Window.partitionBy('cust_id').orderBy('chg_date')))
result = df3.join(df4,
(df3.cust_id == df4.cust_id) &
(df3.pt_date >= df4.chg_date) &
((df3.pt_date < df4.next_date) | df4.next_date.isNull()),
'left'
).select(df3.cust_id, df3.pt_dt, df4.ins_status)
result.show()
+----------+--------+----------+
| cust_id| pt_dt|ins_status|
+----------+--------+----------+
|985XFT82Y4|20200824|subscribed|
|985XFT82Y4|20200826|subscribed|
|985XFT82Y4|20200902|subscribed|
|985XFT82Y4|20200918|subscribed|
|985XFT82Y4|20200930|subscribed|
|985XFT82Y4|20201016|subscribed|
|985XFT82Y4|20201021|subscribed|
|985XFT82Y4|20201102|subscribed|
|985XFT82Y4|20201111| installed|
|985XFT82Y4|20201112| installed|
|985XFT82Y4|20201208| installed|
|985XFT82Y4|20210111| installed|
|985XFT82Y4|20210202| migration|
|985XFT82Y4|20210303| migration|
|985XFT82Y4|20210309|setup done|
|985XFT82Y4|20210311|setup done|
+----------+--------+----------+
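Since the question also asks about Hive, the same lead()-based range join can be expressed in plain SQL. This is only a sketch, assuming Hive tables named dataset1 and dataset2 with the columns from the question, and an engine that accepts non-equality join predicates (Spark SQL does; recent Hive versions do as well); the fixed-width yyyyMMdd strings compare correctly as strings:

# Sketch only: the same range-join idea in SQL, here run through spark.sql.
# Table and column names are taken from the question; adjust to your environment.
# The subquery derives each status row's yyyyMMdd start date and, via LEAD(),
# the next row's date as the (exclusive) end of its validity range.
result_sql = spark.sql("""
    SELECT a.cust_id, a.pt_dt, b.ins_status
    FROM dataset1 a
    LEFT JOIN (
        SELECT cust_id,
               ins_status,
               regexp_replace(substr(chg_date, 1, 10), '-', '') AS start_dt,
               LEAD(regexp_replace(substr(chg_date, 1, 10), '-', ''))
                   OVER (PARTITION BY cust_id ORDER BY chg_date) AS end_dt
        FROM dataset2
    ) b
    ON a.cust_id = b.cust_id
       AND a.pt_dt >= b.start_dt
       AND (a.pt_dt < b.end_dt OR b.end_dt IS NULL)
""")
result_sql.show()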
Comments:

Cool! I had not thought of the df4.next_date.isNull() case; I did extra work there... my bad!