spark sql 条件最大值
Posted
技术标签:
【中文标题】spark sql 条件最大值【英文标题】:spark sql conditional maximum 【发布时间】:2017-06-14 14:48:00 【问题描述】:我有一个高表,每组最多包含 10 个值。如何将此表转换为宽格式,即添加 2 列,这些列类似于小于或等于阈值的值?
我想找到每组的最大值,但它需要小于指定值,例如:
min(max('value1), lit(5)).over(Window.partitionBy('grouping))
但是min()
只适用于列而不适用于从内部函数返回的 Scala 值?
问题可以描述为:
Seq(Seq(1,2,3,4).max,5).min
其中Seq(1,2,3,4)
由窗口返回。
如何在 spark sql 中表述这个?
编辑
例如
+--------+-----+---------+
|grouping|value|something|
+--------+-----+---------+
| 1| 1| first|
| 1| 2| second|
| 1| 3| third|
| 1| 4| fourth|
| 1| 7| 7|
| 1| 10| 10|
| 21| 1| first|
| 21| 2| second|
| 21| 3| third|
+--------+-----+---------+
创建者
case class MyThing(grouping: Int, value:Int, something:String)
val df = Seq(MyThing(1,1, "first"), MyThing(1,2, "second"), MyThing(1,3, "third"),MyThing(1,4, "fourth"),MyThing(1,7, "7"), MyThing(1,10, "10"),
MyThing(21,1, "first"), MyThing(21,2, "second"), MyThing(21,3, "third")).toDS
在哪里
df
.withColumn("somethingAtLeast5AndMaximum5", max('value).over(Window.partitionBy('grouping)))
.withColumn("somethingAtLeast6OupToThereshold2", max('value).over(Window.partitionBy('grouping)))
.show
返回
+--------+-----+---------+----------------------------+-------------------------+
|grouping|value|something|somethingAtLeast5AndMaximum5| somethingAtLeast6OupToThereshold2 |
+--------+-----+---------+----------------------------+-------------------------+
| 1| 1| first| 10| 10|
| 1| 2| second| 10| 10|
| 1| 3| third| 10| 10|
| 1| 4| fourth| 10| 10|
| 1| 7| 7| 10| 10|
| 1| 10| 10| 10| 10|
| 21| 1| first| 3| 3|
| 21| 2| second| 3| 3|
| 21| 3| third| 3| 3|
+--------+-----+---------+----------------------------+-------------------------+
相反,我更愿意制定:
lit(Seq(max('value).asInstanceOf[java.lang.Integer], new java.lang.Integer(2)).min).over(Window.partitionBy('grouping))
但这不起作用,因为max('value)
不是标量值。
预期的输出应该是这样的
+--------+-----+---------+----------------------------+-------------------------+
|grouping|value|something|somethingAtLeast5AndMaximum5|somethingAtLeast6OupToThereshold2|
+--------+-----+---------+----------------------------+-------------------------+
| 1| 4| fourth| 4| 7|
| 21| 1| first| 3| NULL|
+--------+-----+---------+----------------------------+-------------------------+
编辑2
尝试支点时
df.groupBy("grouping").pivot("value").agg(first('something)).show
+--------+-----+------+-----+------+----+----+
|grouping| 1| 2| 3| 4| 7| 10|
+--------+-----+------+-----+------+----+----+
| 1|first|second|third|fourth| 7| 10|
| 21|first|second|third| null|null|null|
+--------+-----+------+-----+------+----+----+
问题的第二部分仍然是某些列可能不存在或为空。
当聚合到数组时:
df.groupBy("grouping").agg(collect_list('value).alias("value"), collect_list('something).alias("something"))
+--------+-------------------+--------------------+
|grouping| value| something|
+--------+-------------------+--------------------+
| 1|[1, 2, 3, 4, 7, 10]|[first, second, t...|
| 21| [1, 2, 3]|[first, second, t...|
+--------+-------------------+--------------------+
这些值已经彼此相邻,但需要选择正确的值。这可能仍然比连接或窗口函数更有效。
【问题讨论】:
你能举一个数据的例子和你想解决的问题吗? 请看编辑。 您可以为您添加的特定样本数据添加预期输出吗? 请查看预期输出。如果可能,我想避免自加入。 【参考方案1】:分两个单独的步骤会更容易 - 在 Window 上计算 max
,然后在结果上使用 when...otherwise
以生成 min(x, 5)
:
df.withColumn("tmp", max('value1).over(Window.partitionBy('grouping)))
.withColumn("result", when('tmp > lit(5), 5).otherwise('tmp))
编辑:一些示例数据来澄清这一点:
val df = Seq((1, 1),(1, 2),(1, 3),(1, 4),(2, 7),(2, 8))
.toDF("grouping", "value1")
df.withColumn("result", max('value1).over(Window.partitionBy('grouping)))
.withColumn("result", when('result > lit(5), 5).otherwise('result))
.show()
// +--------+------+------+
// |grouping|value1|result|
// +--------+------+------+
// | 1| 1| 4| // 4, because Seq(Seq(1,2,3,4).max,5).min = 4
// | 1| 2| 4|
// | 1| 3| 4|
// | 1| 4| 4|
// | 2| 7| 5| // 5, because Seq(Seq(7,8).max,5).min = 5
// | 2| 8| 5|
// +--------+------+------+
【讨论】:
所以实际上我有一个高桌子,每组 1....10 个,对于值 5 和 8,我想将列的值提取为宽格式,即附加 2列。您的解决方案将仅应用全局最大值,大多数情况下将导致 10。注意:并非所有值 1...10 都存在,有时存在空值。这就是为什么如果 5 和 8 不可用,我想取下一个可用的较小值。 您的解决方案主要适用于 b) 部分,即 8 值较大的组。但是,如果最大值为 10,我们将取最小值 (10,8)。这可能是个问题,因为 8 可能不存在。在这种情况下,我想取下一个最小值,即 7(如果存在)。如果不是下一个。如果它小于边界,即 5,则结果应该是 None。 我不确定我是否在关注;这回答了原始帖子,该帖子指出它应该计算Seq(Seq(<values matching group>).max,5).min
的 DataFrame 等效项。您可能需要完全改写帖子或发布新问题,我无法将您的 cmets 与原始帖子相关联...
我会尝试改写。以上是关于spark sql 条件最大值的主要内容,如果未能解决你的问题,请参考以下文章
为啥 org.apache.spark.sql.types.DecimalType 在 Spark SQL 中的最大精度值为 38?