Create spark data frame from custom data format
Posted
【Title】Create spark data frame from custom data format 【Posted】2016-09-08 23:24:12 【Question】I have a text file that uses the string REC as the record delimiter and a newline as the column delimiter. Each value is preceded by its column name, separated by a comma. Below is a sample of the data format:
REC
Id,19048
Term,milk
Rank,1
REC
Id,19049
Term,corn
Rank,5
REC is the record delimiter. I now want to create a Spark data frame with the column names Id, Term, and Rank. Please help.
【Comments】:
【Answer 1】Here is working code:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class record(Id: String, Term: String, Rank: String)

object RecordSeparator extends App {
  val conf = new SparkConf().setAppName("test").setMaster("local[1]")
    .setExecutorEnv("executor-cores", "2")
  val sc = new SparkContext(conf)

  // Tell the Hadoop input format to split records on "REC" instead of "\n"
  val hconf = new Configuration
  hconf.set("textinputformat.record.delimiter", "REC")

  val data = sc.newAPIHadoopFile("data.txt",
      classOf[TextInputFormat], classOf[LongWritable],
      classOf[Text], hconf).map(x => x._2.toString.trim).filter(x => x != "")
    .map(x => getRecord(x)).map(x => x.split(","))
    .map(x => record(x(0), x(1), x(2)))

  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val df = data.toDF()
  df.printSchema()
  df.show(false)

  // Flatten one record's lines into "Id,<id>,Term,<term>,Rank,<rank>"
  // and keep only the values (positions 1, 3, 5)
  def getRecord(in: String): String = {
    val ar = in.split("\n").mkString(",").split(",")
    val data = Array(ar(1), ar(3), ar(5))
    data.mkString(",")
  }
}

Output:

root
 |-- Id: string (nullable = true)
 |-- Term: string (nullable = true)
 |-- Rank: string (nullable = true)

+-----+----+----+
|Id   |Term|Rank|
+-----+----+----+
|19048|milk|1   |
|19049|corn|5   |
+-----+----+----+
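The string handling in the answer above can be checked without a Spark cluster. The following is a minimal standalone sketch, not part of the original answer: the object name RecordParseSketch and the in-memory sample string are illustrative, and it applies the same split-on-"REC" plus getRecord field extraction to the question's sample data.

```scala
// Standalone sketch of the parsing chain; no Spark required.
object RecordParseSketch {
  // Same idea as getRecord above: flatten a record's lines into one
  // comma-separated string and keep the value fields at odd positions.
  def getRecord(in: String): String = {
    val ar = in.split("\n").mkString(",").split(",")
    Array(ar(1), ar(3), ar(5)).mkString(",")
  }

  def main(args: Array[String]): Unit = {
    val raw = "REC\nId,19048\nTerm,milk\nRank,1\nREC\nId,19049\nTerm,corn\nRank,5"
    // Mimics the custom record delimiter: split on "REC", drop empty chunks.
    val rows = raw.split("REC").map(_.trim).filter(_.nonEmpty)
      .map(getRecord).map(_.split(","))
    rows.foreach(r => println(r.mkString("|")))  // 19048|milk|1, 19049|corn|5
  }
}
```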
【Comments】:
【Answer 2】Assuming your file sits on a "normal" file system (not HDFS), you have to write a file parser, then use sc.parallelize to create an RDD, and then create a DataFrame:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object Demo extends App {
  val conf = new SparkConf().setMaster("local[1]").setAppName("Demo")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  case class Record(
    var id: Option[Int] = None,
    var term: Option[String] = None,
    var rank: Option[Int] = None)

  val filename = "data.dat"
  val records = readFile(filename)
  val df = sc.parallelize(records).toDF
  df.printSchema()
  df.show()

  def readFile(filename: String): Seq[Record] = {
    import scala.io.Source
    val records = mutable.ArrayBuffer.empty[Record]
    var currentRecord: Record = null
    for (line <- Source.fromFile(filename).getLines) {
      val tokens = line.split(',')
      // A "REC" line starts a fresh record; a "Rank" line completes it
      currentRecord = tokens match {
        case Array("REC") => Record()
        case Array("Id", id) =>
          currentRecord.id = Some(id.toInt); currentRecord
        case Array("Term", term) =>
          currentRecord.term = Some(term); currentRecord
        case Array("Rank", rank) =>
          currentRecord.rank = Some(rank.toInt); records += currentRecord
          null
      }
    }
    records
  }
}
This gives:
root
|-- id: integer (nullable = true)
|-- term: string (nullable = true)
|-- rank: integer (nullable = true)
+-----+----+----+
| id|term|rank|
+-----+----+----+
|19048|milk| 1|
|19049|corn| 5|
+-----+----+----+
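The line-by-line state machine inside readFile can likewise be exercised without Spark or a file on disk. The sketch below is illustrative (the names LineParserSketch and parse are not from the answer, and it uses an immutable Record with copy instead of the answer's var fields); it parses an in-memory sequence of lines into the same per-record structure.

```scala
// Standalone sketch of the line-based parser; reads from a Seq instead of a file.
object LineParserSketch {
  case class Record(id: Option[Int] = None,
                    term: Option[String] = None,
                    rank: Option[Int] = None)

  def parse(lines: Seq[String]): Seq[Record] = {
    val records = scala.collection.mutable.ArrayBuffer.empty[Record]
    var current: Record = null
    for (line <- lines) {
      line.split(',') match {
        case Array("REC")        => current = Record()          // start a new record
        case Array("Id", id)     => current = current.copy(id = Some(id.toInt))
        case Array("Term", term) => current = current.copy(term = Some(term))
        case Array("Rank", rank) =>                             // last field: emit record
          records += current.copy(rank = Some(rank.toInt)); current = null
        case _                   => // ignore blank or unrecognised lines
      }
    }
    records.toSeq
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("REC", "Id,19048", "Term,milk", "Rank,1",
                    "REC", "Id,19049", "Term,corn", "Rank,5")
    parse(lines).foreach(println)
  }
}
```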
【Comments】: