Scala(Spark)-当列是列表时如何分组

Posted

技术标签:

【中文标题】Scala(Spark)-当列是列表时如何分组【英文标题】:Scala(Spark)- how to groupby when columns are list 【发布时间】:2019-01-14 14:30:08 【问题描述】:

在我的 Scala 程序中,我有一个带有如下架构的数据框:

root
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- SEGMENT_EMAIL: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- SEGMENT_ADDRESS_STATE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- SEGMENT_ADDRESS_POSTAL_CODE: array (nullable = true)
 |    |-- element: string (containsNull = true)

一些示例值是:

|FIRST_NAME      |LAST_NAME      |CONFIRMATION_NUMBER|       SEGMENT_EMAIL|SEGMENT_ADDRESS_STATE|SEGMENT_ADDRESS_POSTAL_CODE|
+----------------+---------------+-------------------+--------------------+---------------------+---------------------------+
|           Stine|          Rocha|         [48978451]|[Xavier.Vich@gmail..|                 [MA]|               [01545-1300]|
|          Aurora|      Markusson|         [26341542]|                  []|                 [AR]|                    [72716]|
|           Stine|          Rocha|         [29828771]|[Xavier.Vich@gmail..|                 [OH]|               [45101-9613]|
|          Aubrey|      Fagerland|         [24572991]|[Aubrey.Fagerland...|                   []|                         []|

当列值采用列表形式时,如何根据名字 + 姓氏 + 电子邮件对相似记录进行分组。

我想要这样的输出:

|FIRST_NAME      |LAST_NAME      |CONFIRMATION_NUMBER  |       SEGMENT_EMAIL|SEGMENT_ADDRESS_STATE|SEGMENT_ADDRESS_POSTAL_CODE|
+----------------+---------------+---------------------+--------------------+---------------------+---------------------------+
|           Stine|          Rocha| [48978451, 29828771]|[Xavier.Vich@gmail..|             [MA, OH]|   [01545-1300, 45101-9613]|
|          Aurora|      Markusson|           [26341542]|                  []|                 [AR]|                    [72716]|
|          Aubrey|      Fagerland|           [24572991]|[Aubrey.Fagerland...|                   []|                         []|

谢谢!

【问题讨论】:

【参考方案1】:

这可以通过编写用户定义函数来将多个Seq 合并到一个Seq 中来完成。这是获得所需输出的方法:

创建输入数据框: 虽然架构中没有提到 CONFIRMATION_NUMBER 字段的数据类型,但我假设它是整数。

import spark.implicits._
    val df = Seq(("Stine",  "Rocha", Seq(48978451), Seq("Xavier.Vich@gmail"), Seq("MA"), Seq("01545-1300")),
      ("Aurora", "Markusson", Seq(26341542),Seq(),Seq("AR"),Seq("72716")),
      ("Stine",  "Rocha", Seq(29828771),Seq("Xavier.Vich@gmail"),Seq("OH"),       Seq("45101-9613")),
      ("Aubrey", "Fagerland",Seq(24572991),Seq("Aubrey.Fagerland"),Seq(),       Seq())).
      toDF("FIRST_NAME", "LAST_NAME", "CONFIRMATION_NUMBER", "SEGMENT_EMAIL", "SEGMENT_ADDRESS_STATE", "SEGMENT_ADDRESS_POSTAL_CODE")

聚合列:现在对所需列应用聚合以获得SeqSeq。这是执行此操作的代码:

   import org.apache.spark.sql.functions.collect_list
    val df1 = df.groupBy("FIRST_NAME", "LAST_NAME").
          agg(collect_list("CONFIRMATION_NUMBER").as("cnlist"),
            collect_list("SEGMENT_EMAIL").as("selist"),
            collect_list("SEGMENT_ADDRESS_STATE").as("saslist"),
            collect_list("SEGMENT_ADDRESS_POSTAL_CODE").as("sapclist"))

这是df1的输出:

+----------+---------+------------------------+------------------------------------------+------------+----------------------------+
|FIRST_NAME|LAST_NAME|cnlist                  |selist                                    |saslist     |sapclist                    |
+----------+---------+------------------------+------------------------------------------+------------+----------------------------+
|Stine     |Rocha    |[[48978451], [29828771]]|[[Xavier.Vich@gmail], [Xavier.Vich@gmail]]|[[MA], [OH]]|[[01545-1300], [45101-9613]]|
|Aurora    |Markusson|[[26341542]]            |[[]]                                      |[[AR]]      |[[72716]]                   |
|Aubrey    |Fagerland|[[24572991]]            |[[Aubrey.Fagerland]]                      |[[]]        |[[]]                        |
+----------+---------+------------------------+------------------------------------------+------------+----------------------------+

应用 udf: 现在应用用户定义函数(udf)将数组的数组合并为单个数组。我已经为整数和字符串数据类型编写了两个 udf。

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
 val concat_nested_string_seq:UserDefinedFunction = udf((seq_values:Seq[Seq[String]]) => 
      var output_seq:Seq[String] = Seq()
      seq_values.foreach(output_seq ++= _)
      (output_seq)
    )

  val concat_nested_integer_seq:UserDefinedFunction = udf((seq_values:Seq[Seq[Integer]]) => 
      var output_seq:Seq[Integer] = Seq()
      seq_values.foreach(output_seq ++= _)
      (output_seq)
    )
  val output_df = df1.withColumn("CONFIRMATION_NUMBER", concat_nested_integer_seq($"cnlist")).
                  withColumn("SEGMENT_EMAIL", concat_nested_string_seq($"selist")).
                  withColumn("SEGMENT_ADDRESS_STATE", concat_nested_string_seq($"saslist")).
                  withColumn("SEGMENT_ADDRESS_POSTAL_CODE", concat_nested_string_seq($"sapclist")).
                  drop("cnlist", "selist", "saslist", "sapclist")

output_df 数据框显示所需的输出。它也可以通过展平数组数据类型列然后在列上聚合来解决。但这可能是昂贵的操作。

【讨论】:

谢谢,当电子邮件的名字和姓氏始终相同时,上述解决方案效果很好。当有两条名称相同但电子邮件不同或丢失的记录时,就会出现问题。在这种情况下,我不想合并记录。那么有没有一种方法来执行 groupBy ,它将使用(名字、姓氏、电子邮件)的组合来对记录进行分组,前提是电子邮件作为字符串序列存在于列中。

以上是关于Scala(Spark)-当列是列表时如何分组的主要内容,如果未能解决你的问题,请参考以下文章

Scala中的Spark分组映射UDF

Scala Spark:计算分组的 AUC

Scala:如何按键分组并在 scala 中对值求和并以预期的返回类型返回列表

当按使用的时间窗口分组时,Spark 如何确定第一个窗口的 window.start?

PySpark:当列是列表时,将列添加到 DataFrame

数据框:如何在 Scala 中分组/计数然后按计数排序