在火花中压缩 2 列 [重复]

Posted

技术标签:

【中文标题】在火花中压缩 2 列 [重复]【英文标题】:Zip 2 columns in spark [duplicate] 【发布时间】:2018-06-15 08:27:37 【问题描述】:

数据框结构:

 |     main_id|                  id|           createdBy|
 +------------+--------------------+--------------------+
 |1           |          [10,20,30]|        [999,888,777|
 |2           |                [30]|               [666]|

预期的数据框结构:

|     main_id|                  id|           createdBy|
+------------+--------------------+--------------------+
|1                           10                    999
|1                           20                    888
|1                           30                    777
|2           |               30|                   666

Code_1 已尝试:

 df.select($"main_id",explode($"id"),$"createdBy").select($"main_id",$"id",explode($"createdBy"))

这也会导致错误的配对和重复。关于我应该调整什么以获得所需输出的任何建议。

我还尝试在第一个引发错误的选择语句中使用多个爆炸。

Code_2 已尝试:

import org.apache.spark.sql.functions.udf, explode
val zip = udf((xs: Seq[String], ys: Seq[String]) => xs.zip(ys))

df.withColumn("vars", explode(zip($"id", $"createdBy"))).select(
$"main_id",
$"vars._1".alias("varA"), $"vars._2".alias("varB")).show(1)

警告和错误:

warning: there was one deprecation warning; re-run with -deprecation for details
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
in stage 564.0 failed 4 times, most recent failure: Lost task 0.3 in 
stage 564.0 (TID 11570, ma4-csxp-ldn1015.corp.apple.com, executor 288)

是的,我问了同样的问题,该问题作为重复指向另一个解决方案而关闭,这是我在 sn-p 2 中尝试过的。它也没有工作。任何建议都会很有帮助。

【问题讨论】:

根据您的 cmets,您似乎遇到了一些与问题中提供的代码无关的依赖问题。您可以启动here,如果它不能解决您的问题,请提供minimal reproducible example(最小代码、依赖版本、提交方法、集群管理器)的另一个问题 【参考方案1】:

也许以下内容会有所帮助:

val x = someDF.withColumn("createdByExploded", explode(someDF("createdBy"))).select("createdByExploded", "main_id")
val y = someDF.withColumn("idExploded", explode(someDF("id"))).select("idExploded", "main_id")

val xInd = x.withColumn("index", monotonically_increasing_id)
val yInd = y.withColumn("index", monotonically_increasing_id)

val joined = xInd.join(yInd, xInd("index") === yInd("index"), "outer").drop("index")

https://forums.databricks.com/questions/8180/how-to-merge-two-data-frames-column-wise-in-apache.html

【讨论】:

以上是关于在火花中压缩 2 列 [重复]的主要内容,如果未能解决你的问题,请参考以下文章

在火花中读取 json [重复]

火花数据框删除重复并保留第一

从火花数据帧中读取结构[重复]

根据日期范围过滤火花数据框[重复]

在火花数据框中使用案例类的好处[重复]

如何在火花中将数据帧转换为csv [重复]