从 Spark sql Windows 函数获得意外结果
Posted
技术标签:
【中文标题】从 Spark sql Windows 函数获得意外结果【英文标题】:Getting unexpected results from Spark sql Windows Functions 【发布时间】:2017-09-30 13:48:45 【问题描述】:Spark sql Window 功能似乎无法正常工作。 我在 Hadoop 集群中运行一个 Spark 作业,其中 HDFS 块大小为 128 MB,并且 Spark 版本 1.5 CDH 5.5
我的要求:
如果有多个记录具有相同的data_rfe_id,则根据最大seq_id和最大service_id取单个记录
我看到在原始数据中有一些记录具有相同的 data_rfe_id 和相同的 seq_id 因此,我使用 Window 函数应用 row_number 以便我可以过滤具有 row_num === 1 的记录
但是当有大量数据集时它似乎不起作用。我看到应用了相同的 rowNumber 。
为什么会这样?
在数据框上应用窗口函数之前是否需要重新洗牌?
我希望每个 data_rfe_id 都有一个唯一的排名号
我只想使用窗口函数来实现这一点。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber
.....
scala> df.printSchema
root
|-- transitional_key: string (nullable = true)
|-- seq_id: string (nullable = true)
|-- data_rfe_id: string (nullable = true)
|-- service_id: string (nullable = true)
|-- event_start_date_time: string (nullable = true)
|-- event_id: string (nullable = true)
val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id").desc,df("service_id").desc)
val rankDF =df.withColumn("row_num",rowNumber.over(windowFunction))
rankDF.select("data_rfe_id","seq_id","service_id","row_num").show(200,false)
预期结果:
+------------------------------------+-----------------+-----------+-------+
|data_rfe_id |seq_id |service_id|row_num|
+------------------------------------+-----------------+-----------+-------+
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826 |4039 |1 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821 |3356 |2 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802 |1857 |3 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2156 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2103 |2 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2083 |3 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2082 |4 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2076 |5 |
我根据上面的代码得到的实际结果:
+------------------------------------+-----------------+-----------+-------+
|data_rfe_id |seq_id |service_id|row_num|
+------------------------------------+-----------------+-----------+-------+
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826 |4039 |1 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821 |3356 |1 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802 |1857 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2156 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2103 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2083 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2082 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2076 |1 |
谁能解释我为什么会得到这些意想不到的结果?以及如何解决?
【问题讨论】:
【参考方案1】:基本上你想要 rank , seq_id 和 service_id 按 desc 顺序排列。添加 rangeBetween 与您需要的范围。排名可能对您有用。以下是sn-p的代码:
val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id"),df("service_id")).desc().rangeBetween(-MAXNUMBER,MAXNUMBER))
val rankDF =df.withColumn( "rank", rank().over(windowFunction) )
由于您使用的是旧版本的 spark 不知道它是否可以工作。 windowSpec 有问题,这里是reference
【讨论】:
当我使用 HiveContext 时,我的代码可以工作,但它会为某些 task_id 单独抛出另一个异常。异常是由以下原因引起的:java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String 无法在 org.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) 处转换为 java.lang.Integer。 apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)以上是关于从 Spark sql Windows 函数获得意外结果的主要内容,如果未能解决你的问题,请参考以下文章
从 org.apache.spark.sql.types.StructType 生成 AvroSchema