Spark2: loading and saving files, and converting data files into a DataFrame


First, upload the data file to HDFS and verify the upload:

hadoop fs -put /home/wangxiao/data/ml/Affairs.csv /datafile/wangxiao/

hadoop fs -ls -R /datafile
drwxr-xr-x   - wangxiao supergroup          0 2016-10-15 10:46 /datafile/wangxiao
-rw-r--r--   3 wangxiao supergroup      16755 2016-10-15 10:46 /datafile/wangxiao/Affairs.csv
-rw-r--r--   3 wangxiao supergroup      16755 2016-10-13 21:48 /datafile/wangxiao/Affairs.txt

 

// affairs: frequency of extramarital affairs over the past year
// gender: gender
// age: age
// yearsmarried: number of years married
// children: whether there are children
// religiousness: degree of religiousness (scale of 1-5; 1 = anti, 5 = very religious)
// education: level of education
// occupation: occupation (reverse-numbered Gordon 7-category classification)
// rating: self-rating of the marriage (scale of 1-5; 1 = very unhappy, 5 = very happy)

0,male,37,10,no,3,18,7,4
0,female,27,4,no,4,14,6,4
0,female,32,15,yes,1,12,1,4
0,male,57,15,yes,5,18,6,5
0,male,22,0.75,no,2,17,6,3
0,female,32,1.5,no,2,17,5,5
0,female,22,0.75,no,2,12,1,3
0,male,57,15,yes,2,14,4,4
0,female,32,15,yes,4,16,1,2
0,male,22,1.5,no,4,14,4,5
0,male,37,15,yes,2,20,7,2
0,male,27,4,yes,4,18,6,4
0,male,47,15,yes,5,17,6,4
0,female,22,1.5,no,2,17,5,4
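To make the row layout concrete, here is a minimal plain-Scala sketch (no Spark required) that pairs the first sample row with the column names described above:

```scala
// Column names in the order they appear in each CSV row.
val columns = Seq("affairs", "gender", "age", "yearsmarried", "children",
                  "religiousness", "education", "occupation", "rating")

// First sample row, split on commas and zipped with the column names.
val row = "0,male,37,10,no,3,18,7,4".split(",")
val named = columns.zip(row).toMap
// e.g. named("age") is "37", named("children") is "no"
```

Note that every field is still a String at this point; assigning real types (Int, Double) is exactly what the case-class approach in the code below does.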

 

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD

object ML1 {
  def main(args: Array[String]) {
    val spark = SparkSession.builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    // Create a DataFrame
    // val data1: DataFrame = spark.read.csv("hdfs://ns1/datafile/wangxiao/Affairs.csv")
    val data2: DataFrame = spark.read.format("csv").load("hdfs://ns1/datafile/wangxiao/Affairs.csv")

    val df = data2.toDF("affairs", "gender", "age", "yearsmarried", "children",
      "religiousness", "education", "occupation", "rating")
    df.printSchema()

    // Specify column names and types explicitly
    /* case class Affairs(affairs: Int, gender: String, age: Int,
         yearsmarried: Double, children: String, religiousness: Int,
         education: Double, occupation: Double, rating: Int)

       val res = data2.map { r =>
         Affairs(r(0).toString.toInt, r(1).toString, r(2).toString.toInt,
           r(3).toString.toDouble, r(4).toString, r(5).toString.toInt,
           r(6).toString.toDouble, r(7).toString.toDouble, r(8).toString.toInt)
       }
       res.printSchema() */

    /* Create an RDD
       val data: RDD[String] = spark.sparkContext.textFile("hdfs://ns1/datafile/wangxiao/Affairs.txt")

       case class Affairs(affairs: Int, gender: String, age: Int,
         yearsmarried: Double, children: String, religiousness: Int,
         education: Double, occupation: Double, rating: Int)

       // Convert the RDD to a DataFrame
       // (the rows are comma-separated, so split on "," rather than " ")
       val df = data.map(_.split(",")).map { line =>
         Affairs(line(0).toInt, line(1).trim, line(2).toInt,
           line(3).toDouble, line(4).trim, line(5).toInt,
           line(6).toDouble, line(7).toDouble, line(8).toInt)
       }.toDF() */

    // Create a temporary view
    df.createOrReplaceTempView("Affairs")

    // Subquery
    // val df1 = spark.sql("SELECT * FROM Affairs WHERE age BETWEEN 20 AND 25")
    val df1 = spark.sql("SELECT gender, age, rating FROM (SELECT * FROM Affairs WHERE age BETWEEN 20 AND 25) t")
    df1.show

    // Save the DataFrame to a file.
    // Note: save() writes a directory of part files, not a single CSV file.
    df.select("gender", "age", "education").write.format("csv")
      .save("hdfs://ns1/datafile/wangxiao/data123.csv")
  }
}
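For intuition, the subquery in the code above (WHERE age BETWEEN 20 AND 25, then SELECT gender, age, rating) behaves like this plain-Scala pipeline over the sample rows listed earlier. This is a sketch for illustration only, not Spark code; the (gender, age, rating) triples are hard-coded from the sample data:

```scala
// (gender, age, rating) triples taken from the 14 sample rows above.
val rows = Seq(
  ("male", 37, 4), ("female", 27, 4), ("female", 32, 4), ("male", 57, 5),
  ("male", 22, 3), ("female", 32, 5), ("female", 22, 3), ("male", 57, 4),
  ("female", 32, 2), ("male", 22, 5), ("male", 37, 2), ("male", 27, 4),
  ("male", 47, 4), ("female", 22, 4)
)

// WHERE age BETWEEN 20 AND 25 ... SELECT gender, age, rating
val filtered = rows.filter { case (_, age, _) => age >= 20 && age <= 25 }
// only the four age-22 rows survive the filter
```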

 

hadoop fs -ls -R /datafile
drwxr-xr-x   - wangxiao supergroup          0 2016-10-15 11:43 /datafile/wangxiao
-rw-r--r--   3 wangxiao supergroup      16755 2016-10-15 10:46 /datafile/wangxiao/Affairs.csv
-rw-r--r--   3 wangxiao supergroup      16755 2016-10-13 21:48 /datafile/wangxiao/Affairs.txt
drwxr-xr-x   - wangxiao supergroup          0 2016-10-15 11:43 /datafile/wangxiao/data123.csv
