Generating values for a column until the next value arrives in a Hive table

Posted: 2021-03-16 04:35:54

I have two datasets as below:

Dataset 1:
cust_id     pt_dt
985XFT82Y4  20200824
985XFT82Y4  20200826
985XFT82Y4  20200902
985XFT82Y4  20200918
985XFT82Y4  20200930
985XFT82Y4  20201016
985XFT82Y4  20201021
985XFT82Y4  20201102
985XFT82Y4  20201111
985XFT82Y4  20201112
985XFT82Y4  20201208
985XFT82Y4  20210111
985XFT82Y4  20210202
985XFT82Y4  20210303
985XFT82Y4  20210309
985XFT82Y4  20210311

And the other one, Dataset 2:

cust_id     chg_date                       ins_status
985XFT82Y4  2020-08-24 22:12:34.332000     subscribed
985XFT82Y4  2020-11-11 14:45:31.152000     installed
985XFT82Y4  2021-02-02 01:26:34.500000     migration
985XFT82Y4  2021-03-09 08:11:57.790000     setup done

Now, I need to join these two datasets and generate a result dataset, dataset_result, which should contain the fields cust_id, pt_dt, and ins_status. The join should be done on cust_id and pt_dt/chg_date. The result should look like below.

cust_id     pt_dt           ins_status
985XFT82Y4  20200824        subscribed
985XFT82Y4  20200826        subscribed
985XFT82Y4  20200902        subscribed
985XFT82Y4  20200918        subscribed
985XFT82Y4  20200930        subscribed
985XFT82Y4  20201016        subscribed
985XFT82Y4  20201021        subscribed
985XFT82Y4  20201102        subscribed
985XFT82Y4  20201111        installed
985XFT82Y4  20201112        installed
985XFT82Y4  20201208        installed
985XFT82Y4  20210111        installed
985XFT82Y4  20210202        migration
985XFT82Y4  20210303        migration
985XFT82Y4  20210309        setup done
985XFT82Y4  20210311        setup done

I have tried joining the two datasets as below, but could not achieve this (the equality join only matches rows where pt_dt equals the exact change date, leaving the other dates NULL).

select a.cust_id, a.pt_dt, b.ins_status
from dataset1 a 
left join dataset2 b
on (a.cust_id = b.cust_id)
and (a.pt_dt = regexp_replace(substr(b.chg_date,1,10), '-', ''))

Could someone suggest the best way to do this in PySpark or Hive?

Thanks!!


Answer 1:

The steps are as follows:

- string ---> timestamp ---> to_date
- Apply the lead() function over a window spec partitioned by id and ordered by date
- How to handle the "None" values in the first and last rows? Fill them with today's date
- Then perform the join and select the relevant columns from each dataframe

Code online @ https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8963851468310921/415303719801042/5846184720595634/latest.html

import pyspark.sql.functions as F
from pyspark.sql import Window
from datetime import datetime

data = [("985XFT82Y4", "20200824"),
("985XFT82Y4", "20200826"), 
("985XFT82Y4", "20200902"), 
("985XFT82Y4", "20200918"), 
("985XFT82Y4", "20200930"), 
("985XFT82Y4", "20201016"), 
("985XFT82Y4", "20201021"), 
("985XFT82Y4", "20201102"), 
("985XFT82Y4", "20201111"), 
("985XFT82Y4", "20201112"), 
("985XFT82Y4", "20201208"), 
("985XFT82Y4", "20210111"), 
("985XFT82Y4", "20210202"), 
("985XFT82Y4", "20210303"), 
("985XFT82Y4", "20210309"), 
("985XFT82Y4", "20210311")] 

# parse the yyyyMMdd strings into timestamps, then format as yyyy-MM-dd dates
df1 = spark.createDataFrame(data, ["cust_id", "pt_dt"]) \
    .withColumn("pt_dt", F.to_timestamp("pt_dt", "yyyyMMdd")) \
    .withColumn("pt_dt", F.date_format(F.col("pt_dt"), "yyyy-MM-dd"))
df1.show()

data1 = [("985XFT82Y4", "2020-08-24 22:12:34.332000",  "subscribed"),   
("985XFT82Y4", "2020-11-11 14:45:31.152000",  "installed"),     
("985XFT82Y4", "2021-02-02 01:26:34.500000",  "migration"),     
("985XFT82Y4", "2021-03-09 08:11:57.790000",  "setup done")]    
ts_pattern = "yyyy-MM-dd HH:mm:ss.SSSSSS"
# parse the full timestamps, then keep only the yyyy-MM-dd date part
df2 = spark.createDataFrame(data1, ["cust_id", "chg_date", "ins_status"]) \
    .withColumn("chg_date", F.to_timestamp("chg_date", ts_pattern)) \
    .withColumn("chg_date", F.date_format(F.col("chg_date"), "yyyy-MM-dd"))
df2.show()

# end_chg_date = date of the next status change for the customer;
# lead() leaves the last row NULL, so fill it with today's date
window_spec = Window.partitionBy("cust_id").orderBy("chg_date")
df2 = df2.withColumn("end_chg_date", F.lead("chg_date").over(window_spec))
df2 = df2.withColumn("end_chg_date", F.when(F.col("end_chg_date").isNull(), F.lit(datetime.now().strftime("%Y-%m-%d"))).otherwise(F.col("end_chg_date")))
df2.show()

+----------+----------+----------+------------+
|   cust_id|  chg_date|ins_status|end_chg_date|
+----------+----------+----------+------------+
|985XFT82Y4|2020-08-24|subscribed|  2020-11-11|
|985XFT82Y4|2020-11-11| installed|  2021-02-02|
|985XFT82Y4|2021-02-02| migration|  2021-03-09|
|985XFT82Y4|2021-03-09|setup done|  2021-03-16|
+----------+----------+----------+------------+

# match each pt_dt to the status whose [chg_date, end_chg_date) range contains it;
# select through df1[...] to resolve the duplicate column names
cond = [df1["cust_id"] == df2["cust_id"], df1["pt_dt"] >= df2["chg_date"], df1["pt_dt"] < df2["end_chg_date"]]
df3 = df1.join(df2, cond, "left").select(df1["cust_id"], df1["pt_dt"], "ins_status").orderBy("pt_dt")
df3.show()


+----------+----------+----------+
|   cust_id|     pt_dt|ins_status|
+----------+----------+----------+
|985XFT82Y4|2020-08-24|subscribed|
|985XFT82Y4|2020-08-26|subscribed|
|985XFT82Y4|2020-09-02|subscribed|
|985XFT82Y4|2020-09-18|subscribed|
|985XFT82Y4|2020-09-30|subscribed|
|985XFT82Y4|2020-10-16|subscribed|
|985XFT82Y4|2020-10-21|subscribed|
|985XFT82Y4|2020-11-02|subscribed|
|985XFT82Y4|2020-11-11| installed|
|985XFT82Y4|2020-11-12| installed|
|985XFT82Y4|2020-12-08| installed|
|985XFT82Y4|2021-01-11| installed|
|985XFT82Y4|2021-02-02| migration|
|985XFT82Y4|2021-03-03| migration|
|985XFT82Y4|2021-03-09|setup done|
|985XFT82Y4|2021-03-11|setup done|
+----------+----------+----------+

Comments:

Hi @Mageswaran - thanks for the reply. If you look at the result produced by the code, a cross join is happening. The expected output should have ins_status subscribed from chg_dt 2020-08-24 up to chg_dt 2020-11-11, then installed from 2020-11-11 to 2021-02-02, then migration from 20210202 to 20210309, and so on......

Interesting! Got it... let me try

Is it possible to have a start and end range for each chg_date? That way we could use a range join. Ref: docs.microsoft.com/en-us/azure/databricks/delta/…

Could you check the notebook? I came up with the following logic, which may not fit your business needs: - forward-fill the date for each cust_id into a new column, to serve as the end date - for the null rows in the new column, fill in the current date - use multiple join conditions: id, start_date > value

Answer 2:

You can add a column to df2 with the date from the next row (using lead) to facilitate a range-based join:

from pyspark.sql import functions as F, Window

# parse the yyyyMMdd values into proper dates for range comparisons
df3 = df1.withColumn('pt_date', F.to_date(df1.pt_dt.cast('string'), 'yyyyMMdd'))
# next_date = date of the following status change; NULL for the latest status
df4 = df2.withColumn('next_date', F.lead('chg_date').over(Window.partitionBy('cust_id').orderBy('chg_date')))

# keep the status whose [chg_date, next_date) range contains pt_date;
# a NULL next_date means that status is still current
result = df3.join(df4, 
    (df3.cust_id == df4.cust_id) & 
    (df3.pt_date >= df4.chg_date) & 
    ((df3.pt_date < df4.next_date) | df4.next_date.isNull()), 
    'left'
).select(df3.cust_id, df3.pt_dt, df4.ins_status)

result.show()
+----------+--------+----------+
|   cust_id|   pt_dt|ins_status|
+----------+--------+----------+
|985XFT82Y4|20200824|subscribed|
|985XFT82Y4|20200826|subscribed|
|985XFT82Y4|20200902|subscribed|
|985XFT82Y4|20200918|subscribed|
|985XFT82Y4|20200930|subscribed|
|985XFT82Y4|20201016|subscribed|
|985XFT82Y4|20201021|subscribed|
|985XFT82Y4|20201102|subscribed|
|985XFT82Y4|20201111| installed|
|985XFT82Y4|20201112| installed|
|985XFT82Y4|20201208| installed|
|985XFT82Y4|20210111| installed|
|985XFT82Y4|20210202| migration|
|985XFT82Y4|20210303| migration|
|985XFT82Y4|20210309|setup done|
|985XFT82Y4|20210311|setup done|
+----------+--------+----------+

Comments:

Cool! I hadn't thought of the df4.next_date.isNull() case, so I did extra work there... my bad!
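
Since the question also asks about Hive: the same lead()-based range join can be written in HiveQL. Below is a minimal sketch, assuming the tables are named dataset1 and dataset2 as in the question, and that pt_dt is a yyyyMMdd string so that lexicographic comparison matches date order. Recent Hive versions accept non-equality predicates in the ON clause of an outer join; on older versions you would need to move the range predicates into a WHERE clause, which turns the left join into an inner join.

SELECT a.cust_id, a.pt_dt, b.ins_status
FROM dataset1 a
LEFT JOIN (
    -- normalize chg_date to a yyyyMMdd string and compute the date at which
    -- each status stops being current (NULL for the latest status)
    SELECT cust_id,
           regexp_replace(substr(chg_date, 1, 10), '-', '') AS chg_dt,
           ins_status,
           lead(regexp_replace(substr(chg_date, 1, 10), '-', ''))
               OVER (PARTITION BY cust_id ORDER BY chg_date) AS next_dt
    FROM dataset2
) b
ON a.cust_id = b.cust_id
AND a.pt_dt >= b.chg_dt
AND (a.pt_dt < b.next_dt OR b.next_dt IS NULL);

With the sample data this should return the same 16 rows as the PySpark versions above.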
