Spark 2.x (60): How does Structured Streaming look up the Kafka DataSourceProvider?
Posted yy3b2007com
This chapter walks through the source code to show how Spark Structured Streaming (Spark 2.4) looks up a DataSourceProvider. First, here is the code that reads from the Kafka streaming source:
SparkSession sparkSession = SparkSession.builder().getOrCreate();
Dataset<Row> sourceDataset = sparkSession.readStream().format("kafka").option("xxx", "xxx").load();
sparkSession.readStream() returns a DataStreamReader (https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala), and the load() call in the code above is a method of that class.
The format argument, kafka, is stored in DataStreamReader as the source field:
    @InterfaceStability.Evolving
    final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
      /**
       * Specifies the input data source format.
       *
       * @since 2.0.0
       */
      def format(source: String): DataStreamReader = {
        this.source = source
        this
      }
      ...
    }
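The argument to DataStreamReader#format(source: String) is usually a short name such as kafka, socket, or rate, or a file format such as csv, text, or json. (jdbc is a batch-only source and console is a streaming sink, so neither can be read with readStream.)
The DataStreamReader#load() method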
    /**
     * Loads input data stream in as a `DataFrame`, for data streams that don't require a path
     * (e.g. external key-value stores).
     *
     * @since 2.0.0
     */
    def load(): DataFrame = {
      if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
        throw new AnalysisException("Hive data source can only be used with tables, you can not " +
          "read files of Hive data source directly.")
      }

      val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
      // We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
      // We can't be sure at this point whether we'll actually want to use V2, since we don't know the
      // writer or whether the query is continuous.
      val v1DataSource = DataSource(
        sparkSession,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap)
      val v1Relation = ds match {
        case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
        case _ => None
      }
      ds match {
        case s: MicroBatchReadSupport =>
          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
            ds = s, conf = sparkSession.sessionState.conf)
          val options = sessionOptions ++ extraOptions
          val dataSourceOptions = new DataSourceOptions(options.asJava)
          var tempReader: MicroBatchReader = null
          val schema = try {
            tempReader = s.createMicroBatchReader(
              Optional.ofNullable(userSpecifiedSchema.orNull),
              Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
              dataSourceOptions)
            tempReader.readSchema()
          } finally {
            // Stop tempReader to avoid side-effect thing
            if (tempReader != null) {
              tempReader.stop()
              tempReader = null
            }
          }
          Dataset.ofRows(
            sparkSession,
            StreamingRelationV2(
              s, source, options,
              schema.toAttributes, v1Relation)(sparkSession))
        case s: ContinuousReadSupport =>
          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
            ds = s, conf = sparkSession.sessionState.conf)
          val options = sessionOptions ++ extraOptions
          val dataSourceOptions = new DataSourceOptions(options.asJava)
          val tempReader = s.createContinuousReader(
            Optional.ofNullable(userSpecifiedSchema.orNull),
            Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
            dataSourceOptions)
          Dataset.ofRows(
            sparkSession,
            StreamingRelationV2(
              s, source, options,
              tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
        case _ =>
          // Code path for data source v1.
          Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
      }
    }
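The order of the cases at the end of load() matters: a provider that mixes in both V2 read traits takes the micro-batch branch, because that case is tested first. Here is a minimal Java sketch of just that dispatch. The nested interfaces are hypothetical stand-ins that only borrow the names of the Spark traits, and choosePlan and the three provider classes are made up for illustration; nothing here calls Spark.

```java
class LoadDispatchSketch {
    // Hypothetical stand-ins for the Spark traits that load() matches on.
    interface StreamSourceProvider {}
    interface MicroBatchReadSupport {}
    interface ContinuousReadSupport {}

    // Hypothetical providers, one per branch of the match.
    static class KafkaLike
            implements StreamSourceProvider, MicroBatchReadSupport, ContinuousReadSupport {}
    static class ContinuousOnly implements ContinuousReadSupport {}
    static class V1Only implements StreamSourceProvider {}

    // Same case order as load(): micro-batch V2, then continuous V2, then the V1 path.
    static String choosePlan(Object ds) {
        // load() keeps a V1 relation as a shim only if the provider is also a V1 stream source.
        boolean hasV1Shim = ds instanceof StreamSourceProvider;
        if (ds instanceof MicroBatchReadSupport) {
            return "StreamingRelationV2(microBatch, v1Shim=" + hasV1Shim + ")";
        }
        if (ds instanceof ContinuousReadSupport) {
            return "StreamingRelationV2(continuous, v1Shim=" + hasV1Shim + ")";
        }
        return "StreamingRelation(v1)";
    }

    public static void main(String[] args) {
        System.out.println(choosePlan(new KafkaLike()));
        System.out.println(choosePlan(new ContinuousOnly()));
        System.out.println(choosePlan(new V1Only()));
    }
}
```

The real method also creates a temporary reader to obtain the schema, which this sketch leaves out.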
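val ds = DataSource.lookupDataSource(source, ...).newInstance() uses this source value. lookupDataSource returns a Class[_], so ds is an instance of whatever class it resolves to. To find out which class that is (it is not a Dataset), we need to read the implementation of DataSource.lookupDataSource(source, ...).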
How DataSource.lookupDataSource(source, sparkSession.sqlContext.conf) works
DataSource source file: https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
The lookupDataSource method is defined in the companion object of the DataSource class:
    object DataSource extends Logging {
      ...
      /**
       * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
       */
      private val spark2RemovedClasses = Set(
        "org.apache.spark.sql.DataFrame",
        "org.apache.spark.sql.sources.HadoopFsRelationProvider",
        "org.apache.spark.Logging")

      /** Given a provider name, look up the data source class definition. */
      def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
        val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
          case name if name.equalsIgnoreCase("orc") &&
              conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
            classOf[OrcFileFormat].getCanonicalName
          case name if name.equalsIgnoreCase("orc") &&
              conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
            "org.apache.spark.sql.hive.orc.OrcFileFormat"
          case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
            "org.apache.spark.sql.avro.AvroFileFormat"
          case name => name
        }
        val provider2 = s"$provider1.DefaultSource"
        val loader = Utils.getContextOrSparkClassLoader
        val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

        try {
          serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
            // the provider format did not match any given registered aliases
            case Nil =>
              try {
                Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
                  case Success(dataSource) =>
                    // Found the data source using fully qualified path
                    dataSource
                  case Failure(error) =>
                    if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
                      throw new AnalysisException(
                        "Hive built-in ORC data source must be used with Hive support enabled. " +
                        "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
                        "'native'")
                    } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
                        provider1 == "com.databricks.spark.avro" ||
                        provider1 == "org.apache.spark.sql.avro") {
                      throw new AnalysisException(
                        s"Failed to find data source: $provider1. Avro is built-in but external data " +
                        "source module since Spark 2.4. Please deploy the application as per " +
                        "the deployment section of \"Apache Avro Data Source Guide\".")
                    } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
                      throw new AnalysisException(
                        s"Failed to find data source: $provider1. Please deploy the application as " +
                        "per the deployment section of " +
                        "\"Structured Streaming + Kafka Integration Guide\".")
                    } else {
                      throw new ClassNotFoundException(
                        s"Failed to find data source: $provider1. Please find packages at " +
                        "http://spark.apache.org/third-party-projects.html",
                        error)
                    }
                }
              } catch {
                case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
                  // NoClassDefFoundError's class name uses "/" rather than "." for packages
                  val className = e.getMessage.replaceAll("/", ".")
                  if (spark2RemovedClasses.contains(className)) {
                    throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
                      "Please check if your library is compatible with Spark 2.0", e)
                  } else {
                    throw e
                  }
              }
            case head :: Nil =>
              // there is exactly one registered alias
              head.getClass
            case sources =>
              // There are multiple registered aliases for the input. If there is single datasource
              // that has "org.apache.spark" package in the prefix, we use it considering it is an
              // internal datasource within Spark.
              val sourceNames = sources.map(_.getClass.getName)
              val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
              if (internalSources.size == 1) {
                logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
                  s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
                internalSources.head.getClass
              } else {
                throw new AnalysisException(s"Multiple sources found for $provider1 " +
                  s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
              }
          }
        } catch {
          case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
            // NoClassDefFoundError's class name uses "/" rather than "." for packages
            val className = e.getCause.getMessage.replaceAll("/", ".")
            if (spark2RemovedClasses.contains(className)) {
              throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
                "Please remove the incompatible library from classpath or upgrade it. " +
                s"Error: ${e.getMessage}", e)
            } else {
              throw e
            }
        }
      }
      ...
    }
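The flow is:
1) Look the provider name up in backwardCompatibilityMap, which is predefined in object DataSource;
2) If it is not there, keep the original name (the orc and Databricks Avro names are then special-cased according to the configuration);
3) Use ServiceLoader to load every registered implementation of DataSourceRegister;
4) From the set in 3), keep the implementations whose shortName equals the provider name, ignoring case;
5) Return the provider class. With exactly one match, that is the match's class; with no match, the name is tried as a fully qualified class name and then again with .DefaultSource appended; with several matches, the single one under org.apache.spark is preferred, and otherwise an AnalysisException is thrown.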
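Steps 3) to 5) can be sketched in plain Java. This is a simplified sketch under stated assumptions, not Spark's code: Register is a hypothetical stand-in for DataSourceRegister, the registered list stands in for what ServiceLoader would return, select, KafkaLike and CsvLike are made-up names, and the multiple-match case simply throws instead of first preferring an org.apache.spark class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class ShortNameLookupSketch {
    // Hypothetical stand-in for DataSourceRegister.
    interface Register { String shortName(); }

    // Hypothetical registered providers.
    static class KafkaLike implements Register {
        public String shortName() { return "kafka"; }
    }
    static class CsvLike implements Register {
        public String shortName() { return "csv"; }
    }

    // Returns the class of the single provider whose alias matches, ignoring case.
    // An empty result means "no alias matched"; the real code then falls back to
    // loading the name (or name + ".DefaultSource") as a class.
    static Optional<Class<?>> select(List<Register> registered, String provider) {
        List<Register> matches = new ArrayList<>();
        for (Register r : registered) {
            if (r.shortName().equalsIgnoreCase(provider)) {
                matches.add(r);
            }
        }
        if (matches.isEmpty()) {
            return Optional.empty();
        }
        if (matches.size() == 1) {
            Class<?> found = matches.get(0).getClass();
            return Optional.of(found);
        }
        // Simplification: Spark first prefers a single internal source before failing.
        throw new IllegalStateException("Multiple sources found for " + provider);
    }

    public static void main(String[] args) {
        List<Register> registered = Arrays.asList(new KafkaLike(), new CsvLike());
        System.out.println(select(registered, "KAFKA").isPresent());
        System.out.println(select(registered, "com.example.Unknown").isPresent());
    }
}
```

The case-insensitive comparison is why format("kafka") and format("KAFKA") resolve to the same provider.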
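backwardCompatibilityMap is also defined in the DataSource companion object. It maps the legacy names of data sources that have since been moved to their current classes, so it acts as a set of predefined providers.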
    object DataSource extends Logging {

      /** A map to maintain backward compatibility in case we move data sources around. */
      private val backwardCompatibilityMap: Map[String, String] = {
        val jdbc = classOf[JdbcRelationProvider].getCanonicalName
        val json = classOf[JsonFileFormat].getCanonicalName
        val parquet = classOf[ParquetFileFormat].getCanonicalName
        val csv = classOf[CSVFileFormat].getCanonicalName
        val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
        val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
        val nativeOrc = classOf[OrcFileFormat].getCanonicalName
        val socket = classOf[TextSocketSourceProvider].getCanonicalName
        val rate = classOf[RateStreamProvider].getCanonicalName

        Map(
          "org.apache.spark.sql.jdbc" -> jdbc,
          "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
          "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
          "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
          "org.apache.spark.sql.json" -> json,
          "org.apache.spark.sql.json.DefaultSource" -> json,
          "org.apache.spark.sql.execution.datasources.json" -> json,
          "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
          "org.apache.spark.sql.parquet" -> parquet,
          "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
          "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
          "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
          "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
          "org.apache.spark.sql.hive.orc" -> orc,
          "org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc,
          "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
          "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
          "org.apache.spark.ml.source.libsvm" -> libsvm,
          "com.databricks.spark.csv" -> csv,
          "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
          "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
        )
      }
      ...
    }
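Note that the keys of this map are legacy fully qualified names, not short names, so "kafka" is not in it and passes through unchanged. A small Java sketch of the getOrElse step, assuming a two-entry excerpt of the map with the class names written out as strings (CompatMapSketch and normalize are hypothetical names, and the ORC/Avro special cases that follow in the real method are left out):

```java
import java.util.HashMap;
import java.util.Map;

class CompatMapSketch {
    // Two entries copied from backwardCompatibilityMap, with classOf[...] spelled out.
    static final Map<String, String> COMPAT = new HashMap<>();
    static {
        COMPAT.put("com.databricks.spark.csv",
            "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat");
        COMPAT.put("org.apache.spark.sql.json",
            "org.apache.spark.sql.execution.datasources.json.JsonFileFormat");
    }

    // Equivalent of backwardCompatibilityMap.getOrElse(provider, provider).
    static String normalize(String provider) {
        return COMPAT.getOrDefault(provider, provider);
    }

    public static void main(String[] args) {
        System.out.println(normalize("com.databricks.spark.csv")); // legacy name is rewritten
        System.out.println(normalize("kafka"));                    // not a key, returned as-is
    }
}
```

So for Kafka the map plays no part; the result is decided entirely by the ServiceLoader scan that follows.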
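The class whose shortName is kafka and which implements DataSourceRegister:
The class that satisfies both conditions is KafkaSourceProvider (https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala). Note that this link points to master rather than branch-2.4, which is why the declaration quoted below lists TableProvider; on branch-2.4 the class instead mixes in the V2 traits such as MicroBatchReadSupport and ContinuousReadSupport that load() matches on.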
    /**
     * The provider class for all Kafka readers and writers. It is designed such that it throws
     * IllegalArgumentException when the Kafka Dataset is created, so that it can catch
     * missing options even before the query is started.
     */
    private[kafka010] class KafkaSourceProvider extends DataSourceRegister
        with StreamSourceProvider
        with StreamSinkProvider
        with RelationProvider
        with CreatableRelationProvider
        with TableProvider
        with Logging {
      import KafkaSourceProvider._

      override def shortName(): String = "kafka"
      ...
    }
Definition of the DataSourceRegister trait
    /**
     * Data sources should implement this trait so that they can register an alias to their data source.
     * This allows users to give the data source alias as the format type over the fully qualified
     * class name.
     *
     * A new instance of this class will be instantiated each time a DDL call is made.
     *
     * @since 1.5.0
     */
    @InterfaceStability.Stable
    trait DataSourceRegister {

      /**
       * The string that represents the format that this data source provider uses. This is
       * overridden by children to provide a nice alias for the data source. For example:
       *
       * {{{
       *   override def shortName(): String = "parquet"
       * }}}
       *
       * @since 1.5.0
       */
      def shortName(): String
    }
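Implementing the trait is not enough on its own: ServiceLoader only finds classes that are listed in a provider-configuration file on the class path. For Spark that file is META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, and the Kafka connector jar lists org.apache.spark.sql.kafka010.KafkaSourceProvider in it, which is why a missing connector jar ends in the "Failed to find data source: kafka" error shown earlier. The following Java sketch shows the same discovery pattern with the JDK alone. Register is a hypothetical stand-in for DataSourceRegister and findByShortName is a made-up helper; since nothing is registered for the stand-in interface, the lookup comes back empty.

```java
import java.util.Optional;
import java.util.ServiceLoader;

class ServiceLoaderSketch {
    // Hypothetical stand-in for org.apache.spark.sql.sources.DataSourceRegister.
    public interface Register { String shortName(); }

    // Scans every provider registered for Register and returns the first one
    // whose alias matches, ignoring case.
    static Optional<Register> findByShortName(String alias) {
        // ServiceLoader reads META-INF/services/<binary name of Register> from the
        // class path and instantiates each listed class lazily during iteration.
        for (Register r : ServiceLoader.load(Register.class)) {
            if (r.shortName().equalsIgnoreCase(alias)) {
                return Optional.of(r);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // No configuration file exists for this stand-in interface,
        // so nothing is discovered and this prints false.
        System.out.println(findByShortName("kafka").isPresent());
    }
}
```

To make a class discoverable, its fully qualified name would go on a line of its own in that configuration file inside the jar.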
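Which classes extend DataSourceRegister?
The classes that extend DataSourceRegister are defined in the files below. (The list appears to have been collected from master; a few of these files, such as BinaryFileFormat.scala and NoopDataSource.scala, may not exist on branch-2.4.)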
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
https://github.com/apache/spark/blob/branch-2.4/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
https://github.com/apache/spark/blob/branch-2.4/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
https://github.com/apache/spark/blob/branch-2.4/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
https://github.com/apache/spark/blob/branch-2.4/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
https://github.com/apache/spark/blob/branch-2.4/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
https://github.com/apache/spark/blob/branch-2.4/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
https://github.com/apache/spark/blob/branch-2.4/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala