如何映射具有相当复杂模式的数据集?

Posted

技术标签:

【中文标题】如何映射具有相当复杂模式的数据集?【英文标题】:How to map over a Dataset with fairly complex schema? 【发布时间】:2017-09-26 07:51:02 【问题描述】:

我正在使用具有类似于此的复杂架构的 Dataframe:

 root
 |-- NPAData: struct (nullable = true)
 |    |-- NPADetails: struct (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- manager: string (nullable = true)
 |    |-- usersDetails: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- contacts: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- phone: string (nullable = true)
 |    |    |    |    |    |-- email: string (nullable = true)
 |    |    |    |    |    |-- address: string (nullable = true)
 |    |-- service: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- serviceName: string (nullable = true)
 |    |    |    |-- serviceCode: string (nullable = true) 
 |-- NPAHeader: struct (nullable = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- date: string (nullable = true)

我想对DataFrame的每一行应用一个自定义函数来执行映射以满足要求:

数据框的每一行都有 2 个或更多元素,这些元素具有我在问题中发布的结构。首先,我想将每行的这些元素分离到行列表中,因为我需要比较它们。一个我有一个 DataFrame[List[Row]] 我想应用另一个地图,所以我可以合并每个列表的元素(为此我有一个递归函数,我写了一个检查列表中的顺序并填充新的空字段具有较旧值的元素)。在我使用 RDD 做所有这些之前,但我正在尝试用 DataFrame API 做同样的事情

我认为为此我需要传递一个编码器。

由于架构相当复杂(至少我不知道如何在存在数组时生成 StructType 元素也是数组)我尝试通过传递架构来生成编码器,执行如下操作:

import org.apache.spark.sql.catalyst.encoders.RowEncoder

val sourceSchema = dfSoruce.schema 

val encoder = RowEncoder(sourceSchema)

dfSoruce.map(x => x.getList[Row](0))(encoder)

但我收到以下错误:

类型不匹配;成立 : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] 必需的: org.apache.spark.sql.Encoder[java.util.List[org.apache.spark.sql.Row]]

如何从 ExpressionEncoder 转换为 Encoder?

【问题讨论】:

【参考方案1】:

我想对 DataFrame 的每一行应用一个自定义函数来执行映射,但为此我需要传递一个编码器。

让我不同意。

map 运算符(应避免)

引用 scaladoc 的 map 运算符:

ma​​p[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] 返回一个新的 Dataset,其中包含将 func 应用于每个元素。

您可能已经注意到编码器(在第二个参数列表中)是一个隐式参数,因此不必显式提供(这就是 Scala 中隐式的美妙之处,不是吗? em>)

我的建议是使用func 将您的转换为可编码类型U,即您可以在数据集中使用的任何类型。您可以在 Encoders 对象中找到将类型转换为其可编码变体的可用编码器。

scala> :type ids
org.apache.spark.sql.Dataset[Long]

scala> ids.map(id => (id, "hello" * id.toInt)).show(truncate = false)
+---+---------------------------------------------+
|_1 |_2                                           |
+---+---------------------------------------------+
|0  |                                             |
|1  |hello                                        |
|2  |hellohello                                   |
|3  |hellohellohello                              |
|4  |hellohellohellohello                         |
|5  |hellohellohellohellohello                    |
|6  |hellohellohellohellohellohello               |
|7  |hellohellohellohellohellohellohello          |
|8  |hellohellohellohellohellohellohellohello     |
|9  |hellohellohellohellohellohellohellohellohello|
+---+---------------------------------------------+

但我宁愿在withColumn 和标准功能不足之后才将map 用于更高级的转换。

(推荐)withColumn 运算符和标准函数

我宁愿将withColumn 运算符与functions 对象中的标准函数一起使用,它会给你map 类似的行为。

让我们检查一下您的要求,看看我们采用这种方法能走多远。

首先我想在行列表中分离每一行的元素

对我来说,行列表听起来像 groupBy 聚合,然后是 collect_list 函数(可能使用一些 withColumn 运算符来提取所需的值)。

// leave you to fill the gaps
dfSoruce.withColumn(...).groupBy(...).agg(collect_list(...))

您不必过多考虑编码器(因为它们是 Spark SQL 中相当低级和相当高级的概念)

【讨论】:

早上好,我试图尽我所能遵循您的建议,但我无法应用标准功能来实现我所需要的。所以我需要回到地图,因为我传递的是一个 Seq[Row] 到地图内的函数,它在运行时要求一个明确的编码器。但我面临着正确生成编码器的问题。我在这里发布了一个关于这个问题的新问题,我留下链接以防你有空闲时间检查:***.com/q/46525530/1773841Thanks

以上是关于如何映射具有相当复杂模式的数据集?的主要内容,如果未能解决你的问题,请参考以下文章

R:具有 2 个大型数据集的模式匹配金融时间序列数据:

关系数据库模式设计 - 如何直接从实体的字段集建模一对一映射

如何使用automapper映射与多个表的数据集

数据库系统概念笔记——第7章:数据库设计和E-R模型

对具有复杂关系的数据集进行重复数据删除

用于 ORM 验证的大型/复杂数据集?