Spark UDF 没有正确给出滚动计数
Posted
技术标签:
【中文标题】Spark UDF 没有正确给出滚动计数【英文标题】:Spark UDF not giving rolling counts properly 【发布时间】:2020-10-21 07:27:10 【问题描述】:我有一个 Spark UDF 来计算列的滚动计数,精确到时间。如果我需要计算 24 小时的滚动计数,例如对于时间为 2020-10-02 09:04:00 的条目,我需要回顾到 2020-10-01 09:04:00(非常精确)。
如果我在本地运行,滚动计数 UDF 可以正常工作并给出正确的计数,但是当我在集群上运行时,它给出的结果不正确。这是示例输入和输出
输入
+---------+-----------------------+
|OrderName|Time |
+---------+-----------------------+
|a |2020-07-11 23:58:45.538|
|a |2020-07-12 00:00:07.307|
|a |2020-07-12 00:01:08.817|
|a |2020-07-12 00:02:15.675|
|a |2020-07-12 00:05:48.277|
+---------+-----------------------+
预期输出
+---------+-----------------------+-----+
|OrderName|Time |Count|
+---------+-----------------------+-----+
|a |2020-07-11 23:58:45.538|1 |
|a |2020-07-12 00:00:07.307|2 |
|a |2020-07-12 00:01:08.817|3 |
|a |2020-07-12 00:02:15.675|1 |
|a |2020-07-12 00:05:48.277|1 |
+---------+-----------------------+-----+
最后两个条目值在本地是 4 和 5,但在集群上它们是不正确的。我最好的猜测是数据正在跨执行器分布,并且 udf 也在每个执行器上并行调用。由于 UDF 的参数之一是列(本示例中的分区键 - OrderName),如果是这种情况,我如何控制/纠正集群的行为。以便它以正确的方式计算每个分区的正确计数。有什么好的建议
【问题讨论】:
你能显示你的UDF代码吗? 我不能完全共享它,它类似于 udf (ordername: Partition, time: Range, Long) process:,UDF 的初始要求,同一分区内的所有记录都已排序按日期。它的作用是针对每个分区(此处为订单名称),如果新记录用于现有分区,则将记录添加到队列中,增加计数,然后检查当前时间是否为当前时间,队列中到目前为止的所有记录是否在 24 小时内, 如果不是,则从开头删除记录(因为它是队列) 如果你能显示这将非常有用:input data/dataframe
和expected output/expected dataframe
。
我用输入和预期的输出数据框更新了问题
【参考方案1】:
根据您的评论,您要计算过去 24 小时内每条记录的记录总数
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.LongType
//A sample data (Guessing from your question)
val df = Seq(("a","2020-07-10 23:58:45.438","1"),("a","2020-07-11 23:58:45.538","1"),("a","2020-07-11 23:58:45.638","1")).toDF("OrderName","Time","Count")
// Extract the UNIX TIMESTAMP for your time column
val df2 = df.withColumn("unix_time",concat(unix_timestamp($"Time"),split($"Time","\\.")(1)).cast(LongType))
val noOfMilisecondsDay : Long = 24*60*60*1000
//Create a window per `OrderName` and select rows from `current time - 24 hours` to `current time`
val winSpec = Window.partitionBy("OrderName").orderBy("unix_time").rangeBetween(Window.currentRow - noOfMilisecondsDay, Window.currentRow)
// Final you perform your COUNT or SUM(COUNT) as per your need
val finalDf = df2.withColumn("tot_count", count("OrderName").over(winSpec))
//or val finalDf = df2.withColumn("tot_count", sum("Count").over(winSpec))
【讨论】:
是的,输入和输出都是数据帧,上面的方法很久以前就尝试过了,它不适用于单个分区的大量数据,比如 100 万条记录。火花作业永远不会完成,这就是使用 UDF 的原因,通过维护队列来减少记录的数量,但这可能导致错误的计数,因为数据正在分发。一种强制所有分区数据记录依次处理(顺序执行)的方法可以解决问题,但不确定如何 明白你的意思。是歪斜的问题。请查看***.com/a/64349579/7094520。这个想法是让OrderName
的窗口只有 24 小时
知道了,但我有 UDF,试图减少数据偏斜,但问题是,分区数据在不同的执行器之间分布,导致计数不正确
你不认为 UDF 会解决我之前解释的队列方法的问题。可能我可能不得不尝试使用mapPartitions进行迭代器转换,不知道该怎么做,需要探索。有什么建议么?您共享的链接是完全相同的问题,可能是我认识的发布它的同一个人。感谢您分享链接
当这个问题出现时,我正在与该人进行讨论,这就是为什么我直接向您推荐了可行的解决方案:)。试试吧,它肯定会减少偏度。以上是关于Spark UDF 没有正确给出滚动计数的主要内容,如果未能解决你的问题,请参考以下文章
Py(Spark) udf 给出 PythonException: 'TypeError: 'float' object is not subscriptable
编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java
编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java