Spark数据框爆炸功能

Posted

技术标签:

【中文标题】Spark数据框爆炸功能【英文标题】:Spark dataframe explode function 【发布时间】:2016-08-24 06:13:07 【问题描述】:

谁能解释一下为什么Row,Seq[Row] 会在包含元素集合的dataframe 字段爆炸后使用。 您能否解释一下为什么需要 asInstanceOf 才能从爆炸字段中获取值?

语法如下:

val explodedDepartmentWithEmployeesDF = departmentWithEmployeesDF.explode(departmentWithEmployeesDF("employees"))      
                          case Row(employee: Seq[Row]) => 
                          employee.map(employee =>
                          Employee(employee(0).asInstanceOf[String], 
                          employee(1).asInstanceOf[String], employee(2).asInstanceOf[String]) ) 

【问题讨论】:

【参考方案1】:

首先我要注意,我无法解释为什么您的 explode() 会变成 Row(employee: Seq[Row]),因为我不知道您的 DataFrame 的架构。我不得不假设它与您的数据结构有关。

不知道你的原始数据,我创建了一个小数据集来工作

scala> val df = sc.parallelize( Array( (1, "dsfds dsf dasf dsf dsf d"), (2, "2344 2353 24 23432 234"))).toDF("id", "text")
df: org.apache.spark.sql.DataFrame = [id: int, text: string]

如果我现在映射它,您可以设置它返回包含 Any 类型数据的行。

scala> df.map case row: Row => (row(0), row(1)) 
res21: org.apache.spark.rdd.RDD[(Any, Any)] = MapPartitionsRDD[17] at map at <console>:33

你基本上已经丢失了类型信息,这就是为什么你要使用行中的数据时需要显式指定类型

scala> df.map case row: Row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[String]) 
res22: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[18] at map at <console>:33

所以,为了引爆它,我必须做到以下几点

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.Row
df.explode(col("id"), col("text")) case row: Row =>
    val id = row(0).asInstanceOf[Int]
    val words = row(1).asInstanceOf[String].split(" ")
    words.map(word => (id, word))


// Exiting paste mode, now interpreting.

import org.apache.spark.sql.Row
res30: org.apache.spark.sql.DataFrame = [id: int, text: string, _1: int, _2: string]

scala> res30 show
+---+--------------------+---+-----+
| id|                text| _1|   _2|
+---+--------------------+---+-----+
|  1|dsfds dsf dasf ds...|  1|dsfds|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1| dasf|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1|    d|
|  2|2344 2353 24 2343...|  2| 2344|
|  2|2344 2353 24 2343...|  2| 2353|
|  2|2344 2353 24 2343...|  2|   24|
|  2|2344 2353 24 2343...|  2|23432|
|  2|2344 2353 24 2343...|  2|  234|
+---+--------------------+---+-----+

如果你想要命名列,你可以定义一个案例类来保存你的分解数据

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.Row
case class ExplodedData(word: String)
df.explode(col("id"), col("text")) case row: Row =>
    val words = row(1).asInstanceOf[String].split(" ")
    words.map(word => ExplodedData(word))


// Exiting paste mode, now interpreting.

import org.apache.spark.sql.Row
defined class ExplodedData
res35: org.apache.spark.sql.DataFrame = [id: int, text: string, word: string]

scala> res35.select("id","word").show
+---+-----+
| id| word|
+---+-----+
|  1|dsfds|
|  1|  dsf|
|  1| dasf|
|  1|  dsf|
|  1|  dsf|
|  1|    d|
|  2| 2344|
|  2| 2353|
|  2|   24|
|  2|23432|
|  2|  234|
+---+-----+

希望这会带来一些清晰。

【讨论】:

【参考方案2】:

我认为您可以先阅读有关该文档的更多信息并进行测试。

数据帧的爆炸仍然返回一个数据帧。它接受一个 lambda 函数 f: (Row) ⇒ TraversableOnce[A] 作为参数。

在 lambda 函数中,您将按大小写匹配输入。你已经知道你的 input 会是 Row of employee,它仍然是一个 Seq of Row。所以 input 的 case 会 Row(employee: Seq[Row]) ,如果你不明白这部分,你可以学习更多关于 scala 中取消应用功能的信息。

然后,员工(我相信你应该在这里使用员工)作为行的序列,将应用映射函数将每一行映射到一个员工。您将使用 scala 应用函数来获取该行中的第 i 个值。但是返回值是一个 Object ,所以你必须使用 asInstanceOf 将它转换成你期望的类型。

【讨论】:

以上是关于Spark数据框爆炸功能的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 数据框中爆炸嵌套结构

Apache Spark 数据框列爆炸为多列

Spark 1.6以空值爆炸[重复]

火花数据框爆炸功能错误

横向视图/在 Spark 中用多列爆炸,得到重复

如何使用 Scala 在 Spark 中爆炸嵌套结构