SparkSQL使用IDEA创建DataFrame

Posted youngxuebo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL使用IDEA创建DataFrame相关的知识,希望对你有一定的参考价值。

一、使用IDEA开发Spark SQL

1、创建DataFrame

1.1、通过使用 StrucType指定Schema创建DataFrame
1.2、指定列名使用 case class 添加Schema 方式创建DataFrame
1.3、通过SparkSession,read.load方法创建DataFrame,并写入到mysql

1.1 通过使用 StrucType指定Schema创建DataFrame。

	//创建SparkSession对象
    val spark = SparkSession.builder().master("local").appName("MySparksqlDemo").getOrCreate()

    //使用SparkSession自带的sparkContext创建sc(SparkContext)读取文件,并每一行按照\\t分割
    val lines = spark.sparkContext
      .textFile("G:\\\\School\\\\Bigdata\\\\Practice\\\\Scala\\\\student.txt")
      .map(_.split("\\t"))

    //创建spark-sql表结构
    val schema = StructType(
      List(
        StructField("id",DataTypes.IntegerType),
        StructField("name",DataTypes.StringType),
        StructField("age",DataTypes.IntegerType)
      )
    )

    //将rdd转换成rowRDD,并返回元组
    val rowRDD = lines.map(x => Row(x(0).toInt,x(1).trim(),x(2).toInt))

    //创建DataFrame,将Schema与row对应,将数据与元数据进行拼接 返回一个DataFrame
     val studentDataFreame = spark.createDataFrame(rowRDD,schema)

    //创建视图
    studentDataFreame.createOrReplaceTempView("student")

    //执行sql
     val df = spark.sql("select * from student")

    //来吧展示
    df.show()

    //停止Spark
    spark.stop()

1.2 指定列名使用 case class 添加Schema 方式创建DataFrame。

 //1.创建创建SparkSession
    val spark = SparkSession
      .builder().master("local").appName("MySparksqlDemo2").getOrCreate()

    //2.使用 spark 获取 sparkContext上下文对象,并使用SparkContext读取文件并按照\\t切分
    val lines = spark.sparkContext
      .textFile("G:\\\\School\\\\Bigdata\\\\Practice\\\\Scala\\\\student.txt")
      .map(_.split("\\t"))

    //3.指定元数据信息,将 RDD和表结构schema关联
    val studentRDD = lines.map(x => Student_schema(x(0).toInt,x(1).trim,x(2).toInt))

    // 4.导入隐式类
    import spark.implicits._
    //5.将RDD 转换为DataFrame
    val studentDF = studentRDD.toDF

    //6.创建试图
    studentDF.createOrReplaceTempView("students")

    //7.数据展示
    spark.sql("select * from students").show()

    //8.关闭spark
    spark.stop()

1.3 通过SparkSession,read.load方法创建DataFrame,并写入到mysql中。

//1.创建创建SparkSession
    val spark = SparkSession.builder().
      master("local").appName("MySparksqlDemo3toMysql").getOrCreate()

    //2.通过SparkSession 使用json格式直接创建DataFrame
    val peopleDf = spark.read.format("json").load("G:\\\\School\\\\Bigdata\\\\Practice\\\\Scala\\\\people.json")

    //2.通过SparkSession 默认parquet格式直接创建DataFrame
    //val usersDf = spark.read.load("G:\\\\School\\\\Bigdata\\\\Practice\\\\Scala\\\\users.parquet")

    //3.创建临时视图people表
    peopleDf.createOrReplaceTempView("people")

    //创建临时视图users表
    //usersDf.createOrReplaceTempView("users")

    //val sql = "select people.age,people.name,users.favorite_color,users.favorite_numbers from people left join users on people.name = users.name where users.name = \\'Ben\\'"
    //SQL
    val sql = "select * from people where name = \\'Ben\\'"
	//spark.sql执行sql
    val result = spark.sql(sql)

	//创建Properties对象,并设置mysql相关信息
    val props = new Properties()

    val url = "jdbc:mysql://ip:3306/database_name?useSSL=false&serverTimezone=UTC&useUnicode=true"
    props.setProperty("user","***")
    props.setProperty("password","***")

	//通过追加方式,覆盖用overwrite
    result.write.mode("append").jdbc(url,"t_table",props)

    spark.stop()

在1.3中 将spark-sql 执行结果插入mysql中,报错 Can’t get JDBC type for array。
原因为使用默认parquet创建DataFrame,有一个column字段是数组。

import org.apache.spark.sql.functions._

dataframe.withColumn("favorite_numbers", concat_ws(",", $"favorite_numbers"))
// then write to database
    .write.mode(SaveMode.Append).jdbc("url", "song_classify", prop)

注意:
在运行过程中,报错:org.apache.spark.internal.Logging.initinit(Lorg/apache/spark/internal/Logging;)V,如:
解决方式

以上是关于SparkSQL使用IDEA创建DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

sparkSQL创建DataFrame

SparkSQL 核心编程

SparkSQL详解

SparkSQL

Spark-SQL——DataFrame与Dataset

Spark SQL初始化和创建DataFrame的几种方式