从 Spark sql Windows 函数获得意外结果

Posted

技术标签:

【中文标题】从 Spark sql Windows 函数获得意外结果【英文标题】:Getting unexpected results from Spark sql Windows Functions 【发布时间】:2017-09-30 13:48:45 【问题描述】:

Spark sql Window 功能似乎无法正常工作。 我在 Hadoop 集群中运行一个 Spark 作业,其中 HDFS 块大小为 128 MB,并且 Spark 版本 1.5 CDH 5.5

我的要求:

如果有多个记录具有相同的data_rfe_id,则根据最大seq_id和最大service_id取单个记录

我看到在原始数据中有一些记录具有相同的 data_rfe_id 和相同的 seq_id 因此,我使用 Window 函数应用 row_number 以便我可以过滤具有 row_num === 1 的记录

但是当有大量数据集时它似乎不起作用。我看到应用了相同的 rowNumber 。

为什么会这样?

在数据框上应用窗口函数之前是否需要重新洗牌?

我希望每个 data_rfe_id 都有一个唯一的排名号

我只想使用窗口函数来实现这一点。

 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.rowNumber
 .....

scala> df.printSchema
root
 |-- transitional_key: string (nullable = true)
 |-- seq_id: string (nullable = true)
 |-- data_rfe_id: string (nullable = true)
 |-- service_id: string (nullable = true)
 |-- event_start_date_time: string (nullable = true)
 |-- event_id: string (nullable = true)


 val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id").desc,df("service_id").desc)
  val rankDF =df.withColumn("row_num",rowNumber.over(windowFunction))
  rankDF.select("data_rfe_id","seq_id","service_id","row_num").show(200,false)

预期结果:

 +------------------------------------+-----------------+-----------+-------+
  |data_rfe_id                         |seq_id           |service_id|row_num|
 +------------------------------------+-----------------+-----------+-------+
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826          |4039       |1     |
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821          |3356       |2     |
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802          |1857       |3     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2156       |1     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2103       |2     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2083       |3     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2082       |4     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2076       |5     |

我根据上面的代码得到的实际结果:

 +------------------------------------+-----------------+-----------+-------+
 |data_rfe_id                          |seq_id           |service_id|row_num|
 +------------------------------------+-----------------+-----------+-------+
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826          |4039       |1     |
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821          |3356       |1     |
 |9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802          |1857       |1     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2156       |1     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2103       |1     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2083       |1     |
 |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2082       |1     |
  |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541          |2076       |1     |

谁能解释我为什么会得到这些意想不到的结果?以及如何解决?

【问题讨论】:

【参考方案1】:

基本上你想要 rank , seq_id 和 service_id 按 desc 顺序排列。添加 rangeBetween 与您需要的范围。排名可能对您有用。以下是sn-p的代码:

val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id"),df("service_id")).desc().rangeBetween(-MAXNUMBER,MAXNUMBER))
val rankDF =df.withColumn( "rank", rank().over(windowFunction) )

由于您使用的是旧版本的 spark 不知道它是否可以工作。 windowSpec 有问题,这里是reference

【讨论】:

当我使用 HiveContext 时,我的代码可以工作,但它会为某些 task_id 单独抛出另一个异常。异常是由以下原因引起的:java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String 无法在 org.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) 处转换​​为 java.lang.Integer。 apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)

以上是关于从 Spark sql Windows 函数获得意外结果的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL Windows:基于数组列创建数据框

如何从 spark sql 调用具有数据帧操作的函数?

从 org.apache.spark.sql.types.StructType 生成 AvroSchema

在Spark SQL作业中使用地理空间函数

如何在spark sql lag函数中添加if或case条件

使用Spark SQL中的regex函数从字符串中提取特定数字