Spark (Scala) - 在 DataFrame 中恢复爆炸

Posted

技术标签:

【中文标题】Spark (Scala) - 在 DataFrame 中恢复爆炸【英文标题】:Spark (Scala) - Reverting explode in a DataFrame 【发布时间】:2018-04-02 13:54:43 【问题描述】:

我最初有一个如下的 DataFrame:

Key     Emails                      PassportNum     Age
0001    [Alan@gmail,Alan@hotmail]   passport1       23
0002    [Ben@gmail,Ben@hotmail]     passport2       28

我需要对每封电子邮件应用一个函数,例如在末尾添加“_2”之类的虚拟函数,该操作不相关。所以我会这样爆这个专栏:

val dfExplode = df.withColumn("Email",explode($"Emails")).drop("Emails")

现在我将有一个这样的数据框:

Key     Email           PassportNum     Age
0001    Alan@gmail      passport1       23
0001    Alan@hotmail    passport1       23
0002    Ben@gmail       passport2       28
0002    Ben@hotmail     passport2       28

我在护照上应用了任何更改,然后我想要的又是这样的:

Key     Emails                          PassportNum     Age
0001    [Alan_2@gmail,Alan_2@hotmail]   passport1       23
0002    [Ben_2@gmail,Ben_2@hotmail]     passport2       28

我正在考虑的选项是这样的:

dfOriginal = dfExploded.groupBy("Key","PassportNum","Age").agg(collect_set("Email").alias("Emails"))

在这种情况下,这可能不是一个糟糕的方法。但在我的真实案例中,我在单个列上执行分解,我还有另外 20 个列,例如 PassportNum、Age... 将被复制。

这意味着我需要在 groupBy 中添加大约 20 列,当我真的可以通过单个列执行 group by 时,例如唯一的 Key。

我正在考虑在 agg 中添加这些列,如下所示:

dfOriginal = dfExploded.groupBy("Key").agg(collect_set("Email").alias("Emails"),collect_set("PassportNum"),collect_set("Age"))

但我不希望它们位于单个元素数组中。

有没有办法在没有任何collect_* 的情况下进行聚合?有没有更简单的方法来撤消explode

【问题讨论】:

你不应该使用first 来表示PassportNumAge,因为它们在爆炸后无论如何都会有相同的值吗? 你的意思是先收集后使用? 【参考方案1】:

假设您想留在 DataFrame 世界中,定义一个操作输入数组的 UDF 可能是值得的。将 Seq 作为输入并返回修改后的东西。例如

def myUdf = udf[Seq[String], Seq[String]]  
    inputSeq => inputSeq.map(elem => elem + "_2")


df.withColumn("Emails", myUdf($"Emails"))

更好的是,您可以将确切的逻辑作为参数传递:

def myUdf(myFunc: String => String) = udf[Seq[String], Seq[String]] 
    inputSeq => inputSeq.map(myFunc)


df.withColumn("Emails", myUdf((email: String) => email + "_XYZ")($"Emails"))

【讨论】:

感谢您的反馈。我宁愿避免这种情况,因为数组的内容是 Map 我需要将地图的每个字段转换为列 不确定您是否完全理解该问题。您不能将Seq[Map[String, String]] 作为输入传递吗?之后你可以在 Scala 中做任何事情 我会试试的 我想我会遵循你的解决方案。如果我将 Seq[Map[String, String]] 之类的东西传递给 udf,您能否举个例子,我如何访问地图中的特定字段以在其上应用另一个 udf?所以如果地图有 field1,field2,field3 => field1,field2,udf2(field3) 我不确定这是否可能 那时您的inputSeq 将是Seq[Map[String, String]]。所以像这样的东西:inputSeq.map(_.get("myKey")) 应该可以帮助您入门。它将返回一个Seq[String],其中的字符串是“myKey”的所有 Map 值【参考方案2】:

除了所有常见字段的 groupby 之外,另一个选项是在单独的临时数据帧上进行分解,然后从原始数据框中删除分解的列并加入重新分组的列

但是,编写一个 UDF 可能会更简单,它可以直接操作数组而无需分解和聚集

def handleEmail(emails: mutable.WrappedArray[String]) = 
     emails.map(dosomething)
  

context.udf.register("handleEmailsm"m (em:mutabe.WrappedArray[String]) => handleEmail(em))

【讨论】:

谢谢 Arnon,但我更喜欢使用您提到的 UDF 方法,因为数组的内容是 Map 我需要将地图的每个字段转换为列 【参考方案3】:

这意味着我需要在 groupBy 中添加大约 20 列,当我真的可以通过单个列执行 group by 时,例如唯一的 Key。

您可以通过一个简单的技巧跳过编写每个列名,如下所示,您可以使用所有列名(或选择的)除了爆炸列名称

import org.apache.spark.sql.functions._
val dfExploded = df.withColumn("Emails", explode($"Emails"))

val groupColumns = dfExploded.columns.filterNot(_.equalsIgnoreCase("Emails"))

val dfOriginal = dfExploded.groupBy(groupColumns.map(col): _*).agg(collect_set("Emails").alias("Emails"))

创建结构列

您可以使用 struct 内置函数创建单个列使用 groupBy 中的单个列作为

val groupColumns = df.columns.filterNot(_.equalsIgnoreCase("Emails"))

import org.apache.spark.sql.functions._
val dfExploded = df.select(struct(groupColumns.map(col): _*).as("groupedKey"), col("Emails"))
  .withColumn("Emails", explode($"Emails"))

这会给你

+-------------------+------------+
|groupedKey         |Emails      |
+-------------------+------------+
|[0001,passport1,23]|Alan@gmail  |
|[0001,passport1,23]|Alan@hotmail|
|[0002,passport2,28]|Ben@gmail   |
|[0002,passport2,28]|Ben@hotmail |
+-------------------+------------+

然后在 groupBy 中使用 groupedKey 并在 select 中再次将它们分开

val dfOriginal = dfExploded.groupBy("groupedKey").agg(collect_set("Emails").alias("Emails"))
  .select($"groupedKey.*", $"Emails")

【讨论】:

感谢拉梅什的绝招。但实际上我不想因为查询的长度而避免该解决方案,而是因为 groupBy 超过 20 列的成本可能很高。据我了解,groupBy 中的列越少,速度越快,不是吗? 谢谢,我会试试这个 对于一个用例,我需要做类似的工作,但是由于我的数据中存在映射列的结构,使用这种方法我遇到了一个错误。对我们有一列映射字符串结构的数据有什么想法吗?如何完成这项工作?

以上是关于Spark (Scala) - 在 DataFrame 中恢复爆炸的主要内容,如果未能解决你的问题,请参考以下文章

如何在idea中用maven配置spark和scala

如何在 Spark/Scala 中使用 countDistinct?

在 Ubuntu16.04 中搭建 Spark 单机开发环境 (JDK + Scala + Spark)

如何在 Spark/Scala 中查找具有许多空值的列

02 使用spark进行词频统计【scala交互】

如何使用 spark-scala 在 spark 数据帧上执行枢轴?