Flink SQL: Top-N and Window Top-N

Posted by 宝哥大数据


1. Top-N

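A Top-N query returns the N smallest or largest values ordered by one or more columns. Both the set of smallest values and the set of largest values count as Top-N queries. Top-N queries are useful when you need to show only the N bottom-most or N top-most records of a batch or streaming table according to some condition. The result set can then be used for further analysis.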

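Flink expresses a Top-N query as a combination of an OVER window clause and a filter condition. Through the PARTITION BY clause of the OVER window, Flink also supports per-group Top-N, for example the five products with the highest real-time sales in each category. Top-N queries are supported in SQL on both batch and streaming tables.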
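To make the per-group semantics concrete, here is a minimal Python sketch of what a per-group Top-N computes over a static (batch) set of rows: group by a key (the role of PARTITION BY), order by a value, and keep the first N, either the largest or the smallest. The sample rows and the function name top_n_per_group are hypothetical; this illustrates the result only, not how Flink executes the query.

```python
import heapq
from collections import defaultdict

# Hypothetical rows: (category_id, item_id, sales)
rows = [
    ("books", "b1", 30), ("books", "b2", 75), ("books", "b3", 50),
    ("toys", "t1", 20), ("toys", "t2", 90), ("toys", "t3", 60), ("toys", "t4", 10),
]

def top_n_per_group(rows, n, largest=True):
    """Keep the n largest (or smallest) rows by sales within each category."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[0]].append(row)                 # plays the role of PARTITION BY category_id
    pick = heapq.nlargest if largest else heapq.nsmallest
    # ordering by sales and keeping n rows plays the role of ORDER BY ... plus rownum <= n
    return {cat: pick(n, group, key=lambda r: r[2]) for cat, group in groups.items()}

print(top_n_per_group(rows, 2))                    # top 2 per category
print(top_n_per_group(rows, 2, largest=False))     # bottom 2 per category
```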

Top-N statement structure

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

Note: the above pattern must be followed exactly, otherwise the optimizer won’t be able to translate the query.


The Top-N query is result-updating. Flink SQL sorts the input data stream by the order key, so if the top N records change, the changed records are sent downstream as retraction/update records. It is therefore recommended to use a storage that supports updates as the sink of a Top-N query. In addition, if the top N records need to be stored in external storage, the result table should have the same unique key as the Top-N query.
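The following Python sketch models why the output is result-updating. It keeps per-item counts, recomputes the Top-N after every input record, and diffs the new ranking against the previous one, emitting messages keyed by rank with the changelog labels Flink uses (+I, -U, +U, -D). It is a simplified model assuming a single partition; Flink's actual Rank operator works incrementally and its exact message sequence may differ. The names top_n, diff and run are hypothetical.

```python
from collections import Counter

def top_n(pv, n):
    """Rank items by pv descending (ties broken by item id) and keep the first n, keyed by rank."""
    ranked = sorted(pv.items(), key=lambda kv: (-kv[1], kv[0]))[:n]
    return {rank: row for rank, row in enumerate(ranked, start=1)}

def diff(old, new):
    """Changelog messages that turn the old Top-N result into the new one."""
    msgs = []
    for rank in sorted(old.keys() | new.keys()):
        before, after = old.get(rank), new.get(rank)
        if before == after:
            continue                            # unchanged rank: nothing to send
        if before is None:
            msgs.append(("+I", rank, after))    # a new rank appears
        elif after is None:
            msgs.append(("-D", rank, before))   # a rank disappears
        else:
            msgs.append(("-U", rank, before))   # retract the old row...
            msgs.append(("+U", rank, after))    # ...and send the updated one
    return msgs

def run(clicks, n):
    """Feed clicks one at a time and collect every message sent downstream."""
    pv, current, out = Counter(), {}, []
    for item in clicks:
        pv[item] += 1
        new = top_n(pv, n)
        out.extend(diff(current, new))
        current = new
    return out

for msg in run(["a", "b", "b"], n=2):
    print(msg)
```

The third click on b swaps the two ranks, so both rank rows are retracted and re-sent. A sink that cannot apply such updates would keep stale rows, which is why an updatable sink keyed like the query is recommended.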

Example
Source data:

-- pv_log: page views (pv) of each item, aggregated from user_log
create view pv_log as
select category_id, item_id, count(1) as pv from user_log group by category_id, item_id;

-- Top 3 items by pv within each category
SELECT *
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY pv DESC) AS rownum
   FROM pv_log)
WHERE rownum <= 3;
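To check what this query returns, the same pattern can be run once over a static table with Python's built-in sqlite3 module (SQLite has supported ROW_NUMBER() since version 3.25). This is a batch sketch with hypothetical click data; in Flink the query runs continuously and its result keeps changing as clicks arrive. The final ORDER BY exists only to make the printed batch result deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_log (category_id INTEGER, item_id INTEGER)")
# Hypothetical clicks: (category_id, item_id), one row per page view
clicks = [(1, 10)] * 5 + [(1, 11)] * 3 + [(1, 12)] * 4 + [(1, 13)] * 1 + [(2, 20)] * 2 + [(2, 21)] * 6
conn.executemany("INSERT INTO user_log VALUES (?, ?)", clicks)

# Same view as above: page views per item
conn.execute("""
    CREATE VIEW pv_log AS
    SELECT category_id, item_id, count(1) AS pv
    FROM user_log GROUP BY category_id, item_id
""")

# Same Top-N pattern: rank within each category, keep rownum <= 3
top3 = conn.execute("""
    SELECT category_id, item_id, pv, rownum
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY pv DESC) AS rownum
        FROM pv_log) AS t
    WHERE rownum <= 3
    ORDER BY category_id, rownum
""").fetchall()
for row in top3:
    print(row)
```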

1.1 No Ranking Output Optimization

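As described above, the rownum field is written to the result table as one field of the unique key, which can cause a large number of records to be written to the result table. For example, when the record ranked 9th (say, product-1001) is updated and its rank rises to 1, all records ranked 1 through 9 are emitted to the result table as update messages. If the result table receives too much data, it becomes the bottleneck of the SQL job.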

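The optimization is to omit the rownum field from the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually small, so consumers can quickly sort the records themselves. Without the rownum field, only the changed record (product-1001) in the example above needs to be sent downstream, which can greatly reduce the IO to the result table.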

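-- Omit the rownum field from the output.
-- item_id must stay: (category_id, item_id) is the unique key of this query,
-- so the result table should use it as its unique key too.
SELECT category_id, item_id, pv
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY pv DESC) AS rownum
   FROM pv_log)
WHERE rownum <= 3;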
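A small Python sketch can count the writes in the product-1001 scenario described in 1.1. It models the result table as keyed by the first column of each output row: the rank when rownum is kept, the item id when it is omitted. The pv values, N = 10, the other product names and the helper names are hypothetical, and only upserts are counted (no row leaves the Top-N here).

```python
def top_n(pv, n):
    """Current Top-N as (rownum, item, pv) rows, highest pv first (ties broken by item id)."""
    ranked = sorted(pv.items(), key=lambda kv: (-kv[1], kv[0]))[:n]
    return [(rank, item, cnt) for rank, (item, cnt) in enumerate(ranked, start=1)]

def upserts(old, new):
    """Rows of `new` the sink must write: keyed by the first column, new or changed vs. `old`."""
    before = {row[0]: row for row in old}
    return [row for row in new if before.get(row[0]) != row]

# Hypothetical snapshot: nine other products with pv 100, 90, ..., 20,
# plus product-1001 with pv 25, which puts it at rank 9.
pv = {f"product-{2000 + k}": 100 - 10 * k for k in range(9)}
pv["product-1001"] = 25

old = top_n(pv, 10)
pv["product-1001"] = 200          # product-1001 jumps to rank 1
new = top_n(pv, 10)

# With rownum the key is the rank; without it the key is the item.
with_rownum = upserts(old, new)
without_rownum = upserts([r[1:] for r in old], [r[1:] for r in new])
print(len(with_rownum), len(without_rownum))
```

With rownum, ranks 1 through 9 all change, so nine rows are rewritten; without it, only product-1001's row changes.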

2. Window Top-N

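Window Top-N is a special Top-N that returns the N smallest or largest values for each window and any other partition keys. For streaming queries, unlike the regular Top-N above, it does not emit intermediate results: it emits only the final Top-N of each window when the window ends, and it purges its intermediate state once it is no longer needed.
Syntax: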

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name) -- a relation with window_start/window_end produced by a windowing TVF
WHERE rownum <= N [AND conditions]

Example: the top 3 categories by pv in each 10-second tumbling window

-- ts must be a time attribute of user_log (for example, declared with a WATERMARK)
SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY pv DESC) as rownum
    FROM (
      -- window aggregation: pv and uv per category per 10-second window
      SELECT window_start, window_end, category_id, count(1) as pv, count(distinct user_id) as uv
      FROM TABLE(
        TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECONDS))
      GROUP BY window_start, window_end, category_id
    )
  ) WHERE rownum <= 3;
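The same two-step pipeline (window aggregation, then ranking per window) can be emulated in batch with Python's sqlite3. SQLite has no TUMBLE function, so this sketch assumes ts is a non-negative integer number of seconds and derives window_start as ts - ts % 10, which buckets rows into 10-second tumbling windows aligned to multiples of 10. The events are hypothetical, and the final ORDER BY is only for deterministic output.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_log (ts INTEGER, category_id INTEGER, user_id TEXT)")
# Hypothetical events: (ts in seconds, category_id, user_id)
events = [
    (1, 1, "u1"), (2, 1, "u2"), (3, 1, "u1"), (4, 1, "u3"),   # window [0, 10)
    (2, 2, "u1"), (5, 2, "u1"), (6, 2, "u2"),
    (7, 3, "u4"), (8, 3, "u4"),
    (9, 4, "u5"),
    (11, 2, "u1"), (15, 2, "u6"), (12, 5, "u2"),              # window [10, 20)
]
conn.executemany("INSERT INTO user_log VALUES (?, ?, ?)", events)

rows = conn.execute("""
    SELECT window_start, window_end, category_id, pv, uv, rownum
    FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY pv DESC) AS rownum
        FROM (
            -- window aggregation: pv and uv per category per 10-second window
            SELECT window_start, window_end, category_id,
                   count(1) AS pv, count(DISTINCT user_id) AS uv
            FROM (SELECT ts - ts % 10 AS window_start, ts - ts % 10 + 10 AS window_end,
                         category_id, user_id
                  FROM user_log) AS w
            GROUP BY window_start, window_end, category_id
        ) AS agg
    ) AS ranked
    WHERE rownum <= 3
    ORDER BY window_start, rownum
""").fetchall()
for row in rows:
    print(row)
```

Category 4 drops out of the first window because only three rows per window survive the rownum filter.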
  

2.1 Limitations

Currently (as of Flink 1.13), Flink only supports Window Top-N that follows a Window Aggregation. Window Top-N directly after a Windowing TVF was planned for a later release; Flink 1.14 added it for tumbling, hopping and cumulating windows.
