How Spark Reads and Uses Hive Permanent Functions
Background
Spark 1.* does not support loading Hive permanent functions, which makes them inconvenient to use.
Spark 2.* talks to the Hive metastore directly through the HiveClient held by HiveExternalCatalog, so Hive permanent functions load without any extra work.
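To make this concrete, here is a minimal usage sketch; the database, class name, and JAR path are hypothetical, assuming the function was registered in Hive beforehand:
// Registered once on the Hive side; the class name and JAR URI are stored in the metastore:
//   CREATE FUNCTION test_db.my_upper AS 'com.example.udf.MyUpper'
//   USING JAR 'hdfs:///user/hive/udf/my-upper.jar';
// Spark 2.* side: the analyzer resolves the function through the metastore and
// ships the JAR automatically, with no extra ADD JAR step.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()
spark.sql("SELECT test_db.my_upper(name) FROM test_db.users").show()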
Spark 2.* UDF Lookup Process
Since Spark 2.* already manages Hive UDFs, let's start by walking through the code.
Resolving the UDF during SQL analysis
// Start applying the rules of each batch to analyze the plan
execute:76, RuleExecutor (org.apache.spark.sql.catalyst.rules)
// The ResolveFunctions rule resolves the functions referenced in the SQL
ResolveFunctions : catalog.lookupFunction(funcId, children)
HiveSessionCatalog : super.lookupFunction(funcName, children)
SessionCatalog : externalCatalog.getFunction(database, name.funcName)
ExternalCatalogWithListener : delegate.getFunction(db, funcName)
HiveExternalCatalog : client.getFunction(db, funcName)
HiveClient : getFunctionOption(db, name)
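What client.getFunction ultimately returns is the function metadata stored in the metastore: the implementing class name plus the resource URIs. Roughly, for the hypothetical function above, the returned CatalogFunction would look like the following sketch (values invented for illustration):
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}

// Approximate shape of the metadata returned by the lookup chain above.
val fn = CatalogFunction(
  identifier = FunctionIdentifier("my_upper", Some("test_db")),
  className = "com.example.udf.MyUpper",
  resources = Seq(FunctionResource(JarResource, "hdfs:///user/hive/udf/my-upper.jar")))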
Managing UDF resource files in Spark
Although the UDF is resolved successfully during analysis, the JAR still has to be downloaded to the local machine at runtime and loaded by a classloader. Function resolution happens only once, at analysis time; afterwards the TaskSetManager serializes the task information and ships it to the executors. After deserializing a task, an executor first downloads and loads the files and JARs the task needs, and only then schedules and runs the task.
// 1. After the function metadata has been read from the Hive metastore, load the function's resources
class SessionCatalog() {
def lookupFunction() {
// ...
val catalogFunction = try {
externalCatalog.getFunction(database, name.funcName)
} catch {
case _: AnalysisException => failFunctionLookup(name)
case _: NoSuchPermanentFunctionException => failFunctionLookup(name)
}
loadFunctionResources(catalogFunction.resources)
// ...
}
/**
* Loads resources such as JARs and Files for a function. Every resource is represented
* by a tuple (resource type, resource uri).
*/
def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
resources.foreach(functionResourceLoader.loadResource)
}
}
// 2. HiveSessionResourceLoader
// Loads the resources required by the function described in catalogFunction:
// a. For a HiveSessionResourceLoader, the current Hive client must also run an "add resource" command
// b. Both JARs and files are added to session.sparkContext for later distribution
// c. The current classloader must load the remote JAR
class HiveSessionResourceLoader(
session: SparkSession,
clientBuilder: () => HiveClient)
extends SessionResourceLoader(session) {
private lazy val client = clientBuilder()
override def addJar(path: String): Unit = {
client.addJar(path)
super.addJar(path)
}
}
/**
* Session shared [[FunctionResourceLoader]].
*/
@Unstable
class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
override def loadResource(resource: FunctionResource): Unit = {
resource.resourceType match {
case JarResource => addJar(resource.uri)
case FileResource => session.sparkContext.addFile(resource.uri)
case ArchiveResource =>
throw new AnalysisException(
"Archive is not allowed to be loaded. If YARN mode is used, " +
"please use --archives options while calling spark-submit.")
}
}
/**
* Add a jar path to [[SparkContext]] and the classloader.
*
* Note: this method seems not access any session state, but a Hive based `SessionState` needs
* to add the jar to its hive client for the current session. Hence, it still needs to be in
* [[SessionState]].
*/
def addJar(path: String): Unit = {
session.sparkContext.addJar(path)
val uri = new Path(path).toUri
val jarURL = if (uri.getScheme == null) {
// `path` is a local file path without a URL scheme
new File(path).toURI.toURL
} else {
// `path` is a URL with a scheme
uri.toURL
}
session.sharedState.jarClassLoader.addURL(jarURL)
Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
}
}
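The scheme handling in addJar above is easy to reproduce in isolation. A small sketch, with illustrative paths, of how a bare local path and a fully qualified HDFS URI are treated differently:
import java.io.File
import org.apache.hadoop.fs.Path

// A bare local path has no URI scheme, so addJar converts it to a file: URL.
val localUri = new Path("/tmp/my-upper.jar").toUri
assert(localUri.getScheme == null)
val localUrl = new File("/tmp/my-upper.jar").toURI.toURL // file:/tmp/my-upper.jar

// A fully qualified URI keeps its scheme and is handed to the classloader as-is;
// hdfs: URLs are usable there because, as far as I can tell from the 2.x source,
// Spark's SharedState registers Hadoop's FsUrlStreamHandlerFactory.
val remoteUri = new Path("hdfs:///user/hive/udf/my-upper.jar").toUri
assert(remoteUri.getScheme == "hdfs")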
// 3. SparkContext.addJar() registers the JAR resources that the whole job needs at runtime
class SparkContext(config: SparkConf) extends Logging {
// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
/**
* Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future.
*
* If a jar is added during execution, it will not be available until the next TaskSet starts.
*
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems),
* an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addJar(path: String) {
def addJarFile(file: File): String = {
try {
if (!file.exists()) {
throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
}
if (file.isDirectory) {
throw new IllegalArgumentException(
s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
}
env.rpcEnv.fileServer.addJar(file)
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
null
}
}
if (path == null) {
logWarning("null specified as parameter to addJar")
} else {
val key = if (path.contains("\\")) {
// For local paths with backslashes on Windows, URI throws an exception
addJarFile(new File(path))
} else {
val uri = new URI(path)
// SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
Utils.validateURL(uri)
uri.getScheme match {
// A JAR file which exists only on the driver node
case null =>
// SPARK-22585 path without schema is not url encoded
addJarFile(new File(uri.getRawPath))
// A JAR file which exists only on the driver node
case "file" => addJarFile(new File(uri.getPath))
// A JAR file which exists locally on every worker node
case "local" => "file:" + uri.getPath
case _ => path
}
}
if (key != null) {
val timestamp = System.currentTimeMillis
if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added JAR $path at $key with timestamp $timestamp")
postEnvironmentUpdate()
} else {
logWarning(s"The jar $path has been added already. Overwriting of added jars " +
"is not supported in the current version.")
}
}
}
}
}
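On the driver this is the same code path that sc.addJar exposes publicly, so the effect is easy to inspect. A quick sketch (the path is hypothetical; listJars is available in Spark 2.*):
// After the permanent function is resolved (or after an explicit addJar call),
// the JAR appears in the driver-side addedJars map and will be attached to every
// TaskSet created from this point on.
spark.sparkContext.addJar("hdfs:///user/hive/udf/my-upper.jar")
spark.sparkContext.listJars().foreach(println) // prints hdfs:///user/hive/udf/my-upper.jar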
private[spark] class TaskSetManager() {
// SPARK-21563 make a copy of the jars/files so they are consistent across the TaskSet
private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*)
private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)
@throws[TaskNotSerializableException]
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
// ...
if (!isZombie && !offerBlacklisted) {
//...
dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
// ...
// 4. Build the TaskDescription that will later be scheduled for execution; note addedFiles and addedJars:
// a. All tasks in the same TaskSetManager share the same resource files
// b. The data comes straight from the SparkContext
new TaskDescription(
taskId,
attemptNum,
execId,
taskName,
index,
task.partitionId,
addedFiles,
addedJars,
task.localProperties,
serializedTask)
}
}
}
}
// Executor
// The executor runs the task described by the TaskDescription
class TaskRunner(
execBackend: ExecutorBackend,
private val taskDescription: TaskDescription)
extends Runnable {
// ...
override def run(): Unit = {
try {
// Must be set before updateDependencies() is called, in case fetching dependencies
// requires access to properties contained within (e.g. for access control).
Executor.taskDeserializationProps.set(taskDescription.properties)
// 5. Before running the task, update the files and JARs it depends on
updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
task = ser.deserialize[Task[Any]](
taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
task.localProperties = taskDescription.properties
task.setTaskMemoryManager(taskMemoryManager)
// ...
}
}
}
/**
* Download any missing dependencies if we receive a new set of files and JARs from the
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]) {
lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
synchronized {
// ...
for ((name, timestamp) <- newJars) {
val localName = new URI(name).getPath.split("/").last
val currentTimeStamp = currentJars.get(name)
.orElse(currentJars.get(localName))
.getOrElse(-1L)
if (currentTimeStamp < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
// Fetch file with useCache mode, close cache for local mode.
// 6. Download the JAR to the local machine and load it with the classloader
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentJars(name) = timestamp
// Add it to our class loader
val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
if (!urlClassLoader.getURLs().contains(url)) {
logInfo("Adding " + url + " to class loader")
urlClassLoader.addURL(url)
}
}
}
}
}
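Once the JAR has been fetched and added to urlClassLoader, the UDF class resolves through the thread context classloader, which is what the Hive UDF wrappers rely on when instantiating the function on the executor side. A minimal sketch, reusing the hypothetical class name from earlier:
// On the executor, after updateDependencies() has run, the UDF class can be
// loaded through the context classloader that now contains the downloaded JAR.
val udfClass = Thread.currentThread().getContextClassLoader
  .loadClass("com.example.udf.MyUpper")
val udfInstance = udfClass.getConstructor().newInstance()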
Spark 1.6 UDF Loading Process
object ResolveFunctions extends Rule[LogicalPlan] {
...
// The registry here is passed in through the constructor of the enclosing Analyzer; in Spark 1.6 it is instantiated as follows:
// SQLContext : val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
// HiveContext : val functionRegistry: FunctionRegistry = new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), this.executionHive)
registry.lookupFunction(name, children)
...
}
// HiveContext is used by default
// hiveUDFs.scala
override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
// underlying is the SimpleFunctionRegistry, which has Spark's xx built-in UDFs registered internally
Try(underlying.lookupFunction(name, children)).getOrElse {
// We only look it up to see if it exists, but do not include it in the HiveUDF since it is
// not always serializable.
val functionInfo: FunctionInfo =
Option(getFunctionInfo(name.toLowerCase)).getOrElse(
throw new AnalysisException(s"undefined function $name"))
}
}
def getFunctionInfo(name: String): FunctionInfo = {
// Hive Registry need current database to lookup function
// TODO: the current database of executionHive should be consistent with metadataHive
executionHive.withHiveState {
FunctionRegistry.getFunctionInfo(name)
}
}
public final class FunctionRegistry {
// 1. Hive's Registry API is used to register Hive's built-in UDF names and classes into the static system Registry; the list is hard-coded, and FunctionRegistry is a final class
// registry for system functions
private static final Registry system = new Registry(true);
static {
system.registerGenericUDF("concat", GenericUDFConcat.class);
system.registerUDF("substr", UDFSubstr.class, false);
// ....
}
// 2. When looking up a function, the temporary functions registered in the current session are checked first, then the functions in the system registry; the external catalog is never consulted
public static FunctionInfo getFunctionInfo(String functionName) throws SemanticException {
FunctionInfo info = getTemporaryFunctionInfo(functionName);
return info != null ? info : system.getFunctionInfo(functionName);
}
public static FunctionInfo getTemporaryFunctionInfo(String functionName) throws SemanticException {
Registry registry = SessionState.getRegistry();
return registry == null ? null : registry.getFunctionInfo(functionName);
}
}
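So in Spark 1.6 the permanent functions stored in the metastore are never consulted. The practical workaround is to register the function per session as a temporary function, so that it lands in the session Registry checked by getTemporaryFunctionInfo. A sketch with hypothetical names and paths:
// Spark 1.6 + HiveContext: ship the JAR and register a temporary function by hand.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("ADD JAR hdfs:///user/hive/udf/my-upper.jar")
hiveContext.sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.udf.MyUpper'")
hiveContext.sql("SELECT my_upper(name) FROM test_db.users").show()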