Spark 连接数据框和数据集
Posted
技术标签:
【中文标题】Spark 连接数据框和数据集【英文标题】:Spark join dataframes & datasets 【发布时间】:2017-06-01 13:38:34 【问题描述】:我有一个名为 Link 的 DataFrame
,在 Row
中有动态数量的字段/列。
然而,一些字段的结构 [ClassName]Id 包含一个 id
[ClassName]Id 的类型始终为 String
我有几个Datasets
,每个都有不同的类型[ClassName]
每个Dataset
至少有id
(String
)和typeName
(String
)这两个字段,总是用[ClassName]
的String值填充
例如如果我有 3 个 A、B 和 C
类型的DataSets
链接:
+----+-----+-----+-----+
| id | AId | BId | CId |
+----+-----+-----+-----+
| XX | A01 | B02 | C04 |
| XY | null| B05 | C07 |
答:
+-----+----------+-----+-----+
| id | typeName | ... | ... |
+-----+----------+-----+-----+
| A01 | A | ... | ... |
乙:
+-----+----------+-----+-----+
| id | typeName | ... | ... |
+-----+----------+-----+-----+
| B02 | B | ... | ... |
首选的最终结果是 Link Dataframe
,其中每个 Id 都被一个名为 [ClassName]
的字段替换或附加,并封装了原始对象。
结果:
+----+----------------+----------------+----------------+
| id | A | B | C |
+----+----------------+----------------+----------------+
| XX | A(A01, A, ...) | B(B02, B, ...) | C(C04, C, ...) |
| XY | null | B(B05, B, ...) | C(C07, C, ...) |
我尝试过的事情
对 joinWith 的递归调用。 第一次调用成功返回一个元组/Row
,其中第一个元素是原始的Row
,第二个是匹配的[ClassName]
然而,第二次迭代开始嵌套这些结果。
尝试使用 map 'unnest' 这些结果要么导致编码器地狱(因为生成的 Row
不是固定类型),要么编码非常复杂以至于它导致催化剂 error
以 RDD 身份加入还不能解决这个问题。
欢迎提出任何想法。
【问题讨论】:
我不确定我是否明白你的问题。您是否愿意对其进行审查,以便我们尝试提供帮助? 【参考方案1】:所以我想出了如何做我想做的事。 我做了一些改变让它为我工作,但它是一个 出于参考目的,我将展示我的步骤,也许它对将来的某人有用?
-
首先我声明一个数据类型,它共享我感兴趣的 A、B、C 等的所有属性,并使类从这个超类型扩展
case class Base(id: String, typeName: String)
case class A(override val id: String, override val typeName: String) extends Base(id, typeName)
-
接下来我加载链接
Dataframe
val linkDataFrame = spark.read.parquet("[path]")
-
我想将这个
DataFrame
转换为可连接的东西,这意味着为连接的源创建一个占位符,以及一种将所有单个Id
字段(AId、BId 等)转换为源的Map
的方法-> 身份证。 Spark 有一个有用的sql map
方法。我们还需要将Base
类转换为StructType
以在编码器中使用。尝试了多种方法,但无法绕过特定声明(否则会出现转换错误)
val linkDataFrame = spark.read.parquet("[path]")
case class LinkReformatted(ids: Map[String, Long], sources: Map[String, Base])
// Maps each column ending with Id into a Map of (columnname1 (-Id), value1, columnname2 (-Id), value2)
val mapper = linkDataFrame.columns.toList
.filter(
_.matches("(?i).*Id$")
)
.flatMap(
c => List(lit(c.replaceAll("(?i)Id$", "")), col(c))
)
val baseStructType = ScalaReflection.schemaFor[Base].dataType.asInstanceOf[StructType]
-
所有这些部分使创建一个新的
DataFrame
成为可能@
val linkDatasetReformatted = linkDataFrame.select(
map(mapper: _*).alias("ids")
)
.withColumn("sources", lit(null).cast(MapType(StringType, baseStructType)))
.as[LinkReformatted]
-
下一步是将所有源
Datasets
(A、B 等)加入这个重新格式化的 Link 数据集。这种尾递归方法发生了很多事情
@tailrec
def recursiveJoinBases(sourceDataset: Dataset[LinkReformatted], datasets: List[Dataset[Base]]): Dataset[LinkReformatted] = datasets match
case Nil => sourceDataset // Nothing left to join, return it
case baseDataset :: remainingDatasets =>
val typeName = baseDataset.head.typeName // extract the type from base (each field hase same value)
val masterName = "source" // something to name the source
val joinedDataset = sourceDataset.as(masterName) // joining source
.joinWith(
baseDataset.as(typeName), // with a base A,B, etc
col(s"$typeName.id") === col(s"$masterName.ids.$typeName"), // join on source.ids.[typeName]
"left_outer"
)
.map
case (source, base) =>
val newSources = if (source.sources == null) Map(typeName -> base) else source.sources + (typeName -> base) // append or create map of sources
source.copy(sources = newSources)
.as[LinkReformatted]
recursiveJoinBases(joinedDataset, remainingDatasets)
-
您现在得到了
Dataset
的 LinkReformatted
记录,其中 ids 字段中每个对应的 typeName -> id
是 来源 中对应的 typeName -> Base
> 场。
对我来说,这就足够了。我可以在这个最终的数据集上使用一些地图函数来提取我需要的一切
我希望这会有所帮助。我知道这不是我要问的确切解决方案,也不是很简单。
【讨论】:
以上是关于Spark 连接数据框和数据集的主要内容,如果未能解决你的问题,请参考以下文章
Spark 数据集 Joinwith 错误:连接条件丢失或不重要