如何在时间窗口上应用一个非聚合函数

Posted

技术标签:

【中文标题】如何在时间窗口上应用一个非聚合函数【英文标题】:How to apply one NOT aggregating function on a time window 【发布时间】:2019-11-28 10:38:51 【问题描述】:

我想用一个窗口函数,但我不想做一个聚合函数,可以吗?

这是我的问题。

我有一个电影评论列表,我想知道,对于每条评论,在过去 10 分钟内,离他最近的评论是什么。

例如:

这是我的评论表:

import java.sql.Timestamp

var commentaires = sc.parallelize(Array((1, Timestamp.valueOf("2017-03-12 03:19:50"), "super film"), (2, Timestamp.valueOf("2017-03-12 03:19:58"), "très bon film"),(3, Timestamp.valueOf("2017-03-12 03:21:50"), "tres mauvais"), (4, Timestamp.valueOf("2017-03-12 03:21:51"), "incroyable ce film"), (5, Timestamp.valueOf("2017-03-12 03:21:58"), "incroyable ce film"))).toDF("num", "CommentDate", "comment")

我拿到桌子了:

+---+-------------------+------------------+
|num|CommentDate        |comment           |
+---+-------------------+------------------+
|1  |2017-03-12 03:19:50|super film        |
|2  |2017-03-12 03:19:58|très bon film     |
|3  |2017-03-12 03:21:50|tres mauvais      |
|4  |2017-03-12 03:21:51|incroyable ce film|
|5  |2017-03-12 03:21:58|incroyable ce film|
+---+-------------------+------------------+

我想要这张桌子:

+---+-------------------+------------------+------------+
|num|Commentdate        |comment           |distance_min|
+---+-------------------+------------------+------------+
|1  |2017-03-12 03:19:50|super film        |null        |
|2  |2017-03-12 03:19:58|très bon film     |8           |
|3  |2017-03-12 03:21:50|tres mauvais      |null        |
|4  |2017-03-12 03:21:51|incroyable ce film|16          |
|5  |2017-03-12 03:21:58|incroyable ce film|0           |
+---+-------------------+------------------+------------+
对于第一条评论,过去 10 分钟内没有任何内容 --> 他 获得空值 第二条评论只有一条评论,距离为8 对于第三条评论,过去 10 分钟内没有任何内容 --> 他 获得空值 第四条评论只有一条评论,距离为16 对于第五条评论,过去 10 分钟有 2 个 cmets,距离为 0 和 16,因此最小距离为 0

有关信息,我使用此距离计算:

// Distance de Levenshtein
import scala.collection.mutable
import scala.collection.parallel.ParSeq
import org.apache.spark.sql.functions.udf

def levenshtein(s1: String, s2: String): Int = 
  val memorizedCosts = mutable.Map[(Int, Int), Int]()

  def lev: ((Int, Int)) => Int = 
    case (k1, k2) =>
      memorizedCosts.getOrElseUpdate((k1, k2), (k1, k2) match 
        case (i, 0) => i
        case (0, j) => j
        case (i, j) =>
          ParSeq(1 + lev((i - 1, j)),
            1 + lev((i, j - 1)),
            lev((i - 1, j - 1))
              + (if (s1(i - 1) != s2(j - 1)) 1 else 0)).min
      )
  

  lev((s1.length, s2.length))


levenshtein("a B c", "a b c")

我尝试查看窗口函数,但我能做的就是聚合函数。如何对每一行应用窗口函数(这里计算 2 个语音之间的距离)并仅在最后进行聚合(获取最小值)?

如果不可能。有谁知道我该怎么做?

【问题讨论】:

【参考方案1】:

这是一个解决方案:

Step1:使用“collect_list”获取时间窗口内的所有评论。

Step2:使用用户定义函数将评论与评论列表进行比较

/////////////////////////////////////// //////////////////////////

步骤0:

评论员的定义:

import java.sql.Timestamp
var commentaires = sc.parallelize(Array((1, Timestamp.valueOf("2017-03-12 03:05:50"), "super film"), (2, Timestamp.valueOf("2017-03-12 03:19:58"), "très bon film"),(3, Timestamp.valueOf("2017-03-12 04:21:50"), "tres mauvais"), (4, Timestamp.valueOf("2017-03-12 03:21:51"), "incroyable ce film"), (5, Timestamp.valueOf("2017-03-12 03:21:58"), "incroyable ce film"), (6, Timestamp.valueOf("2017-03-12 03:22:58"), "incroyable ce film"))).toDF("num", "dateCommentaire", "commentaire") 

第一步:使用collect_list

import org.apache.spark.sql.functions.col,concat_ws, collect_list
import org.apache.spark.sql.expressions.Window
var w1 = Window.orderBy(col("dateCommentaire").cast("long")).rangeBetween(-10*60,-1)
commentaires = commentaires.withColumn("listePreviousCom", collect_list("commentaire") over(w1))

第二步:计算最小距离:

var getMinDistance: ((String, mutable.WrappedArray[String]) => (Option[Double]))= (comment: String, listComment: mutable.WrappedArray[String]) => 
    // initialisation de la liste (with "-1.0" to have a list of Double)
    var listDistance = List(-1.0)
    listDistance = listDistance.filter(_ != -1.0) // Delete "-1.0"
    for ((commentaire, indexe) <- listComment.zipWithIndex) 
        listDistance = lev(commentaire, comment) :: listDistance
    
    listDistance = listDistance.reverse
    if (listDistance.size >= 1)
        var minDistance = listDistance.min
        Some(minDistance)
    
     else 
         None
    


val getMinDistance_f= udf(getMinDistance)

commentaires = commentaires.withColumn("minDistance", getMinDistance_f($"commentaire", $"listePreviousCom"))

【讨论】:

以上是关于如何在时间窗口上应用一个非聚合函数的主要内容,如果未能解决你的问题,请参考以下文章

如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?

MySQL窗口函数_聚合函数

Spark Window 聚合与 Group By/Join 性能

SQL滑动窗口聚合(不使用窗口函数)

在子查询中使用聚合和窗口函数

如何在scala中获取分层数组的最终元素并在其上应用聚合函数?