Spark使用类将rdd转换为数据框

Posted

技术标签:

【中文标题】Spark使用类将rdd转换为数据框【英文标题】:Spark convert rdd to dataframe using a class 【发布时间】:2017-03-31 10:23:56 【问题描述】:

我正在使用带有 Scala 的 Spark 1.6。 我在看here 但我没有找到明确的答案 我有一个大文件,在过滤包含一些版权的第一行后,我想获取标题(104 个字段)并将其转换为 StructType 模式。 我正在考虑使用一个类扩展Product trait 来定义Dataframe 的架构,然后根据该架构将其转换为Dataframe

最好的方法是什么。

这是我文件中的一个示例:

   text  (06.07.03.216)  COPYRIGHT © skdjh 2000-2016
    text  160614_54554.vf Database    53643_csc   Interface   574 zn  65
    Start   Date    14/06/2016  00:00:00:000
    End Date    14/06/2016  00:14:59:999
    State   "s23"

        cin. Nb      Start     End        Event       Con. Duration  IMSI
        32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
        32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
        32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
        32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
        32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx

T 想把它转换成 SparkSQL 像这个模式

    ----------------------------------------------------------------------------------------
  |    cin_Nb |  Start            |   End          |      Event   |   Con_Duration  | IMSI  |
  | ----------------------------------------------------------------------------------------|
  |   32055680 |   16/09/2010     |   16:59:59:245 |  16/09/2016  |   17:00:00:000  | xxxxx |
  |   32055680 |   16/09/2010     |   16:59:59:245 |  16/09/2016  |   17:00:00:000  | xxxxx |
  |   32055680 |   16/09/2010     |   16:59:59:245 |  16/09/2016  |   17:00:00:000  | xxxxx |
  |   20556800 |   16/09/2010     |   16:59:59:245 |  16/09/2016  |   17:00:00:000  | xxxxx |
  |   32055680 |   16/09/2010     |   16:59:59:245 |  16/09/2016  |   17:00:00:000  | xxxxx | 
    ----------------------------------------------------------------------------------------

【问题讨论】:

How to convert rdd object to dataframe in spark的可能重复 对不起,但它的不同我正在寻找那里,但我没有得到我的答案 【参考方案1】:

很遗憾,您不能使用案例类或 StructType 模式!原因是 scala 不支持超过 22 个部分的元组,并且这两种方法都在幕后使用了元组。由于您有超过 22 列,因此该方法不起作用。

但是,您仍然可以这样做,只是不太好 :) 您需要将其转换为单列数据框,并将该列称为“原始”之类的有意义的名称

val df = rdd.toDF("raw")

接下来,您需要定义一个函数来为任何给定列提取所需的列:

val extractData(idx: Long) = udf[String, String, Int](raw => ???)

现在,您需要使用此函数附加所需的每一列。

val columns = yourColumnNamesList.zipWithIndex

val df2 = columns.foldLeft(df)case (acc,(cname,cid)) => acc.withColumn(cname, extractData(cid)($"raw")

虽然执行 foldLeft 看起来有点可怕,但如果您查看执行计划器创建的计划,spark 足够聪明,可以将所有这些扁平化为一个映射步骤,并且吞吐量比您预期的要好。

最后,您可以删除原始数据,因为不再需要它。

df2.drop("raw")

或者!

如果您的数据在文件系统上采用分隔格式,您应该查看 DataBricks csv 解析器,它也适用于 1.6 :-)

【讨论】:

【参考方案2】:

您可以使用 zipwithindex 并过滤第一行而不是 你可以使用一个类来检查标题

class convert( var  cin_Nb:String, var start:String, var end:String, 
        var event:String, var duration:String, var zed:String,......)extends Product  with Serializable     
      def canEqual(that: Any) = that.isInstanceOf[convert]    
      def productArity = 104   
      def productElement(idx: Int) = idx match 
                    case 0 => cin_Nb;case 1 =>  start;case 2 =>  end;
                    case 3 =>  event;case 4 =>  duration;case 5 =>  zed;
                    ..........
                    
      

然后您使用此结构将 rdd 转换为数据帧

【讨论】:

你能解释的比你多吗

以上是关于Spark使用类将rdd转换为数据框的主要内容,如果未能解决你的问题,请参考以下文章

如何在火花中将rdd对象转换为数据框

将包含 BigInt 的 RDD 转换为 Spark Dataframe

将地图 RDD 转换为数据框

将rdd转换为数据框时,pyspark对mapPartitions使用一项任务

PySpark:将 RDD 转换为数据框中的列

如何将 RDD [GenericRecord] 转换为 scala 中的数据框?