Spark学习记录:Spark SQL编程

Posted loar_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark学习记录:Spark SQL编程相关的知识,希望对你有一定的参考价值。

一、将RDD转换得到DataFrame

RDD=>DataFrame,有三种情况两种方法,分别通过SparkSession对象的createDataFrame方法和RDD的toDF方法,转换生成DataFrame。

第一种情况,已知RDD的数据结构,采用createDataFrame()方法生成DataFrame。

#假设导入数据集为Data.txt,内容如(Mark,30)
#导入相关库
from pyspark.sql import Row
from pyspark import SparkContext,SparkConf
sc = spark.sparkContext
#构建rdd
rdd = sc.textFile("Data.txt")\\
.map(lambda x: x.split(","))\\
.map(lambda x: Row(name=x[0],age=int(x[1])))	#生成Row对象,相当于赋予了rdd数据结构
#将rdd转换成DataFrame
df = spark.createDataFrame(rdd)

第二种情况,不知RDD的数据结构,采用createDataFrame()方法生成DataFrame。

#生成表头
schemastring = "name age"
fileds = [StructFiled(filed_name,StringType(),True) for filed_name in schemaString.split(" ")]  #用for in遍历生成列名
schema = StructType(fileds)
#构建rdd
rdd = sc.textFile("Data.txt")\\
.map(lambda x: x.split(","))\\
.map(lambda x:Row(x[0],x[1].strip()))  #生成Row对象
df = spark.createDataFrame(rdd,schema)

有表头信息的直接调用生成,没表头信息的创建表头后,再将表头与数据组合。

第三种情况,也是第二种方法,用RDD的toDF方法转换成DataFrame。

rdd = sc.textFile("file:///opt/sparkcluster/mycode/stuScore.txt")\\
.map(lambda x:x.split(','))\\
.map(lambda x:Row(int(x[0]),x[1],int(x[2])))
schema = StructType()\\
.add("id",IntegerType(),nullable=False)\\
.add("subject",StringType(),nullable=True)\\
.add("score",IntegerType(),nullable=False)  #定义列信息
#方法一
df = rdd.toDF(['id','subject','score'])  #只上传列名,不对参数进行设置
df.printSchema()
df.show()
#方法二
df = rdd.toDF(schema)	#上传完整的schema信息。
df.printSchema()
df.show()

二、读写mysql数据库中的数据。
读取数据库数据,通过spark.read.format(“jdbc”)

jdbcDF = spark.read\\
.format("jdbc")\\		#实现对数据库的读取
.option()\\				#设置相关的连接参数。
.load()
#url,数据库连接地址
#driver,数据库的JDBC驱动程序
#dbtable,访问的表名
#user,用户名
#password,用户密码
jdbcDF.show()			#jdbcDF是一个DataFrame,用show()进行展示

写入数据到数据库,通过df.write.jdbc(url,dbtable,追加方式,prop)写入。
写入流程:
将要写入到数据库的数据构建成RDD,对RDD进行相关的处理后,建立Row对象,转换成DataFrame,设置相关数据库参数,通过df.write.jdbc(),将DataFrame写入到数据库中。
实验例子:

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

#下面设置模式信息
schema = StructType([StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("gender", StringType(), True),StructField("age", IntegerType(), True)])
#下面设置两条数据,表示两个学生的信息
studentRDD = spark.sparkContext.parallelize(["3 wangwu F 22","4 zhaoliu M 25"]).map(lambda x:x.split(" "))
#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
 
#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
studentDF = spark.createDataFrame(rowRDD, schema)
 
#写入数据库
prop = 
prop['user'] = 'root'
prop['password'] = 'root'
prop['driver'] = "com.mysql.cj.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)

个人的学习记录,仅是个人理解,非专业解读,如有错误欢迎指出

以上是关于Spark学习记录:Spark SQL编程的主要内容,如果未能解决你的问题,请参考以下文章

实验 5 Spark SQL 编程初级实践

学习笔记Spark—— Spark SQL应用—— Spark SQL简介环境配置

Spark学习 Spark SQL

spark编程指南

Spark基础学习笔记29:Spark SQL内置函数

三万字,Spark学习笔记