databricks、spark、scala,不能长时间使用 lag()

Posted

技术标签:

【中文标题】databricks、spark、scala,不能长时间使用 lag()【英文标题】:databricks, spark, scala, cannot use lag() on long 【发布时间】:2021-03-21 19:04:56 【问题描述】:

我有一个名为 q6 的数据文件,如下所示:

date,count
2019-01-07,9553
2019-01-08,9930
2019-01-28,10160
2019-01-30,9881
2019-01-26,10867
2019-02-01,8
2019-01-20,6823
2019-01-22,9796
2019-01-19,9295
2019-01-05,9432
2019-01-03,10063
2018-12-31,13
2019-01-31,9804
2019-01-10,11051
2019-01-17,11268
2019-01-04,10451

我想计算每个日期与前一个日期之间的差异,以及增加/减少的百分比。这是我的文件的架构:

root
 |-- date: date (nullable = true)
 |-- count: long (nullable = false)

这是我尝试过的命令(其中一些,无论如何):

q6 = q6.groupBy("date").count()
//q6 = q6.withColumn("count", $"count" as "Int")//col("count").cast("int")
//q6 = q6.sort("date")
//q6.printSchema()
val windowSpec  = Window.partitionBy("date").orderBy("date")
q6 = q6.withColumn("lag", lag("count",1).over(windowSpec))
//q6 = q6.withColumn("prev_value", lag(q6.count).over(windowSpec))
//q6 = q6.withColumn("diff", when(isnull(q6.count - q6.prev_value), 0).otherwise(q6.price - q6.prev_value))
display(q6)

这运行没有错误,但我得到空值,如下所示:

date,count,lag
2019-01-07,9553,null
2019-01-08,9930,null
2019-01-28,10160,null
2019-01-30,9881,null
2019-01-26,10867,null
2019-02-01,8,null
2019-01-20,6823,null
2019-01-22,9796,null
2019-01-19,9295,null
2019-01-05,9432,null
2019-01-03,10063,null
2018-12-31,13,null

我使用 SQL Server 和窗口函数,虽然我对它们不是特别精通,但我可以让它们正常工作而不会出现太多问题。我将数据集放入 SQL Server 中,一切正常!这里有什么问题?

【问题讨论】:

【参考方案1】:

在 SQL 中你可以这样写

lag(count) over (order by date)

所以在 Scala Spark 中,你会写

val windowSpec = Window.orderBy("date")
q6 = q6.withColumn("lag", lag("count", 1).over(windowSpec))

如果您按日期分区,由于每个日期只有 1 个关联行,lag 将导致 null。无需按日期分区。

【讨论】:

【参考方案2】:

我的问题是我对窗口规范进行了分区。这是我应该做的:

val windowSpec  = Window.partitionBy().orderBy("date")

注意 partitionBy() 函数不带参数。现在完美运行。

顺便说一句,我正在为此苦苦挣扎,但能够在 SQL Server 中实现解决方案非常有帮助。我们的问题是我们的数据库已经超出了我们的规模,需要对大量数据进行分析。我花了几年时间学习 SQL Server。我猜我要花几年时间来学习 Spark --- 但绝对值得!

【讨论】:

请注意,spark 会将每个分区洗牌到单个执行程序上(因此,如果您的数据变得庞大,这可能是个问题)

以上是关于databricks、spark、scala,不能长时间使用 lag()的主要内容,如果未能解决你的问题,请参考以下文章

使用 databricks 在 Spark(scala) 中生成具有属性和值的 XML

由于 Databricks 不公开支持 spark-redshift lib,使用 Scala spark 从 Redshift 读取/写入 Redshift 的最佳方法是啥

Azure Datalake Store Gen2 使用 scala spark 库从 Databricks 读取文件

如何在 Azure 数据工厂的 Databricks 上运行 .Net spark 作业?

使用 6.4 版扩展支持(包括 Apache Spark 2.4.5、Scala 2.11)在 azure databricks 上启动集群时出现问题

Spark零基础入门:Scala 类和对象(下)