Spark Window 函数 - 获取每行分区中的所有记录,并保持顺序

Posted

技术标签:

【中文标题】Spark Window 函数 - 获取每行分区中的所有记录,并保持顺序【英文标题】:Spark Window function - Get all records in a partition in each row, with order maintained 【发布时间】:2020-12-03 19:03:52 【问题描述】:

我的输入表('messages'):

sessionID  timestamp  sender   message
S1         1          BUYER    Hi
S1         2          AGENT    Tell me Sir
S1         3          BUYER    Can you help me?
S1         4          BUYER    Sure.
S2         3          AGENT    Hello Sir
S2         2          BUYER    Hello.             

我想要的输出:

sessionID  timestamp  sender   message             conversation     
S1         1          BUYER    Hi                  [[BUYER : Hi], [AGENT : Tell me Sir], [BUYER : Can you help me?], [BUYER : Sure.]]
S1         2          AGENT    Tell me Sir         [[BUYER : Hi], [AGENT : Tell me Sir], [BUYER : Can you help me?], [BUYER : Sure.]]
S1         3          BUYER    Can you help me?    [[BUYER : Hi], [AGENT : Tell me Sir], [BUYER : Can you help me?], [BUYER : Sure.]]
S1         4          BUYER    Sure.               [[BUYER : Hi], [AGENT : Tell me Sir], [BUYER : Can you help me?], [BUYER : Sure.]]
S2         3          AGENT    Hello Sir           [[BUYER : Hello], [AGENT : Hello Sir]]
S2         2          BUYER    Hello               [[BUYER : Hello], [AGENT : Hello Sir]]

我的查询:

spark.sql("select *, collect_list( concat(sender, ' : ', message) ) over (partition by sessionID order by timestamp asc) as conversation from messages")

当前输出:

sessionID  timestamp  sender   message             conversation     
S1         1          BUYER    Hi                  [[BUYER : Hi]]
S1         2          AGENT    Tell me Sir         [[BUYER : Hi], [AGENT : Tell me Sir]]
S1         3          BUYER    Can you help me?    [[BUYER : Hi], [AGENT : Tell me Sir], [BUYER : Can you help me?]]
S1         4          BUYER    Sure.               [[BUYER : Hi], [AGENT : Tell me Sir], [BUYER : Can you help me?], [BUYER : Sure.]]
S2         3          AGENT    Hello Sir           [[BUYER : Hello]]
S2         2          BUYER    Hello               [[BUYER : Hello], [AGENT : Hello Sir]]

从当前和期望的输出中可以看出,我希望在收集列表中维护消息的顺序,并且我还需要包含每个分区中的所有记录(不仅仅是当前记录之前的行) 有人可以帮我获得所需的输出吗?

最好的问候。

【问题讨论】:

【参考方案1】:

为窗口添加一个规范,如下面的大写字母所示。这将确保分区中的所有行都包括在内。

spark.sql("""
select *,
  collect_list( concat(sender, ' : ', message) ) over (
    partition by sessionID
    order by timestamp asc
    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
  ) as conversation from messages
""")

【讨论】:

以上是关于Spark Window 函数 - 获取每行分区中的所有记录,并保持顺序的主要内容,如果未能解决你的问题,请参考以下文章

Spark Window 函数:是不是可以直接从使用第一个/最后一个函数找到的行中获取其他值?

spark 函数

具有嵌套列的 Apache Spark Window 函数

在 Spark Dataframe 中实现 Window 的重叠分区

Spark Dataset - 每行的“编辑”镶木地板文件

从Spark limit()函数重新分区数据帧