Spark (Scala) - 在 DataFrame 中恢复爆炸
Posted
技术标签:
【中文标题】Spark (Scala) - 在 DataFrame 中恢复爆炸【英文标题】:Spark (Scala) - Reverting explode in a DataFrame 【发布时间】:2018-04-02 13:54:43 【问题描述】:我最初有一个如下的 DataFrame:
Key Emails PassportNum Age
0001 [Alan@gmail,Alan@hotmail] passport1 23
0002 [Ben@gmail,Ben@hotmail] passport2 28
我需要对每封电子邮件应用一个函数,例如在末尾添加“_2”之类的虚拟函数,该操作不相关。所以我会这样爆这个专栏:
val dfExplode = df.withColumn("Email",explode($"Emails")).drop("Emails")
现在我将有一个这样的数据框:
Key Email PassportNum Age
0001 Alan@gmail passport1 23
0001 Alan@hotmail passport1 23
0002 Ben@gmail passport2 28
0002 Ben@hotmail passport2 28
我在护照上应用了任何更改,然后我想要的又是这样的:
Key Emails PassportNum Age
0001 [Alan_2@gmail,Alan_2@hotmail] passport1 23
0002 [Ben_2@gmail,Ben_2@hotmail] passport2 28
我正在考虑的选项是这样的:
dfOriginal = dfExploded.groupBy("Key","PassportNum","Age").agg(collect_set("Email").alias("Emails"))
在这种情况下,这可能不是一个糟糕的方法。但在我的真实案例中,我在单个列上执行分解,我还有另外 20 个列,例如 PassportNum、Age... 将被复制。
这意味着我需要在 groupBy 中添加大约 20 列,当我真的可以通过单个列执行 group by 时,例如唯一的 Key。
我正在考虑在 agg 中添加这些列,如下所示:
dfOriginal = dfExploded.groupBy("Key").agg(collect_set("Email").alias("Emails"),collect_set("PassportNum"),collect_set("Age"))
但我不希望它们位于单个元素数组中。
有没有办法在没有任何collect_*
的情况下进行聚合?有没有更简单的方法来撤消explode
?
【问题讨论】:
你不应该使用first
来表示PassportNum
和Age
,因为它们在爆炸后无论如何都会有相同的值吗?
你的意思是先收集后使用?
【参考方案1】:
假设您想留在 DataFrame 世界中,定义一个操作输入数组的 UDF 可能是值得的。将 Seq 作为输入并返回修改后的东西。例如
def myUdf = udf[Seq[String], Seq[String]]
inputSeq => inputSeq.map(elem => elem + "_2")
df.withColumn("Emails", myUdf($"Emails"))
更好的是,您可以将确切的逻辑作为参数传递:
def myUdf(myFunc: String => String) = udf[Seq[String], Seq[String]]
inputSeq => inputSeq.map(myFunc)
df.withColumn("Emails", myUdf((email: String) => email + "_XYZ")($"Emails"))
【讨论】:
感谢您的反馈。我宁愿避免这种情况,因为数组的内容是 MapSeq[Map[String, String]]
作为输入传递吗?之后你可以在 Scala 中做任何事情
我会试试的
我想我会遵循你的解决方案。如果我将 Seq[Map[String, String]] 之类的东西传递给 udf,您能否举个例子,我如何访问地图中的特定字段以在其上应用另一个 udf?所以如果地图有 field1,field2,field3 => field1,field2,udf2(field3) 我不确定这是否可能
那时您的inputSeq
将是Seq[Map[String, String]]
。所以像这样的东西:inputSeq.map(_.get("myKey"))
应该可以帮助您入门。它将返回一个Seq[String]
,其中的字符串是“myKey”的所有 Map 值【参考方案2】:
除了所有常见字段的 groupby 之外,另一个选项是在单独的临时数据帧上进行分解,然后从原始数据框中删除分解的列并加入重新分组的列
但是,编写一个 UDF 可能会更简单,它可以直接操作数组而无需分解和聚集
def handleEmail(emails: mutable.WrappedArray[String]) =
emails.map(dosomething)
context.udf.register("handleEmailsm"m (em:mutabe.WrappedArray[String]) => handleEmail(em))
【讨论】:
谢谢 Arnon,但我更喜欢使用您提到的 UDF 方法,因为数组的内容是 Map这意味着我需要在 groupBy 中添加大约 20 列,当我真的可以通过单个列执行 group by 时,例如唯一的 Key。
您可以通过一个简单的技巧跳过编写每个列名,如下所示,您可以使用所有列名(或选择的)除了爆炸列名称
import org.apache.spark.sql.functions._
val dfExploded = df.withColumn("Emails", explode($"Emails"))
val groupColumns = dfExploded.columns.filterNot(_.equalsIgnoreCase("Emails"))
val dfOriginal = dfExploded.groupBy(groupColumns.map(col): _*).agg(collect_set("Emails").alias("Emails"))
创建结构列
您可以使用 struct 内置函数创建单个列,使用 groupBy 中的单个列作为
val groupColumns = df.columns.filterNot(_.equalsIgnoreCase("Emails"))
import org.apache.spark.sql.functions._
val dfExploded = df.select(struct(groupColumns.map(col): _*).as("groupedKey"), col("Emails"))
.withColumn("Emails", explode($"Emails"))
这会给你
+-------------------+------------+
|groupedKey |Emails |
+-------------------+------------+
|[0001,passport1,23]|Alan@gmail |
|[0001,passport1,23]|Alan@hotmail|
|[0002,passport2,28]|Ben@gmail |
|[0002,passport2,28]|Ben@hotmail |
+-------------------+------------+
然后在 groupBy 中使用 groupedKey 并在 select 中再次将它们分开
val dfOriginal = dfExploded.groupBy("groupedKey").agg(collect_set("Emails").alias("Emails"))
.select($"groupedKey.*", $"Emails")
【讨论】:
感谢拉梅什的绝招。但实际上我不想因为查询的长度而避免该解决方案,而是因为 groupBy 超过 20 列的成本可能很高。据我了解,groupBy 中的列越少,速度越快,不是吗? 谢谢,我会试试这个 对于一个用例,我需要做类似的工作,但是由于我的数据中存在映射列的结构,使用这种方法我遇到了一个错误。对我们有一列映射字符串结构的数据有什么想法吗?如何完成这项工作?以上是关于Spark (Scala) - 在 DataFrame 中恢复爆炸的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark/Scala 中使用 countDistinct?