spark sqlsparksql支持查询哪些数据源,查询hive与查询mysql的区别

Posted Interest1_wyt

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark sqlsparksql支持查询哪些数据源,查询hive与查询mysql的区别相关的知识,希望对你有一定的参考价值。

1、数据源介绍

        sparksql默认查询的数据源是hive数据库,除此之外,它还支持其它类型的数据源查询,具体的到源码中看一下:

        可以看到sparksql支持查询的数据源有CSV、parquet、json、orc、txt、jdbc。这些数据源中前面五个我还能理解,最后jdbc数据源我就有了一些疑问,因为很多数据库都支持jdbc连接。那么sparksql是支持所有的jdbc数据源连接吗sparksql通过jdbc查询还会经过逻辑计划、物理计划这些处理流程吗。还有就是sparksql默认查询的hive数据源是通过jdbc吗

2、样例代码

        针对问题编辑了如下代码:

  def main(args: Array[String]): Unit = 

    val sparkSession = SparkSession.builder.appName("wyt01bigdata").master("local").enableHiveSupport().getOrCreate

    //查询mysql
    val dataset: Dataset[Row] = sparkSession.read
      .format("jdbc")
      .option("url", "jdbc:mysql://1.15.38.124:3306/bigdata")
      .option("dbtable", "mysql_person")
      .option("user", "root")
      .option("password", "Wyt-123456")
      .load()
    dataset.toDF.createOrReplaceTempView("temp")
    sparkSession.sql("select id,name,age from temp").show()

    //查询hive
    sparkSession.sql("select id,name,age from wyt.hive_person").show()

  

下面就带着问题追踪下sparksql的执行代码。

3、源码追踪

        在样例代码中,首先是创建一个sparkSession,这是sparksql查询的入口。其次创建DataFrameReader对象,并指定要读取的数据源为jdbc,随后的option方法设定了jdbc的连接参数。接下来的load方法便是重中之重,接下来我们深入看一下。

首先看第一块圈起来的代码,这里的source便是样例demo中format方法设置的数据源类型,load方法首先是判断当前的数据源是不是hive,因为hive数据源可以直接查询,所以hive查询调用该方法会抛异常。接下来再看核心方法lookupDataSourceV2。

首先获取datasourcev1支持的数据源列表,可以看到其支持的数据源有avro、csv、json、kafka、orc、parquet、text。这比我们从方法类中看到的还多。 

 其次根据设置的数据源类型,以向后兼容的方式查找数据源对应的定义类。具体的到lookupDataSource方法中查看:

在第一块圈起来的地方backwardCompatibilityMap是一个记录了数据源和对应定义类的集合,然后通过模式匹配进行查找。因为我们的数据源类型是jdbc,所以会直接返回。第二个圈起来的模块是获取当前线程或者spark应用的类加载器,然后将要加载的DataSourceRegister类型和类加载封装成一个服务工具类。第三个圈起来的模块是最难理解的,其关键在于要对迭代遍历有很深的了解。下面我们主要介绍一下:
在第二个圈起来的部分,我们可以知道,当前封装的类加载工具类,包含了一个类加载器和要加载的类型。然后在第三部分用到了filter语句,这明显是一个迭代遍历的过程。迭代遍历一个对象通用的方式就是通过iterator迭代器进行遍历。所以我们到ServiceLoader类中查看一下关键的hasNext和next方法。

在hasNext中,会根据上下文情况分成两个处理逻辑,但是核心都是调用hasNextService,所以我们接着到hasNextService中看一下处理逻辑(因为下面的代码逻辑比较绕,所以没有贴图片)。

private boolean hasNextService() 
	//如果下次迭代的类全路径名称存在,则直接返回
	if (nextName != null) 
		return true;
	
	//如果资源文件枚举为空
	if (configs == null) 
		try 
			//要加载的资源文件全路径名称 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister(该文件中包含的是数据源提供类的全路径名称)
			String fullName = PREFIX + service.getName();
			//获取要加载的资源文件枚举对象
			if (loader == null)
				configs = ClassLoader.getSystemResources(fullName);
			else
				configs = loader.getResources(fullName);
		 catch (IOException x) 
			fail(service, "Error locating configuration files", x);
		
	
	//获取每个资源文件中记录的类全路径名称
	//每个文件中可能会包含多个类全路径名称,这些都放在pending中,当pending中迭代完之后,会再次迭代configs获取下一个资源文件中的所有全路径名称,并赋值给pending
	while ((pending == null) || !pending.hasNext()) 
		if (!configs.hasMoreElements()) 
			return false;
		
		//解析资源文件中的类全路径名称
		pending = parse(service, configs.nextElement());
	
	nextName = pending.next();
	return true;

理解这段代码的关键是要知道资源文件迭代和资源文件中的内容迭代,其中资源文件迭代是configs,它迭代的是所有依赖jar包中的META-INF/services/org.apache.spark.sql.sources.DataSourceRegister文件,,其次资源文件中的内容迭代是pending,每个资源文件可能包含多个URL类信息(类全路径名称)。然后再来看下next方法:

next方法也是根据上下文情况分成两种处理逻辑,但是核心也是调用nextService方法,所以我们来看下nextService方法:

 nextService方法中主要做了两件事,一个是将全路径类加载到jvm中,另一个则是通过加载的类创建实例并放入集合中。
另外需要注意next方法返回的值,这个值就是filter方法中的入参,而且这个值的类型是Class。至此我们可以推断出filter方法的作用是迭代加载所有的数据源提供者,并过滤出"jdbc"数据源提供者。

接下来我们在看过滤之后的匹配处理,总共有三个大的处理逻辑:


一是如果没有任何匹配的数据源提供类,则直接加载数据源类型,如果加载失败则抛异常。
二是如果只匹配了一个数据源,则直接返回对应的class类。
三是如果找到了多个指定的数据源提供类,则会查找是否有org.apache.spark开头的类,如果没有则抛异常声明找到多个类。

因为只找到了一个jdbc类,所以会走第二个逻辑直接返回,我们再回到上一层:

 因为没有匹配jdbc的情况,所以最终返回none。接着我们再回到上一层查看逻辑。

可以看到当lookupDataSourceV2方法为空时,最后的getOrElse会被执行,所以接着到loadV1Source中方法中查看。

 可以看到loadV1Source方法中首先创建一个数据源实例,然后调用该实例的resolveRelation方法(注意该方法最终返回的是一个DataFrame对象)。那接下来我们再到resolveRelation方法中看一下。

在该方法中首先会根据我们的全路径类名创建一个数据源提供类,然后进行模式匹配,因为我们匹配上的是jdbc数据源提供类,所以在createRelation方法中会调用JdbcRelationProvider的方法。我们再接着看下:

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = 
    //获取代码中输入的jdbc连接参数
    val jdbcOptions = new JDBCOptions(parameters)
    val resolver = sqlContext.conf.resolver
    val timeZoneId = sqlContext.conf.sessionLocalTimeZone
    //获取表的字段信息
    val schema = JDBCRelation.getSchema(resolver, jdbcOptions)
    //根据配置获取表的分区信息(如果配置了多分区,那么会针对不同的分区生成不同where语句,进而加快查询效率)
    val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)
    JDBCRelation(schema, parts, jdbcOptions)(sqlContext.sparkSession)
  

 该方法中比较关键的是获取表的字段信息和获取分区信息,由于分区比较复杂,为避免文章内容跟标题不符合,所以不过多讲解,感兴趣的可以自行查看。我们看下如何获取表的字段信息:

可以看到最终其实是通过sql语句查询不存在的表数据,进而获得的表字段信息。然后让我们再顺着流程走,最后通过jdbc连接获得了一个DataFrame。该DataFrame记录了表字段信息和jdbc数据源连接信息。

回到写的样例demo中,我们接着向下看:

接着将数据源注册成一个临时视图,并进行查询(这块的逻辑计划过程再前面的文章中有讲,物理计划的执行过程因为没总结完善所以暂时没发布)。 查询时逻辑计划的处理过程都是一样的,但是在物理计划执行的时候不同的数据源有不同的逻辑
至此我们jdbc的查询讲解结束了,接下来我们看下hive的查询过程,看看hive是不是也通过jdbc查询的数据。

在前面的文章中我们讲解了sql库表解析主要依赖于Analyzer的ResolveRelations规则,我们直接到该解析规则中进行查看。

首先源码中会解析视图,由于没有注册hive_person视图是,所以会跳到第二个红框中执行。

 接着执行getTable时,externalCatalog对象是一个trait,可以从debug断点中查看具体的对象名称,然后接着向下看:

 

 

 至此可以看到,sparksql查询hive时,其元数据是通过client客户端进行交互获取的,至于client客户端的最底层是通过什么原理获取表数据,这里不再追究。感兴趣的可以自行查阅。另外上述比较关键的delegate为什么是HiveExeternalCatalog,这个主要是因为demo中使用了enableHiveSupport,这个展开讲篇幅又会很多,所以这里只留个提示。有兴趣的同样可以自行查阅。

上面说了hive元数据的获取是通过client的方式,那么具体数据的读取是通过什么方式呢,这个可以到具体的物理算子HiveTableScanExec中查看,可以看到其是通过hadoopReader进行的数据读取。

 至此,hive的查询过程也大致讲解完毕,因为时间和个人能力问题。有些地方讲解的不够细致,希望没有给兄弟们带来误导。

4、总结

1)sparksql支持的数据源有avro、csv、json、kafka、orc、parquet、text、jdbc、hive等,也支持自定义数据源(下一章会讲解自定义数据源用法)
2)sparksql支持所有的jdbc连接,所有的jdbc sql会统一经历 sql->语法树->逻辑计划->物理计划->sql的过程。

3)sparksql查询hive是通过client和hadoopReader。
4)查询hive和查询mysql主要的区别就是一个通过jdbc一个不通过jdbc。
5)META-INF/services/org.apache.spark.sql.sources.DataSourceRegister中包含的数据源提供者的全路径名称

5、引申

5.1 DataSourceV2和DataSourceV1区别

DataSource API v1 版本于 Spark 1.3 发布。根据社区反馈,它具有下面的限制:
1)由于其输入参数包括 DataFrame / SQLContext,因此 DataSource API 兼容性取决于这些上层的 API。
2)物理存储信息(例如,划分和排序)不会从数据源传播,并且因此,Spark 的优化器无法利用。
3)可扩展性不好,并且算子的下推能力受限。
4)缺少高性能的列式读取接口。
5)写入接口是如此普遍,不支持事务。
由于上面的限制和问题, Spark SQL 内置的数据源实现(如 Parquet,JSON等)不使用这个公共 DataSource API。相反,他们使用内部/非公共的接口。这样很难使得外部的数据源实现像内置的一样快。在这样的背景下,DataSource API v2 版本应运而生。DataSource API v2版本旨在提供一个高性能的,易于维护的,易于扩展的外部数据源接口。

5.2 如果hive中的表和注册的视图同名,那么最终会查询哪个数据。

会查视图,因为源码中查询的顺序是先查视图再查数据源

02 Spark架构与运行流程

1、Spark已打造出结构一体化、功能多样化的大数据生态系统,请简述Spark生态系统

Spark的设计遵循“一个软件栈满足不同应用场景”的理念,逐渐形成一套完整生态系统,既能够提供内存计算框架,也可以支持SQL即席查询、实时流式计算、机器学习和图计算等。Spark可以部署在资源管理器YARN之上,提供一站式的大数据解决方案。因此,Spark所提供的生态系统同时支持批处理、交互式查询和流数据处理。

 

2、用图文描述你所理解的Spark运行架构,运行流程

基本概念:

Application:用户编写的Spark应用程序。
Driver:Spark中的Driver即运行上述Application的main函数并创建SparkContext,创建SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与ClusterManager通信,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭。
Executor:是运行在工作节点(WorkerNode)的一个进程,负责运行Task。
RDD:弹性分布式数据集,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。
DAG:有向无环图,反映RDD之间的依赖关系。
Task:运行在Executor上的工作单元。
Job:一个Job包含多个RDD及作用于相应RDD上的各种操作。
Stage:是Job的基本调度单位,一个Job会分为多组Task,每组Task被称为Stage,或者也被称为TaskSet,代表一组关联的,相互之间没有Shuffle依赖关系的任务组成的任务集。
Cluter Manager:指的是在集群上获取资源的外部服务。目前有三种类型
1) Standalon : spark原生的资源管理,由Master负责资源的分配
2) Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架
3) Hadoop Yarn: 主要是指Yarn中的ResourceManager


Spark运行基本流程:

 

1、为应用构建起基本的运行环境,即由Driver创建一个SparkContext进行资源的申请、任务的分配和监控
2、资源管理器为Executor分配资源,并启动Executor进程
3、SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAGScheduler解析成Stage,然后把一个个TaskSet提交给底层调度器TaskScheduler处理。
Executor向SparkContext申请Task,TaskScheduler将Task发放给Executor运行并提供应用程序代码。
4、Task在Executor上运行把执行结果反馈给TaskScheduler,然后反馈给DAGScheduler,运行完毕后写入数据并释放所有资源。


Spark运行架构特点:

1、每个Application都有自己专属的Executor进程,并且该进程在Application运行期间一直驻留。Executor进程以多线程的方式运行Task。
2、Spark运行过程与资源管理器无关,只要能够获取Executor进程并保存通信即可。
3、Task采用数据本地性和推测执行等优化机制。

以上是关于spark sqlsparksql支持查询哪些数据源,查询hive与查询mysql的区别的主要内容,如果未能解决你的问题,请参考以下文章

假如mysql数据库单表有100W行记录,有哪些方式加快查询速度

Spark使用OSS Select加速数据查询

spark与hadoop相比,存在哪些缺陷

SparkAPI中的spark.sql(sql)支持limit查询吗?例如select * from tablename limit 1,10。

Spark 支持子查询吗? [复制]

哪些 SQL 数据库支持 CHECK 约束中的子查询?