spark2.x如何通过SparkSQL读取csv文件
Posted Maynor的大数据奋斗之路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark2.x如何通过SparkSQL读取csv文件相关的知识,希望对你有一定的参考价值。
package cn.itcast.spark.source
import java.util.Properties
import org.apache.spark.sql.types.DoubleType, IntegerType, LongType, StructType
import org.apache.spark.sql.DataFrame, SparkSession
object _03SparkSQLSourceTest
def main(args: Array[String]): Unit =
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
import spark.implicits._
// TODO: 1. CSV 格式数据文本文件数据 -> 依据 CSV文件首行是否是列名称,决定读取数据方式不一样的
/*
CSV 格式数据:
每行数据各个字段使用逗号隔开
也可以指的是,每行数据各个字段使用 单一 分割符 隔开数据
*/
// 方式一:首行是列名称,数据文件u.dat
val dataframe: DataFrame = spark.read
.format("csv")
.option("sep", "\\\\t")
.option("header", "true")
.option("inferSchema", "true")
.load("datas/ml-100k/u.dat")
dataframe.printSchema()
dataframe.show(10, truncate = false)
// 方式二:首行不是列名,需要自定义Schema信息,数据文件u.data
// 自定义schema信息
val schema: StructType = new StructType()
.add("user_id", IntegerType, nullable = true)
.add("iter_id", IntegerType, nullable = true)
.add("rating", DoubleType, nullable = true)
.add("timestamp", LongType, nullable = true)
val df: DataFrame = spark.read
.format("csv")
.schema(schema)
.option("sep", "\\\\t")
.load("datas/ml-100k/u.data")
df.printSchema()
df.show(10, truncate = false)
/* ============================================================================== */
// TODO: 2. 读取mysql表中数据
// 第一、简洁版格式
/*
def jdbc(url: String, table: String, properties: Properties): DataFrame
*/
val props = new Properties()
props.put("user", "root")
props.put("password", "123456")
props.put("driver", "com.mysql.cj.jdbc.Driver")
val empDF: DataFrame = spark.read.jdbc(
"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
"db_test.emp", //
props //
)
println(s"Partition Number = $empDF.rdd.getNumPartitions")
empDF.printSchema()
empDF.show(10, truncate = false)
// 第二、标准格式写
/*
WITH tmp AS (
select * from emp e join dept d on e.deptno = d.deptno
)
*/
val table: String = "(select ename,deptname,sal from db_test.emp e join db_test.dept d on e.deptno = d.deptno) AS tmp"
val joinDF: DataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", table)
.load()
joinDF.printSchema()
joinDF.show(10, truncate = false)
// 应用结束,关闭资源
spark.stop()
以上是关于spark2.x如何通过SparkSQL读取csv文件的主要内容,如果未能解决你的问题,请参考以下文章
spark 2.x 正在使用 csv 函数将整数/双列作为字符串读取
Spark(23)——Spark1.X和Spark2.X的区别
Spark2.x(六十):在Structured Streaming流处理中是如何查找kafka的DataSourceProvider?