spark sql的简单操作

Posted 薛定谔的猫!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark sql的简单操作相关的知识,希望对你有一定的参考价值。

测试数据
sparkStu.text
zhangxs 24 chenxy
wangYr 21 teacher
wangx 26 teacher
sparksql
{
"name":"zhangxs","age":24,"job":"chengxy",
"name":"li","age":21,"job":"teacher",
"name":"tao","age":14,"job":"student"
}
 
object CreateDataFream {
//创建student对象
case class Student(name:String,age:BigInt,job:String);

def main(args: Array[String]){
//初始化sparkSession 这个sparkSession要用val关键字修饰
val spark = SparkSession
.builder()
.appName("Spark SQL Example")
.master("spark://服务器ip:7077")
.getOrCreate();
// runDataSetCreate(spark);
// runSarkOnFile(spark);
// applySchema(spark);
//loadParquet(spark);
//jsonFile(spark);
//销毁sparkSession
spark.stop();
}

}
 
//对指定的列进行查询
private def test1(spark :SparkSession){
//因为要使用变量,$符号,所以导入这个包
import spark.implicits._
//从hdfs上读取json数据文件并创建dataFream
var dataFreamS= spark.read.json("hdfs://服务器ip:8020/tmp/dataTest/sparksql");
//显示dataFream所有数据
dataFreamS.show();
//打印dataFrame结构
dataFreamS.printSchema();
//显示指定列的数据
dataFreamS.select("name").show()
//查询指定的列,并修改数据
dataFreamS.select($"name", $"age"+1).show();
//查询年龄大于10的人
dataFreamS.select($"age" > 10).show();
//查看每个年龄段的人数
dataFreamS.groupBy("age").count();
//创建临时视图,如果这个视图已经存在就覆盖掉
dataFreamS.createOrReplaceTempView("zhangxsView");
}

 

 
//创建dataFrame并运行 
private def runDataSetCreate(spark:SparkSession){
import spark.implicits._
//创建DataSets对象 类型是Student
val dataStu = Seq(Student("Andy", 32,"baiLing")).toDS();
//显示数据集信息
dataStu.show();
//创建数据的dataSet
var dataArr=Seq(1,2,3).toDS();
//显示数据集的信息
dataArr.show();
//对属性进行简单操作
print(dataArr.map (_ +1 ).collect());
//dataFrame能够被转换成自定义对象类型的dataSet,
val dfStu=spark.read.json("hdfs://服务器ip:8020/tmp/dataTest/sparksql").as[Student];
dfStu.show();
//jsonFile支持嵌套表,读入并注册成表
spark.read.json("hdfs://服务器ip:8020/tmp/dataTest/sparksql").registerTempTable("student");
//根据sql查询注册的table
val temsql=spark.sqlContext.sql("select name from student");
//显示name的value
print(temsql.show())
}

 

//从hdfs上读取数据文件并转为student对象进行操作
private def runSarkOnFile(spark:SparkSession){
import spark.implicits._
//读取数据文件 并生成rdd
var rdd=spark.read.textFile("hdfs://服务器ip:8020/tmp/dataTest/sparkStu.txt");
//对获取的rdd进行解析,并生成sutdent对象
var sturdd=rdd.map { x => x.split(" ")}.map { z => Student(z(0).toString(),z(1).toInt,z(2).toString())};
//显示student对象
sturdd.show();
//将sutdent对象注册成临时表 student
sturdd.registerTempTable("student");
//查询临时表中的数据,并显示
var sqlDF=spark.sql("select t.name,t.age,t.job from friend t where t.age>14 and t.age<26");
sqlDF.show();
}

 

private def applySchema(spark:SparkSession){
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
//确定schema名称(列的名称)
var schemaString="name,age,job";
//解析schemaString,并生成StructType对象数组
var schemaType=StructType(schemaString.split(",").map { x => StructField(x,StringType,true)})
//从hdfs上读取数据文件
var stuDS=spark.sparkContext.textFile(path);
//使用Row对象,创建rowRdd
var sDS=stuDS.map { x => x.split(" ")}.map(s => Row(s(0),s(1),s(2)))
//创建schemaRDD
var rowDF=spark.createDataFrame(sDS, schemaType); // var rowDF=spark.sqlContext.applySchema(sDS, schemaType); 这种方法已经过时
 //打印schemaRDD的结构
rowDF.printSchema();
//注册Student table
rowDF.createOrReplaceTempView("Student"); // rowDF.registerTempTable("Student"); 这种方法已经过时
//rowDF.collect().foreach {print(_) }
//var resDS=spark.sql("select * from Student where age > 24");
var resDS=spark.sql("select name from Student");
resDS.show();
}
 
//使用parquet文件的方式
private def loadParquet(spark:SparkSession){
import spark.implicits._
//确定schema 列名称
var schemaString="name,age,job";
//解析schemaString,并生成StructType对象数组
var schemaType=StructType(schemaString.split(",").map { x => StructField(x,StringType,true)})
//创建rowRdd
var stuDS=spark.sparkContext.textFile(path);
var sDS=stuDS.map { x => x.split(" ")}.map(s => Row(s(0),s(1),s(2)))
//将schemaRDD保存成parquet文件
var rowDF=spark.sqlContext.applySchema(sDS, schemaType);
//将文件写到hdfs://服务器ip:8020/tmp/dataTest/
rowDF.write.parquet("hdfs://服务器ip:8020/tmp/dataTest/student.parquet");
-------------------------------------------------------------------
//读取parquet文件
var redParfile=spark.read.parquet("hdfs://服务器ip:8020/tmp/dataTest/student.parquet");
redParfile.createOrReplaceTempView("redParfilered");
var resultRdd=spark.sql("select * from redParfilered t where t.name=‘zhangxs‘");
//DataFrame.rdd 可以将dataFrame转为RDD类型
resultRdd.rdd.map { x => "name"+x(0) }.collect().foreach { print(_) }
}
 
/**
* spark可以自动的识别一个json模式并加载成数据集,
* 这种转换可以使用SparkSession.read.json() 函数
* 这个数据集的来源可以是一个rdd,也可以是一个json文件
*
*/
private def jsonFile(spark:SparkSession){
var jsonRdd=spark.read.json("hdfs://192.168.177.124:8020/tmp/dataTest/sparksql");
jsonRdd.createOrReplaceTempView("student");
var jfRdd= spark.sql("select * from student t where t.age >24");
jfRdd.show();

 

/**
* 使用Json类型的rdd加载json
*
* 如果加:: Nil,返回是一个char类型的rdd,加上则返回的是String类型的rdd
*/
var rdd=spark.sparkContext.makeRDD("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil);
var rddre=spark.read.json(rdd);
rddre.show();
}

 

以上是关于spark sql的简单操作的主要内容,如果未能解决你的问题,请参考以下文章

控制 spark-sql 和数据帧中的字段可空性

spark sql的简单操作

在这个 spark 代码片段中 ordering.by 是啥意思?

python+spark程序代码片段

Spark:如何加速 foreachRDD?

Spark-SQL之DataFrame操作大全