如何使用collect_list?

Posted

技术标签:

【中文标题】如何使用collect_list?【英文标题】:How to use collect_list? 【发布时间】:2016-04-11 18:09:36 【问题描述】:

我有两个DataFrame,想根据date,time,mid,binImbalance字段加入它们,并在列表中收集timeBmidB中的对应值。

我已尝试使用以下代码:

val d1: DataFrame
val d3: DataFrame
val d2 = d3
     .withColumnRenamed("date", "dateC")
     .withColumnRenamed("milliSec", "milliSecC")
     .withColumnRenamed("mid", "midC")
     .withColumnRenamed("time", "timeC")
     .withColumnRenamed("binImbalance", "binImbalanceC")

  d1.join(d2, d1("date") === d2("dateC") and 
              d1("time") === d2("timeC") and 
              d1("mid") === d2("midC")
         )
    .groupBy("date", "time", "mid", "binImbalance")
    .agg(collect_list("timeB"),collect_list("midB"))

但这不起作用,因为我收到错误:: Reference 'timeB' is ambiguous, could be: timeB#16, timeB#35。 同时,如果我重命名timeB 列之一,我将无法收集列表中的值。

一个示例结果应该是:

+-----+---------+------+------------+---------+------+
| date|     time|   mid|binImbalance|    timeB|  midB|
+-----+---------+------+------------+---------+------+
|  1  |     1   |  10  |           1|    4    |  10  |          
|  2  |     2   |  20  |           2|    5    |  11  |            
|  3  |     3   |  30  |           3|    6    |  12  |             


+-----+---------+------+------------+---------+------+
| date|     time|   mid|binImbalance|    timeB|  midB|
+-----+---------+------+------------+---------+------+
|  1  |     1   |  10  |           1|    7    |  13  |          
|  2  |     2   |  20  |           2|    8    |  14  |            
|  3  |     3   |  30  |           3|    9    |  15  |

结果:

+-----+---------+------+------------+---------+-----------+
| date|     time|   mid|binImbalance| ListTime|  ListMid  |
+-----+---------+------+------------+---------+-----------+
|  1  |     1   |  10  |           1|   [4,7] |  [10,13]  |          
|  2  |     2   |  20  |           2|   [5,8] |  [11,14]  |            
|  3  |     3   |  30  |           3|   [6,9] |  [12,15]  |

最小、完整和可验证的示例

d1            d2
id   data     id   data    
--   ----     --   ----
1    1        1    2
2    4        2    5
3    6        3    3

Result
id   list
--   ----
1    [1,2]
2    [4,5]
3    [6,3]

【问题讨论】:

能否在问题中添加d1.printSchemad3.printSchema 【参考方案1】:

小例子的解决方案

import org.apache.spark.sql.functions.udf

val aggregateDataFrames = udf( (x: Double, y: Double) => Seq(x,y))

val d3 = d2.withColumnRenamed("id","id3")
           .withColumnRenamed("data","data3")

val joined = d1.join(d3, d1("id") === d3("id3"))


val result = joined
              .withColumn("list", aggregateDataFrames(joined("data"),joined("data3")))
              .select("id","list")

【讨论】:

您可能想使用val aggregateDataFrames: (Double, Double) => Seq[Double] 并在udf 中保留类型。此外,使用'(单个撇号)访问列(或$(美元符号)),因为它减少了更多的击键(实际上你可能更喜欢它)。

以上是关于如何使用collect_list?的主要内容,如果未能解决你的问题,请参考以下文章

Hive---collect_list和collect_set

PySpark 使用 collect_list 收集不同长度的数组

在 PySpark 中使用 collect_list 时 Java 内存不足

在 Spark SQL 中使用 collect_list 和 collect_set

在 SQL (Hive) 中使用 collect_list 函数来聚合用户序列

collect_list 通过保留基于另一个变量的顺序