Spark数据框爆炸功能
Posted
技术标签:
【中文标题】Spark数据框爆炸功能【英文标题】:Spark dataframe explode function 【发布时间】:2016-08-24 06:13:07 【问题描述】:谁能解释一下为什么Row
,Seq[Row]
会在包含元素集合的dataframe
字段爆炸后使用。
您能否解释一下为什么需要 asInstanceOf 才能从爆炸字段中获取值?
语法如下:
val explodedDepartmentWithEmployeesDF = departmentWithEmployeesDF.explode(departmentWithEmployeesDF("employees"))
case Row(employee: Seq[Row]) =>
employee.map(employee =>
Employee(employee(0).asInstanceOf[String],
employee(1).asInstanceOf[String], employee(2).asInstanceOf[String]) )
【问题讨论】:
【参考方案1】:首先我要注意,我无法解释为什么您的 explode()
会变成 Row(employee: Seq[Row])
,因为我不知道您的 DataFrame 的架构。我不得不假设它与您的数据结构有关。
不知道你的原始数据,我创建了一个小数据集来工作
scala> val df = sc.parallelize( Array( (1, "dsfds dsf dasf dsf dsf d"), (2, "2344 2353 24 23432 234"))).toDF("id", "text")
df: org.apache.spark.sql.DataFrame = [id: int, text: string]
如果我现在映射它,您可以设置它返回包含 Any 类型数据的行。
scala> df.map case row: Row => (row(0), row(1))
res21: org.apache.spark.rdd.RDD[(Any, Any)] = MapPartitionsRDD[17] at map at <console>:33
你基本上已经丢失了类型信息,这就是为什么你要使用行中的数据时需要显式指定类型
scala> df.map case row: Row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[String])
res22: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[18] at map at <console>:33
所以,为了引爆它,我必须做到以下几点
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.Row
df.explode(col("id"), col("text")) case row: Row =>
val id = row(0).asInstanceOf[Int]
val words = row(1).asInstanceOf[String].split(" ")
words.map(word => (id, word))
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.Row
res30: org.apache.spark.sql.DataFrame = [id: int, text: string, _1: int, _2: string]
scala> res30 show
+---+--------------------+---+-----+
| id| text| _1| _2|
+---+--------------------+---+-----+
| 1|dsfds dsf dasf ds...| 1|dsfds|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| dasf|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| d|
| 2|2344 2353 24 2343...| 2| 2344|
| 2|2344 2353 24 2343...| 2| 2353|
| 2|2344 2353 24 2343...| 2| 24|
| 2|2344 2353 24 2343...| 2|23432|
| 2|2344 2353 24 2343...| 2| 234|
+---+--------------------+---+-----+
如果你想要命名列,你可以定义一个案例类来保存你的分解数据
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.Row
case class ExplodedData(word: String)
df.explode(col("id"), col("text")) case row: Row =>
val words = row(1).asInstanceOf[String].split(" ")
words.map(word => ExplodedData(word))
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.Row
defined class ExplodedData
res35: org.apache.spark.sql.DataFrame = [id: int, text: string, word: string]
scala> res35.select("id","word").show
+---+-----+
| id| word|
+---+-----+
| 1|dsfds|
| 1| dsf|
| 1| dasf|
| 1| dsf|
| 1| dsf|
| 1| d|
| 2| 2344|
| 2| 2353|
| 2| 24|
| 2|23432|
| 2| 234|
+---+-----+
希望这会带来一些清晰。
【讨论】:
【参考方案2】:我认为您可以先阅读有关该文档的更多信息并进行测试。
数据帧的爆炸仍然返回一个数据帧。它接受一个 lambda 函数 f: (Row) ⇒ TraversableOnce[A] 作为参数。
在 lambda 函数中,您将按大小写匹配输入。你已经知道你的 input 会是 Row of employee,它仍然是一个 Seq of Row。所以 input 的 case 会 Row(employee: Seq[Row]) ,如果你不明白这部分,你可以学习更多关于 scala 中取消应用功能的信息。
然后,员工(我相信你应该在这里使用员工)作为行的序列,将应用映射函数将每一行映射到一个员工。您将使用 scala 应用函数来获取该行中的第 i 个值。但是返回值是一个 Object ,所以你必须使用 asInstanceOf 将它转换成你期望的类型。
【讨论】:
以上是关于Spark数据框爆炸功能的主要内容,如果未能解决你的问题,请参考以下文章