spark sql 条件最大值

Posted

技术标签:

【中文标题】spark sql 条件最大值【英文标题】:spark sql conditional maximum 【发布时间】:2017-06-14 14:48:00 【问题描述】:

我有一个高表,每组最多包含 10 个值。如何将此表转换为宽格式,即添加 2 列,这些列类似于小于或等于阈值的值?

我想找到每组的最大值,但它需要小于指定值,例如:

min(max('value1), lit(5)).over(Window.partitionBy('grouping))

但是min() 只适用于列而不适用于从内部函数返回的 Scala 值?

问题可以描述为:

Seq(Seq(1,2,3,4).max,5).min

其中Seq(1,2,3,4)由窗口返回。

如何在 spark sql 中表述这个?

编辑

例如

+--------+-----+---------+
|grouping|value|something|
+--------+-----+---------+
|       1|    1|    first|
|       1|    2|   second|
|       1|    3|    third|
|       1|    4|   fourth|
|       1|    7|        7|
|       1|   10|       10|
|      21|    1|    first|
|      21|    2|   second|
|      21|    3|    third|
+--------+-----+---------+

创建者

case class MyThing(grouping: Int, value:Int, something:String)
val df = Seq(MyThing(1,1, "first"), MyThing(1,2, "second"), MyThing(1,3, "third"),MyThing(1,4, "fourth"),MyThing(1,7, "7"), MyThing(1,10, "10"),
MyThing(21,1, "first"), MyThing(21,2, "second"), MyThing(21,3, "third")).toDS

在哪里

df
.withColumn("somethingAtLeast5AndMaximum5", max('value).over(Window.partitionBy('grouping)))
.withColumn("somethingAtLeast6OupToThereshold2", max('value).over(Window.partitionBy('grouping)))
.show

返回

+--------+-----+---------+----------------------------+-------------------------+
|grouping|value|something|somethingAtLeast5AndMaximum5| somethingAtLeast6OupToThereshold2 |
+--------+-----+---------+----------------------------+-------------------------+
|       1|    1|    first|                          10|                       10|
|       1|    2|   second|                          10|                       10|
|       1|    3|    third|                          10|                       10|
|       1|    4|   fourth|                          10|                       10|
|       1|    7|        7|                          10|                       10|
|       1|   10|       10|                          10|                       10|
|      21|    1|    first|                           3|                        3|
|      21|    2|   second|                           3|                        3|
|      21|    3|    third|                           3|                        3|
+--------+-----+---------+----------------------------+-------------------------+

相反,我更愿意制定:

lit(Seq(max('value).asInstanceOf[java.lang.Integer], new java.lang.Integer(2)).min).over(Window.partitionBy('grouping))

但这不起作用,因为max('value) 不是标量值。

预期的输出应该是这样的

+--------+-----+---------+----------------------------+-------------------------+
|grouping|value|something|somethingAtLeast5AndMaximum5|somethingAtLeast6OupToThereshold2|
+--------+-----+---------+----------------------------+-------------------------+
|       1|    4|   fourth|                           4|                        7|
|      21|    1|    first|                           3|                     NULL|
+--------+-----+---------+----------------------------+-------------------------+

编辑2

尝试支点时

df.groupBy("grouping").pivot("value").agg(first('something)).show
+--------+-----+------+-----+------+----+----+
|grouping|    1|     2|    3|     4|   7|  10|
+--------+-----+------+-----+------+----+----+
|       1|first|second|third|fourth|   7|  10|
|      21|first|second|third|  null|null|null|
+--------+-----+------+-----+------+----+----+

问题的第二部分仍然是某些列可能不存在或为空。

当聚合到数组时:

df.groupBy("grouping").agg(collect_list('value).alias("value"), collect_list('something).alias("something"))
+--------+-------------------+--------------------+
|grouping|              value|           something|
+--------+-------------------+--------------------+
|       1|[1, 2, 3, 4, 7, 10]|[first, second, t...|
|      21|          [1, 2, 3]|[first, second, t...|
+--------+-------------------+--------------------+

这些值已经彼此相邻,但需要选择正确的值。这可能仍然比连接或窗口函数更有效。

【问题讨论】:

你能举一个数据的例子和你想解决的问题吗? 请看编辑。 您可以为您添加的特定样本数据添加预期输出吗? 请查看预期输出。如果可能,我想避免自加入。 【参考方案1】:

分两个单独的步骤会更容易 - 在 Window 上计算 max,然后在结果上使用 when...otherwise 以生成 min(x, 5)

df.withColumn("tmp", max('value1).over(Window.partitionBy('grouping)))
  .withColumn("result", when('tmp > lit(5), 5).otherwise('tmp))

编辑:一些示例数据来澄清这一点:

val df = Seq((1, 1),(1, 2),(1, 3),(1, 4),(2, 7),(2, 8))
  .toDF("grouping", "value1")

df.withColumn("result", max('value1).over(Window.partitionBy('grouping)))
  .withColumn("result", when('result > lit(5), 5).otherwise('result))
  .show()

// +--------+------+------+
// |grouping|value1|result|
// +--------+------+------+
// |       1|     1|     4| // 4, because Seq(Seq(1,2,3,4).max,5).min = 4
// |       1|     2|     4|
// |       1|     3|     4|
// |       1|     4|     4|
// |       2|     7|     5| // 5, because Seq(Seq(7,8).max,5).min = 5
// |       2|     8|     5|
// +--------+------+------+

【讨论】:

所以实际上我有一个高桌子,每组 1....10 个,对于值 5 和 8,我想将列的值提取为宽格式,即附加 2列。您的解决方案将仅应用全局最大值,大多数情况下将导致 10。注意:并非所有值 1...10 都存在,有时存在空值。这就是为什么如果 5 和 8 不可用,我想取下一个可用的较小值。 您的解决方案主要适用于 b) 部分,即 8 值较大的组。但是,如果最大值为 10,我们将取最小值 (10,8)。这可能是个问题,因为 8 可能不存在。在这种情况下,我想取下一个最小值,即 7(如果存在)。如果不是下一个。如果它小于边界,即 5,则结果应该是 None。 我不确定我是否在关注;这回答了原始帖子,该帖子指出它应该计算 Seq(Seq(<values matching group>).max,5).min 的 DataFrame 等效项。您可能需要完全改写帖子或发布新问题,我无法将您的 cmets 与原始帖子相关联... 我会尝试改写。

以上是关于spark sql 条件最大值的主要内容,如果未能解决你的问题,请参考以下文章

spark sql 连续登录最大天数

Spark SQL - 查找每年一个月的最大值

为啥 org.apache.spark.sql.types.DecimalType 在 Spark SQL 中的最大精度值为 38?

Spark SQL 从数据源动态获取最大值和最小值

选择具有最大值的行,并结合WHERE。 MAX和CAST,在spark.sql中

spark -SQL 配置参数