Hive query to find the count for the missing weeks in the middle

Posted 2019-05-10 05:14:32

I have a table like the one below:

id      week    count   
A100    201008  2    
A100    201009  9    
A100    201010  16    
A100    201011  23    
A100    201012  30    
A100    201013  36    
A100    201015  43    
A100    201017  50    
A100    201018  57    
A100    201019  63    
A100    201023  70    
A100    201024  82    
A100    201025  88    
A100    201026  95    
A100    201027  102

Here we can see that the following weeks are missing:

First, 201014 is missing; second, 201016 is missing; and third, weeks 201020, 201021 and 201022 are missing.

My requirement is that whenever a week is missing, we should show the count of the previous week for it.

In this case the output should be:

id      week    count
A100    201008  2    
A100    201009  9    
A100    201010  16    
A100    201011  23    
A100    201012  30   
A100    201013  36    
A100    201014  36    
A100    201015  43    
A100    201016  43    
A100    201017  50    
A100    201018  57    
A100    201019  63    
A100    201020  63
A100    201021  63    
A100    201022  63    
A100    201023  70    
A100    201024  82    
A100    201025  88    
A100    201026  95    
A100    201027  102

How can I achieve this requirement using hive/pyspark?

Comments:

You need to provide more information about what you are trying to do. Are you talking about weeks of the year? Why does it not start at week 0? Do you want to go up to week 52 or week 53 (as in ISO-8601)? And do you want this per group?

@pault It does not have to start with zero (although it could). The week column is a fiscal week number: 201009 means the 9th fiscal week of 2010.

Answer 1:

Although this answer is in Scala, the Python version looks almost identical and can easily be converted.

Step 1:

Find the rows after which week values are missing.

Sample input:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//sample input
val input = sc.parallelize(List(("A100",201008,2), ("A100",201009,9),("A100",201014,4), ("A100",201016,45))).toDF("id","week","count")

scala> input.show
+----+------+-----+
|  id|  week|count|
+----+------+-----+
|A100|201008|    2|
|A100|201009|    9|
|A100|201014|    4| //missing 4 rows
|A100|201016|   45| //missing 1 row
+----+------+-----+ 

To find them, we can use the lead() function on week and compute the difference between leadWeek and week (minus 1, stored as diff below). If diff is greater than 0, that many weeks are missing after the current row.

val diffDF = input
  .withColumn("leadWeek", lead($"week", 1).over(Window.partitionBy($"id").orderBy($"week")))   // partitioning by id & computing lead()
  .withColumn("diff", ($"leadWeek" - $"week") -1)                                 // finding difference between leadWeek & week

scala> diffDF.show
+----+------+-----+--------+----+
|  id|  week|count|leadWeek|diff|
+----+------+-----+--------+----+
|A100|201008|    2|  201009|   0| // diff -> 0 represents that no rows needs to be added
|A100|201009|    9|  201014|   4| // diff -> 4 represents 4 rows are to be added after this row.
|A100|201014|    4|  201016|   1| // diff -> 1 represents 1 row to be added after this row.
|A100|201016|   45|    null|null|
+----+------+-----+--------+----+

Step 2:

If diff >= 1: create and append n rows (InputWithDiff, see the case class below), where n equals diff, incrementing the week value accordingly, and return the newly created rows together with the original row. If diff is 0, no extra computation is needed; return the original row as is.

Convert diffDF to a Dataset for easier computation.

case class InputWithDiff(id: Option[String], week: Option[Int], count: Option[Int], leadWeek: Option[Int], diff: Option[Int])

val diffDS = diffDF.as[InputWithDiff]

val output = diffDS.flatMap(x => {
  val diff = x.diff.getOrElse(0)

  diff match {
    case n if n >= 1 => x :: (1 to diff).map(y => InputWithDiff(x.id, Some(x.week.get + y), x.count, x.leadWeek, x.diff)).toList  // create and append new rows
    case _ => List(x)   // return the row as is
  }
}).drop("leadWeek", "diff").toDF   // drop unnecessary columns & convert to DF

Final output:

scala> output.show
+----+------+-----+
|  id|  week|count|
+----+------+-----+
|A100|201008|    2|
|A100|201009|    9|
|A100|201010|    9|
|A100|201011|    9|
|A100|201012|    9|
|A100|201013|    9|
|A100|201014|    4|
|A100|201015|    4|
|A100|201016|   45|
+----+------+-----+

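For reference, here is a rough PySpark sketch of the same lead-and-expand idea (not the author's code; it assumes the sample DataFrame is named input_df with the columns id, week and count, and it uses rdd.flatMap because PySpark DataFrames have no flatMap method):

from pyspark.sql import Window
from pyspark.sql.functions import lead, col

# compute the next week per id and how many weeks are missing after each row
diff_df = (input_df
           .withColumn("leadWeek", lead("week", 1).over(Window.partitionBy("id").orderBy("week")))
           .withColumn("diff", (col("leadWeek") - col("week")) - 1))

def expand(row):
    # original row plus one synthetic row per missing week, carrying the current count forward
    rows = [(row["id"], row["week"], row["count"])]
    for i in range(1, (row["diff"] or 0) + 1):
        rows.append((row["id"], row["week"] + i, row["count"]))
    return rows

output = spark.createDataFrame(diff_df.rdd.flatMap(expand), "id string, week int, count int")
output.show()

Like the Scala version, this simply adds 1 to the week number, so it does not handle year boundaries (e.g. 201052 followed by 201101).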

Answer 2:

PySpark solution

Sample data:

df = spark.createDataFrame([(1,201901,10),
                            (1,201903,9),
                            (1,201904,21),
                            (1,201906,42),
                            (1,201909,3),
                            (1,201912,56)
                           ],['id','weeknum','val'])
df.show()
+---+-------+---+
| id|weeknum|val|
+---+-------+---+
|  1| 201901| 10|
|  1| 201903|  9|
|  1| 201904| 21|
|  1| 201906| 42|
|  1| 201909|  3|
|  1| 201912| 56|
+---+-------+---+

1) The basic idea is to use a cross join to create all combinations of ids and weeks (from the minimum week present to the maximum).

from pyspark.sql.functions import min,max,sum,when
from pyspark.sql import Window
min_max_week = df.agg(min(df.weeknum),max(df.weeknum)).collect()
#Generate all weeks using range
all_weeks = spark.range(min_max_week[0][0],min_max_week[0][1]+1)
all_weeks = all_weeks.withColumnRenamed('id','weekno')
#all_weeks.show()
id_all_weeks = df.select(df.id).distinct().crossJoin(all_weeks).withColumnRenamed('id','aid')
#id_all_weeks.show()

2) After that, left joining the original dataframe onto these combinations helps identify the missing values.

res = id_all_weeks.join(df,(df.id == id_all_weeks.aid) & (df.weeknum == id_all_weeks.weekno),'left')
res.show()
+---+------+----+-------+----+
|aid|weekno|  id|weeknum| val|
+---+------+----+-------+----+
|  1|201911|null|   null|null|
|  1|201905|null|   null|null|
|  1|201903|   1| 201903|   9|
|  1|201904|   1| 201904|  21|
|  1|201901|   1| 201901|  10|
|  1|201906|   1| 201906|  42|
|  1|201908|null|   null|null|
|  1|201910|null|   null|null|
|  1|201912|   1| 201912|  56|
|  1|201907|null|   null|null|
|  1|201902|null|   null|null|
|  1|201909|   1| 201909|   3|
+---+------+----+-------+----+

3) Then use a combination of window functions: sum -> to assign groups, and max -> to fill in the missing values once the groups have been assigned.

w1 = Window.partitionBy(res.aid).orderBy(res.weekno)
groups = res.withColumn("grp",sum(when(res.id.isNull(),0).otherwise(1)).over(w1))
w2 = Window.partitionBy(groups.aid,groups.grp)
missing_values_filled = groups.withColumn('filled',max(groups.val).over(w2)) #select required columns as needed
missing_values_filled.show() 

+---+------+----+-------+----+---+------+
|aid|weekno|  id|weeknum| val|grp|filled|
+---+------+----+-------+----+---+------+
|  1|201901|   1| 201901|  10|  1|    10|
|  1|201902|null|   null|null|  1|    10|
|  1|201903|   1| 201903|   9|  2|     9|
|  1|201904|   1| 201904|  21|  3|    21|
|  1|201905|null|   null|null|  3|    21|
|  1|201906|   1| 201906|  42|  4|    42|
|  1|201907|null|   null|null|  4|    42|
|  1|201908|null|   null|null|  4|    42|
|  1|201909|   1| 201909|   3|  5|     3|
|  1|201910|null|   null|null|  5|     3|
|  1|201911|null|   null|null|  5|     3|
|  1|201912|   1| 201912|  56|  6|    56|
+---+------+----+-------+----+---+------+

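The helper columns can then be projected away to get back to the original shape, for example (a small sketch; column names follow the ones used above):

from pyspark.sql.functions import col

result = (missing_values_filled
          .select(col("aid").alias("id"),
                  col("weekno").alias("weeknum"),
                  col("filled").alias("val"))
          .orderBy("id", "weeknum"))
result.show()
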
Hive query with the same logic as above (assuming a table containing all the weeks can be created):

select id,weeknum,max(val) over(partition by id,grp) as val
from (select i.id
            ,w.weeknum
            ,t.val
            ,sum(case when t.id is null then 0 else 1 end) over(partition by i.id order by w.weeknum) as grp 
      from (select distinct id from tbl) i
      cross join weeks_table w
      left join tbl t on t.id = i.id and w.weeknum = t.weeknum
     ) t

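The query above assumes that a weeks_table containing every week already exists. One possible way to build it (a sketch, assuming fiscal weeks simply run from 1 to 52 within each year of interest) is to generate it in PySpark and register it as a view:

# the year range and the 1..52 fiscal weeks below are assumptions for illustration
years = spark.range(2010, 2011).selectExpr("id as yr")
weeks = spark.range(1, 53).selectExpr("id as wk")
weeks_table = years.crossJoin(weeks).selectExpr("cast(yr * 100 + wk as int) as weeknum")
weeks_table.createOrReplaceTempView("weeks_table")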
