如何在时间窗口上应用一个非聚合函数
Posted
技术标签:
【中文标题】如何在时间窗口上应用一个非聚合函数【英文标题】:How to apply one NOT aggregating function on a time window 【发布时间】:2019-11-28 10:38:51 【问题描述】:我想用一个窗口函数,但我不想做一个聚合函数,可以吗?
这是我的问题。
我有一个电影评论列表,我想知道,对于每条评论,在过去 10 分钟内,离他最近的评论是什么。
例如:
这是我的评论表:
import java.sql.Timestamp
var commentaires = sc.parallelize(Array((1, Timestamp.valueOf("2017-03-12 03:19:50"), "super film"), (2, Timestamp.valueOf("2017-03-12 03:19:58"), "très bon film"),(3, Timestamp.valueOf("2017-03-12 03:21:50"), "tres mauvais"), (4, Timestamp.valueOf("2017-03-12 03:21:51"), "incroyable ce film"), (5, Timestamp.valueOf("2017-03-12 03:21:58"), "incroyable ce film"))).toDF("num", "CommentDate", "comment")
我拿到桌子了:
+---+-------------------+------------------+
|num|CommentDate |comment |
+---+-------------------+------------------+
|1 |2017-03-12 03:19:50|super film |
|2 |2017-03-12 03:19:58|très bon film |
|3 |2017-03-12 03:21:50|tres mauvais |
|4 |2017-03-12 03:21:51|incroyable ce film|
|5 |2017-03-12 03:21:58|incroyable ce film|
+---+-------------------+------------------+
我想要这张桌子:
+---+-------------------+------------------+------------+
|num|Commentdate |comment |distance_min|
+---+-------------------+------------------+------------+
|1 |2017-03-12 03:19:50|super film |null |
|2 |2017-03-12 03:19:58|très bon film |8 |
|3 |2017-03-12 03:21:50|tres mauvais |null |
|4 |2017-03-12 03:21:51|incroyable ce film|16 |
|5 |2017-03-12 03:21:58|incroyable ce film|0 |
+---+-------------------+------------------+------------+
对于第一条评论,过去 10 分钟内没有任何内容 --> 他
获得空值
第二条评论只有一条评论,距离为8
对于第三条评论,过去 10 分钟内没有任何内容 --> 他
获得空值
第四条评论只有一条评论,距离为16
对于第五条评论,过去 10 分钟有 2 个 cmets,距离为 0 和 16,因此最小距离为 0
有关信息,我使用此距离计算:
// Distance de Levenshtein
import scala.collection.mutable
import scala.collection.parallel.ParSeq
import org.apache.spark.sql.functions.udf
def levenshtein(s1: String, s2: String): Int =
val memorizedCosts = mutable.Map[(Int, Int), Int]()
def lev: ((Int, Int)) => Int =
case (k1, k2) =>
memorizedCosts.getOrElseUpdate((k1, k2), (k1, k2) match
case (i, 0) => i
case (0, j) => j
case (i, j) =>
ParSeq(1 + lev((i - 1, j)),
1 + lev((i, j - 1)),
lev((i - 1, j - 1))
+ (if (s1(i - 1) != s2(j - 1)) 1 else 0)).min
)
lev((s1.length, s2.length))
levenshtein("a B c", "a b c")
我尝试查看窗口函数,但我能做的就是聚合函数。如何对每一行应用窗口函数(这里计算 2 个语音之间的距离)并仅在最后进行聚合(获取最小值)?
如果不可能。有谁知道我该怎么做?
【问题讨论】:
【参考方案1】:这是一个解决方案:
Step1:使用“collect_list”获取时间窗口内的所有评论。
Step2:使用用户定义函数将评论与评论列表进行比较
/////////////////////////////////////// //////////////////////////
步骤0:
评论员的定义:
import java.sql.Timestamp
var commentaires = sc.parallelize(Array((1, Timestamp.valueOf("2017-03-12 03:05:50"), "super film"), (2, Timestamp.valueOf("2017-03-12 03:19:58"), "très bon film"),(3, Timestamp.valueOf("2017-03-12 04:21:50"), "tres mauvais"), (4, Timestamp.valueOf("2017-03-12 03:21:51"), "incroyable ce film"), (5, Timestamp.valueOf("2017-03-12 03:21:58"), "incroyable ce film"), (6, Timestamp.valueOf("2017-03-12 03:22:58"), "incroyable ce film"))).toDF("num", "dateCommentaire", "commentaire")
第一步:使用collect_list
import org.apache.spark.sql.functions.col,concat_ws, collect_list
import org.apache.spark.sql.expressions.Window
var w1 = Window.orderBy(col("dateCommentaire").cast("long")).rangeBetween(-10*60,-1)
commentaires = commentaires.withColumn("listePreviousCom", collect_list("commentaire") over(w1))
第二步:计算最小距离:
var getMinDistance: ((String, mutable.WrappedArray[String]) => (Option[Double]))= (comment: String, listComment: mutable.WrappedArray[String]) =>
// initialisation de la liste (with "-1.0" to have a list of Double)
var listDistance = List(-1.0)
listDistance = listDistance.filter(_ != -1.0) // Delete "-1.0"
for ((commentaire, indexe) <- listComment.zipWithIndex)
listDistance = lev(commentaire, comment) :: listDistance
listDistance = listDistance.reverse
if (listDistance.size >= 1)
var minDistance = listDistance.min
Some(minDistance)
else
None
val getMinDistance_f= udf(getMinDistance)
commentaires = commentaires.withColumn("minDistance", getMinDistance_f($"commentaire", $"listePreviousCom"))
【讨论】:
以上是关于如何在时间窗口上应用一个非聚合函数的主要内容,如果未能解决你的问题,请参考以下文章
如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?