如何映射具有相当复杂模式的数据集?
Posted
技术标签:
【中文标题】如何映射具有相当复杂模式的数据集?【英文标题】:How to map over a Dataset with fairly complex schema? 【发布时间】:2017-09-26 07:51:02 【问题描述】:我正在使用具有类似于此的复杂架构的 Dataframe:
root
|-- NPAData: struct (nullable = true)
| |-- NPADetails: struct (nullable = true)
| | |-- location: string (nullable = true)
| | |-- manager: string (nullable = true)
| |-- usersDetails: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- name: string (nullable = true)
| | | |-- contacts: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- phone: string (nullable = true)
| | | | | |-- email: string (nullable = true)
| | | | | |-- address: string (nullable = true)
| |-- service: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- serviceName: string (nullable = true)
| | | |-- serviceCode: string (nullable = true)
|-- NPAHeader: struct (nullable = true)
| | |-- code: string (nullable = true)
| | |-- date: string (nullable = true)
我想对DataFrame的每一行应用一个自定义函数来执行映射以满足要求:
数据框的每一行都有 2 个或更多元素,这些元素具有我在问题中发布的结构。首先,我想将每行的这些元素分离到行列表中,因为我需要比较它们。一个我有一个 DataFrame[List[Row]] 我想应用另一个地图,所以我可以合并每个列表的元素(为此我有一个递归函数,我写了一个检查列表中的顺序并填充新的空字段具有较旧值的元素)。在我使用 RDD 做所有这些之前,但我正在尝试用 DataFrame API 做同样的事情
我认为为此我需要传递一个编码器。
由于架构相当复杂(至少我不知道如何在存在数组时生成 StructType 元素也是数组)我尝试通过传递架构来生成编码器,执行如下操作:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
val sourceSchema = dfSoruce.schema
val encoder = RowEncoder(sourceSchema)
dfSoruce.map(x => x.getList[Row](0))(encoder)
但我收到以下错误:
类型不匹配;成立 : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] 必需的: org.apache.spark.sql.Encoder[java.util.List[org.apache.spark.sql.Row]]
如何从 ExpressionEncoder 转换为 Encoder?
【问题讨论】:
【参考方案1】:我想对 DataFrame 的每一行应用一个自定义函数来执行映射,但为此我需要传递一个编码器。
让我不同意。
map 运算符(应避免)
引用 scaladoc 的 map
运算符:
map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] 返回一个新的 Dataset,其中包含将 func 应用于每个元素。
您可能已经注意到编码器(在第二个参数列表中)是一个隐式参数,因此不必显式提供(这就是 Scala 中隐式的美妙之处,不是吗? em>)
我的建议是使用func
将您的转换为可编码类型U
,即您可以在数据集中使用的任何类型。您可以在 Encoders 对象中找到将类型转换为其可编码变体的可用编码器。
scala> :type ids
org.apache.spark.sql.Dataset[Long]
scala> ids.map(id => (id, "hello" * id.toInt)).show(truncate = false)
+---+---------------------------------------------+
|_1 |_2 |
+---+---------------------------------------------+
|0 | |
|1 |hello |
|2 |hellohello |
|3 |hellohellohello |
|4 |hellohellohellohello |
|5 |hellohellohellohellohello |
|6 |hellohellohellohellohellohello |
|7 |hellohellohellohellohellohellohello |
|8 |hellohellohellohellohellohellohellohello |
|9 |hellohellohellohellohellohellohellohellohello|
+---+---------------------------------------------+
但我宁愿在withColumn
和标准功能不足之后才将map
用于更高级的转换。
(推荐)withColumn 运算符和标准函数
我宁愿将withColumn
运算符与functions 对象中的标准函数一起使用,它会给你map
类似的行为。
让我们检查一下您的要求,看看我们采用这种方法能走多远。
首先我想在行列表中分离每一行的元素
对我来说,行列表听起来像 groupBy
聚合,然后是 collect_list
函数(可能使用一些 withColumn
运算符来提取所需的值)。
// leave you to fill the gaps
dfSoruce.withColumn(...).groupBy(...).agg(collect_list(...))
您不必过多考虑编码器(因为它们是 Spark SQL 中相当低级和相当高级的概念)
【讨论】:
早上好,我试图尽我所能遵循您的建议,但我无法应用标准功能来实现我所需要的。所以我需要回到地图,因为我传递的是一个 Seq[Row] 到地图内的函数,它在运行时要求一个明确的编码器。但我面临着正确生成编码器的问题。我在这里发布了一个关于这个问题的新问题,我留下链接以防你有空闲时间检查:***.com/q/46525530/1773841Thanks以上是关于如何映射具有相当复杂模式的数据集?的主要内容,如果未能解决你的问题,请参考以下文章