spark DataFrame 读写和保存数据

Posted dhname

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark DataFrame 读写和保存数据相关的知识,希望对你有一定的参考价值。

一。读写Parquet(DataFrame) 

  Spark SQL可以支持Parquet、JSON、Hive等数据源,并且可以通过JDBC连接外部数据源。前面的介绍中,我们已经涉及到了JSON、文本格式的加载,这里不再赘述。这里介绍Parquet,下一节会介绍JDBC数据库连接。

  Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet是语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件,能够与Parquet配合的组件有:
    * 查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
    * 计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
    * 数据模型: Avro, Thrift, Protocol Buffers, POJOs
  Spark已经为我们提供了parquet样例数据,就保存在“/usr/local/spark/examples/src/main/resources/”这个目录下,有个users.parquet文件,这个文件格式比较特殊,如果你用vim编辑器打开,或者用cat命令查看文件内容,肉眼是一堆乱七八糟的东西,是无法理解的。只有被加载到程序中以后,Spark会对这种格式进行解析,然后我们才能理解其中的数据。
  下面代码演示了如何从parquet文件中加载数据生成DataFrame。

>>> parquetFileDF = spark.read.parquet("file:///usr/local/spark/examples/src/main/resources/users.parquet"
>>> parquetFileDF.createOrReplaceTempView("parquetFile")
 
>>> namesDF = spark.sql("SELECT * FROM parquetFile")
 
>>> namesDF.rdd.foreach(lambda person: print(person.name))
 
Alyssa
Ben
 

 

>>> peopleDF = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
 
>>> peopleDF.write.parquet("file:///usr/local/spark/mycode/newpeople.parquet")
 

 

二。jdbc

  

>>> jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()
>>> jdbcDF.show()

 

>>> from pyspark.sql.types import Row
>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.types import IntegerType
>>> studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" "))
//下面要设置模式信息
>>> schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
>>> rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3])))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
>>> studentDF = spark.createDataFrame(rowRDD, schema)
>>> prop = {}
>>> prop[user] = root
>>> prop[password] = hadoop
>>> prop[driver] = "com.mysql.jdbc.Driver"
>>> studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",student,append, prop)

 

以上是关于spark DataFrame 读写和保存数据的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL数据载入和保存实战

Spark SQL数据加载和保存实战

Spark SQL数据加载和保存实战

编程实现利用 DataFrame 读写 MySQL 的数据

Spark SQL DataFrame - 异常处理

Spark SQL读写方法