从自定义数据格式创建火花数据框

Posted

技术标签:

【中文标题】从自定义数据格式创建火花数据框【英文标题】:Create spark data frame from custom data format 【发布时间】:2016-09-08 23:24:12 【问题描述】:

我有一个文本文件,其中字符串 REC 作为记录分隔符,换行符作为列分隔符,每个数据都附有列名,以逗号作为分隔符,下面是示例数据格式

录制 编号,19048 长期,牛奶 排名,1 录制 编号,19049 术语,玉米 排名,5

使用 REC 作为记录分隔符。现在,我想创建带有列名 ID、Term 和 Rank 的 spark 数据框。请协助我。

【问题讨论】:

【参考方案1】:

这是工作代码

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable, Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf, SparkContext


object RecordSeparator extends App 
  var conf = new
      SparkConf().setAppName("test").setMaster("local[1]")
    .setExecutorEnv("executor- cores", "2")
  var sc = new SparkContext(conf)
  val hconf = new Configuration
  hconf.set("textinputformat.record.delimiter", "REC")
  val data = sc.newAPIHadoopFile("data.txt",
    classOf[TextInputFormat], classOf[LongWritable],
    classOf[Text], hconf).map(x => x._2.toString.trim).filter(x => x != "")
    .map(x => getRecord(x)).map(x => x.split(","))
    .map(x => record(x(0), x(2), x(2)))

  val sqlContext = new SQLContext(sc)
  val df = data.toDF()
  df.printSchema()
  df.show(false)

  def getRecord(in: String): String = 
    val ar = in.split("\n").mkString(",").split(",")
    val data = Array(ar(1), ar(3), ar(5))
    data.mkString(",")
  


case class record(Id: String, Term: String, Rank: String)

输出:

 root
 |-- Id: string (nullable = true)
 |-- Term: string (nullable = true)
 |-- Rank: string (nullable = true)

+-----+----+----+
|Id   |Term|Rank|
+-----+----+----+
|19048|1   |1   |
|19049|5   |5   |
+-----+----+----+

【讨论】:

【参考方案2】:

假设您的文件位于“普通”文件系统(不是 HDFS)上,您必须编写一个文件解析器,然后使用 sc.parallelize 创建一个 RDD,然后创建一个 DataFrame

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf, SparkContext
import scala.collection.mutable

object Demo extends App 
  val conf = new SparkConf().setMaster("local[1]").setAppName("Demo")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._


  case class Record(
                     var id:Option[Int] = None,
                     var term:Option[String] = None,
                     var rank:Option[Int] = None)



  val filename = "data.dat"

  val records = readFile(filename)
  val df = sc.parallelize(records).toDF
  df.printSchema()
  df.show()



  def readFile(filename:String) : Seq[Record] = 
    import scala.io.Source

    val records = mutable.ArrayBuffer.empty[Record]
    var currentRecord: Record = null

    for (line <- Source.fromFile(filename).getLines) 
      val tokens = line.split(',')

      currentRecord = tokens match 
        case Array("REC") => Record()
        case Array("Id", id) => 
          currentRecord.id = Some(id.toInt); currentRecord
        
        case Array("Term", term) => 
          currentRecord.term = Some(term); currentRecord
        
        case Array("Rank", rank) => 
          currentRecord.rank = Some(rank.toInt); records += currentRecord;
          null
        
      
    
    return records
  

这给了

root
 |-- id: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- rank: integer (nullable = true)

+-----+----+----+
|   id|term|rank|
+-----+----+----+
|19048|milk|   1|
|19049|corn|   5|
+-----+----+----+

【讨论】:

以上是关于从自定义数据格式创建火花数据框的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:从列表的 RDD 创建一个火花数据框,其中列表的某些元素是对象

从用户定义的函数创建火花数据框列

php 从自定义日期格式创建日期对象

根据scala中的条件对列进行火花数据框聚合

如何将 JSON 格式的数据展平为 spark 数据框

如何将火花数据输出到具有单独列的 csv 文件?