如何使用collect_list?
Posted
技术标签:
【中文标题】如何使用collect_list?【英文标题】:How to use collect_list? 【发布时间】:2016-04-11 18:09:36 【问题描述】:我有两个DataFrame
,想根据date
,time
,mid
,binImbalance
字段加入它们,并在列表中收集timeB
和midB
中的对应值。
我已尝试使用以下代码:
val d1: DataFrame
val d3: DataFrame
val d2 = d3
.withColumnRenamed("date", "dateC")
.withColumnRenamed("milliSec", "milliSecC")
.withColumnRenamed("mid", "midC")
.withColumnRenamed("time", "timeC")
.withColumnRenamed("binImbalance", "binImbalanceC")
d1.join(d2, d1("date") === d2("dateC") and
d1("time") === d2("timeC") and
d1("mid") === d2("midC")
)
.groupBy("date", "time", "mid", "binImbalance")
.agg(collect_list("timeB"),collect_list("midB"))
但这不起作用,因为我收到错误:: Reference 'timeB' is ambiguous, could be: timeB#16, timeB#35
。
同时,如果我重命名timeB
列之一,我将无法收集列表中的值。
一个示例结果应该是:
+-----+---------+------+------------+---------+------+
| date| time| mid|binImbalance| timeB| midB|
+-----+---------+------+------------+---------+------+
| 1 | 1 | 10 | 1| 4 | 10 |
| 2 | 2 | 20 | 2| 5 | 11 |
| 3 | 3 | 30 | 3| 6 | 12 |
+-----+---------+------+------------+---------+------+
| date| time| mid|binImbalance| timeB| midB|
+-----+---------+------+------------+---------+------+
| 1 | 1 | 10 | 1| 7 | 13 |
| 2 | 2 | 20 | 2| 8 | 14 |
| 3 | 3 | 30 | 3| 9 | 15 |
结果:
+-----+---------+------+------------+---------+-----------+
| date| time| mid|binImbalance| ListTime| ListMid |
+-----+---------+------+------------+---------+-----------+
| 1 | 1 | 10 | 1| [4,7] | [10,13] |
| 2 | 2 | 20 | 2| [5,8] | [11,14] |
| 3 | 3 | 30 | 3| [6,9] | [12,15] |
最小、完整和可验证的示例
d1 d2
id data id data
-- ---- -- ----
1 1 1 2
2 4 2 5
3 6 3 3
Result
id list
-- ----
1 [1,2]
2 [4,5]
3 [6,3]
【问题讨论】:
能否在问题中添加d1.printSchema
和d3.printSchema
?
【参考方案1】:
小例子的解决方案:
import org.apache.spark.sql.functions.udf
val aggregateDataFrames = udf( (x: Double, y: Double) => Seq(x,y))
val d3 = d2.withColumnRenamed("id","id3")
.withColumnRenamed("data","data3")
val joined = d1.join(d3, d1("id") === d3("id3"))
val result = joined
.withColumn("list", aggregateDataFrames(joined("data"),joined("data3")))
.select("id","list")
【讨论】:
您可能想使用val aggregateDataFrames: (Double, Double) => Seq[Double]
并在udf
中保留类型。此外,使用'
(单个撇号)访问列(或$
(美元符号)),因为它减少了更多的击键(实际上你可能更喜欢它)。以上是关于如何使用collect_list?的主要内容,如果未能解决你的问题,请参考以下文章
Hive---collect_list和collect_set
PySpark 使用 collect_list 收集不同长度的数组
在 PySpark 中使用 collect_list 时 Java 内存不足
在 Spark SQL 中使用 collect_list 和 collect_set