Spark使用类将rdd转换为数据框
Posted
技术标签:
【中文标题】Spark使用类将rdd转换为数据框【英文标题】:Spark convert rdd to dataframe using a class 【发布时间】:2017-03-31 10:23:56 【问题描述】:我正在使用带有 Scala 的 Spark 1.6。
我在看here 但我没有找到明确的答案
我有一个大文件,在过滤包含一些版权的第一行后,我想获取标题(104 个字段)并将其转换为 StructType
模式。
我正在考虑使用一个类扩展Product
trait 来定义Dataframe
的架构,然后根据该架构将其转换为Dataframe
。
最好的方法是什么。
这是我文件中的一个示例:
text (06.07.03.216) COPYRIGHT © skdjh 2000-2016
text 160614_54554.vf Database 53643_csc Interface 574 zn 65
Start Date 14/06/2016 00:00:00:000
End Date 14/06/2016 00:14:59:999
State "s23"
cin. Nb Start End Event Con. Duration IMSI
32055680 16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680 16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680 16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680 16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
32055680 16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
T 想把它转换成 SparkSQL 像这个模式
----------------------------------------------------------------------------------------
| cin_Nb | Start | End | Event | Con_Duration | IMSI |
| ----------------------------------------------------------------------------------------|
| 32055680 | 16/09/2010 | 16:59:59:245 | 16/09/2016 | 17:00:00:000 | xxxxx |
| 32055680 | 16/09/2010 | 16:59:59:245 | 16/09/2016 | 17:00:00:000 | xxxxx |
| 32055680 | 16/09/2010 | 16:59:59:245 | 16/09/2016 | 17:00:00:000 | xxxxx |
| 20556800 | 16/09/2010 | 16:59:59:245 | 16/09/2016 | 17:00:00:000 | xxxxx |
| 32055680 | 16/09/2010 | 16:59:59:245 | 16/09/2016 | 17:00:00:000 | xxxxx |
----------------------------------------------------------------------------------------
【问题讨论】:
How to convert rdd object to dataframe in spark的可能重复 对不起,但它的不同我正在寻找那里,但我没有得到我的答案 【参考方案1】:很遗憾,您不能使用案例类或 StructType 模式!原因是 scala 不支持超过 22 个部分的元组,并且这两种方法都在幕后使用了元组。由于您有超过 22 列,因此该方法不起作用。
但是,您仍然可以这样做,只是不太好 :) 您需要将其转换为单列数据框,并将该列称为“原始”之类的有意义的名称
val df = rdd.toDF("raw")
接下来,您需要定义一个函数来为任何给定列提取所需的列:
val extractData(idx: Long) = udf[String, String, Int](raw => ???)
现在,您需要使用此函数附加所需的每一列。
val columns = yourColumnNamesList.zipWithIndex
val df2 = columns.foldLeft(df)case (acc,(cname,cid)) => acc.withColumn(cname, extractData(cid)($"raw")
虽然执行 foldLeft 看起来有点可怕,但如果您查看执行计划器创建的计划,spark 足够聪明,可以将所有这些扁平化为一个映射步骤,并且吞吐量比您预期的要好。
最后,您可以删除原始数据,因为不再需要它。
df2.drop("raw")
或者!
如果您的数据在文件系统上采用分隔格式,您应该查看 DataBricks csv 解析器,它也适用于 1.6 :-)
【讨论】:
【参考方案2】:您可以使用 zipwithindex 并过滤第一行而不是 你可以使用一个类来检查标题
class convert( var cin_Nb:String, var start:String, var end:String,
var event:String, var duration:String, var zed:String,......)extends Product with Serializable
def canEqual(that: Any) = that.isInstanceOf[convert]
def productArity = 104
def productElement(idx: Int) = idx match
case 0 => cin_Nb;case 1 => start;case 2 => end;
case 3 => event;case 4 => duration;case 5 => zed;
..........
然后您使用此结构将 rdd 转换为数据帧
【讨论】:
你能解释的比你多吗以上是关于Spark使用类将rdd转换为数据框的主要内容,如果未能解决你的问题,请参考以下文章
将包含 BigInt 的 RDD 转换为 Spark Dataframe