SparkSQL使用IDEA创建DataFrame
Posted youngxuebo
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL使用IDEA创建DataFrame相关的知识,希望对你有一定的参考价值。
一、使用IDEA开发Spark SQL
1、创建DataFrame
1.1、通过使用 StrucType指定Schema创建DataFrame
1.2、指定列名使用 case class 添加Schema 方式创建DataFrame
1.3、通过SparkSession,read.load方法创建DataFrame,并写入到mysql中
1.1 通过使用 StrucType指定Schema创建DataFrame。
//创建SparkSession对象
val spark = SparkSession.builder().master("local").appName("MySparksqlDemo").getOrCreate()
//使用SparkSession自带的sparkContext创建sc(SparkContext)读取文件,并每一行按照\\t分割
val lines = spark.sparkContext
.textFile("G:\\\\School\\\\Bigdata\\\\Practice\\\\Scala\\\\student.txt")
.map(_.split("\\t"))
//创建spark-sql表结构
val schema = StructType(
List(
StructField("id",DataTypes.IntegerType),
StructField("name",DataTypes.StringType),
StructField("age",DataTypes.IntegerType)
)
)
//将rdd转换成rowRDD,并返回元组
val rowRDD = lines.map(x => Row(x(0).toInt,x(1).trim(),x(2).toInt))
//创建DataFrame,将Schema与row对应,将数据与元数据进行拼接 返回一个DataFrame
val studentDataFreame = spark.createDataFrame(rowRDD,schema)
//创建视图
studentDataFreame.createOrReplaceTempView("student")
//执行sql
val df = spark.sql("select * from student")
//来吧展示
df.show()
//停止Spark
spark.stop()
1.2 指定列名使用 case class 添加Schema 方式创建DataFrame。
//1.创建创建SparkSession
val spark = SparkSession
.builder().master("local").appName("MySparksqlDemo2").getOrCreate()
//2.使用 spark 获取 sparkContext上下文对象,并使用SparkContext读取文件并按照\\t切分
val lines = spark.sparkContext
.textFile("G:\\\\School\\\\Bigdata\\\\Practice\\\\Scala\\\\student.txt")
.map(_.split("\\t"))
//3.指定元数据信息,将 RDD和表结构schema关联
val studentRDD = lines.map(x => Student_schema(x(0).toInt,x(1).trim,x(2).toInt))
// 4.导入隐式类
import spark.implicits._
//5.将RDD 转换为DataFrame
val studentDF = studentRDD.toDF
//6.创建试图
studentDF.createOrReplaceTempView("students")
//7.数据展示
spark.sql("select * from students").show()
//8.关闭spark
spark.stop()
1.3 通过SparkSession,read.load方法创建DataFrame,并写入到mysql中。
//1.创建创建SparkSession
val spark = SparkSession.builder().
master("local").appName("MySparksqlDemo3toMysql").getOrCreate()
//2.通过SparkSession 使用json格式直接创建DataFrame
val peopleDf = spark.read.format("json").load("G:\\\\School\\\\Bigdata\\\\Practice\\\\Scala\\\\people.json")
//2.通过SparkSession 默认parquet格式直接创建DataFrame
//val usersDf = spark.read.load("G:\\\\School\\\\Bigdata\\\\Practice\\\\Scala\\\\users.parquet")
//3.创建临时视图people表
peopleDf.createOrReplaceTempView("people")
//创建临时视图users表
//usersDf.createOrReplaceTempView("users")
//val sql = "select people.age,people.name,users.favorite_color,users.favorite_numbers from people left join users on people.name = users.name where users.name = \\'Ben\\'"
//SQL
val sql = "select * from people where name = \\'Ben\\'"
//spark.sql执行sql
val result = spark.sql(sql)
//创建Properties对象,并设置mysql相关信息
val props = new Properties()
val url = "jdbc:mysql://ip:3306/database_name?useSSL=false&serverTimezone=UTC&useUnicode=true"
props.setProperty("user","***")
props.setProperty("password","***")
//通过追加方式,覆盖用overwrite
result.write.mode("append").jdbc(url,"t_table",props)
spark.stop()
在1.3中 将spark-sql 执行结果插入mysql中,报错 Can’t get JDBC type for array。
原因为使用默认parquet创建DataFrame,有一个column字段是数组。
import org.apache.spark.sql.functions._
dataframe.withColumn("favorite_numbers", concat_ws(",", $"favorite_numbers"))
// then write to database
.write.mode(SaveMode.Append).jdbc("url", "song_classify", prop)
注意:
在运行过程中,报错:org.apache.spark.internal.Logging.initinit(Lorg/apache/spark/internal/Logging;)V,如:
解决方式
以上是关于SparkSQL使用IDEA创建DataFrame的主要内容,如果未能解决你的问题,请参考以下文章