Spark window function missing values

Posted: 2020-10-07 11:30:23

Problem description:

I have a dataframe:

+------+---------------+--------------+-------------------+
|devId |servertimestamp|trackingnumber|   servertimestamp2|
+------+---------------+--------------+-------------------+
|  8010|     1602022571|  480027192318|2020-10-06 22:16:11|
|  8010|     1602022572|  116035246092|2020-10-06 22:16:12|
|  8010|     1602022573|  495863861847|2020-10-06 22:16:13|
|  8010|     1602022575|  485108185153|2020-10-06 22:16:15|
|  8010|     1602022576|  787294899718|2020-10-06 22:16:16|
|  8010|     1602022577|  118929636841|2020-10-06 22:16:17|
|  8010|     1602022579|  119867330791|2020-10-06 22:16:19|
|  8010|     1602022580|  118929640260|2020-10-06 22:16:20|
|  8010|     1602022581|  114194932911|2020-10-06 22:16:21|
|  8010|     1602022583|  104499502413|2020-10-06 22:16:23|
|  8010|     1602022584|  104499503350|2020-10-06 22:16:24|
|  8010|     1602022585|  789385310169|2020-10-06 22:16:25|
|  8010|     1602022587|  789385066288|2020-10-06 22:16:27|
|  8010|     1602022588|  113194381766|2020-10-06 22:16:28|
|  8010|     1602022589|  119846967190|2020-10-06 22:16:29|
|  8010|     1602022591|  114478769341|2020-10-06 22:16:31|
|  8010|     1602022593|  114478769352|2020-10-06 22:16:33|
|  8010|     1602022594|  776077921980|2020-10-06 22:16:34|
|  8010|     1602022596|  116088883660|2020-10-06 22:16:36|
|  8010|     1602022597|  414142833630|2020-10-06 22:16:37|
+------+---------------+--------------+-------------------+

I want to get the number of records per devId in 5-minute windows, so I do:

  val myDF2 = myDF.groupBy(col("devId"), window(col("servertimestamp2"), "5 minutes", "5 minutes")).count()

To test the result:

myDF2.select("*").where("devId = 3121").orderBy("window").show(false)

The result I get has gaps. For example, there is no data for the time windows between 17:35:00 -- 17:40:00 and between 18:00:00 -- 18:55:00. I assume this is because there are no records in those periods. How can I make it show all time windows, even those with a count of 0?

+------+------------------------------------------+-----+
|devId |window                                    |count|
+------+------------------------------------------+-----+
|3121  |[2020-10-06 17:30:00, 2020-10-06 17:35:00]|1    |
|3121  |[2020-10-06 17:40:00, 2020-10-06 17:45:00]|1    |
|3121  |[2020-10-06 17:45:00, 2020-10-06 17:50:00]|1    |
|3121  |[2020-10-06 17:50:00, 2020-10-06 17:55:00]|1    |
|3121  |[2020-10-06 17:55:00, 2020-10-06 18:00:00]|1    |
|3121  |[2020-10-06 18:55:00, 2020-10-06 19:00:00]|1    |
|3121  |[2020-10-06 21:10:00, 2020-10-06 21:15:00]|1    |
|3121  |[2020-10-06 21:20:00, 2020-10-06 21:25:00]|1    |
|3121  |[2020-10-07 00:45:00, 2020-10-07 00:50:00]|1    |
|3121  |[2020-10-07 01:10:00, 2020-10-07 01:15:00]|1    |
|3121  |[2020-10-07 01:15:00, 2020-10-07 01:20:00]|2    |
|3121  |[2020-10-07 01:20:00, 2020-10-07 01:25:00]|1    |
|3121  |[2020-10-07 01:25:00, 2020-10-07 01:30:00]|1    |
|3121  |[2020-10-07 01:35:00, 2020-10-07 01:40:00]|1    |
|3121  |[2020-10-07 01:50:00, 2020-10-07 01:55:00]|1    |
|3121  |[2020-10-07 01:55:00, 2020-10-07 02:00:00]|1    |
|3121  |[2020-10-07 02:10:00, 2020-10-07 02:15:00]|1    |
|3121  |[2020-10-07 05:40:00, 2020-10-07 05:45:00]|1    |
|3121  |[2020-10-07 05:45:00, 2020-10-07 05:50:00]|1    |
|3121  |[2020-10-07 05:50:00, 2020-10-07 05:55:00]|1    |
+------+------------------------------------------+-----+

Question comments:

The sample data looks wrong. Can you confirm that you really have no data in those periods?

Answer 1:

You can generate a second dataframe containing every possible combination of time window and devId, and then join it with myDF2 to fill the gaps.

//imports assumed by the snippets below
import org.apache.spark.sql.functions._
import spark.implicits._

//get the earliest and latest window over all the data
//myDF2 should be cached before this operation
//(Column-based agg is used so that both the min and the max are returned)
val minMax = myDF2.agg(min(col("window")), max(col("window"))).collect()(0)
val (min, max) = (minMax.getStruct(0).getTimestamp(0),
                  minMax.getStruct(1).getTimestamp(1))

//get the distinct devIds from the original data
val devIds = myDF.select('devId).distinct()

//create the sequence of all possible windows and cross join it with the devIds
//the cross join should not be too slow because the list of windows
//should always be small enough to be broadcast
val dft = spark.sql(s"select explode(sequence(to_timestamp('$min', 'yyyy-MM-dd HH:mm:ss.S'), to_timestamp('$max', 'yyyy-MM-dd HH:mm:ss.S'), interval 5 minutes)) as Date")
    .groupBy(window(col("Date"), "5 minutes")).count().drop("count")
    .crossJoin(devIds)

//right-join the second dataframe to myDF2 so every window/devId pair appears
myDF2.join(dft, Seq("devId", "window"), "right")
  .orderBy("devId", "window")
  .select("devId", "window", "count").show(false)


Comments:

That is the standard way.
