Spark源代码::Spark多线程::NettyRpcEnv.ask解读
Posted Gendan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark源代码::Spark多线程::NettyRpcEnv.ask解读相关的知识,希望对你有一定的参考价值。
private[netty] def askT: ClassTag: Future[T] = {
// 定义了一个Any的promise
val promise = Promise[Any]()
val remoteAddr = message.receiver.address
def onFailure(e: Throwable): Unit = {
if (!promise.tryFailure(e)) {
e match {
case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
case _ => logWarning(s"Ignored failure: $e")
}
}
}
/*
这里声明的onSuccess会被填充到RpcResponseCallback的onSuccess中,这个
RpcResponseCallback就是上面【图9】中的listener,当我们从Server端获取到response后
注意,获取的不是RpcFailure类型的response,则都会进入到【图9】的
else if (message instanceof RpcResponse) { 分支中
*/
def onSuccess(reply: Any): Unit = reply match {
case RpcFailure(e) => onFailure(e)
case rpcReply =>
/*
当返回的response是OK的没有问题后,onSuccess被callback,这里promise的trySuccess也
进行call操作,[PayPal下载](https://www.gendan5.com/wallet/PayPal.html)这里就是上面所说的,为了一个promise铺设了一条future,从而可以执行
这个Future的线程了
*/
if (!promise.trySuccess(rpcReply)) {
logWarning(s"Ignored message: $reply")
}
}
try {
if (remoteAddr == address) {
val p = Promise[Any]()
p.future.onComplete {
case Success(response) => onSuccess(response)
case Failure(e) => onFailure(e)
}(ThreadUtils.sameThread)
dispatcher.postLocalMessage(message, p)
} else {
val rpcMessage = RpcOutboxMessage(message.serialize(this),
onFailure,
(client, response) => **onSuccess**(deserialize[Any](client, response)))
postToOutbox(message.receiver, rpcMessage)
/*
如果是callback了Failure,则这里会被执行
*/
promise.future.failed.foreach {
case _: TimeoutException => rpcMessage.onTimeout()
case _ =>
}(ThreadUtils.sameThread)
}
val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
override def run(): Unit = {
onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +
s"in ${timeout.duration}"))
}
}, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
/*
当promise的future执行后,会调用这里的onComplete方法
*/
promise.future.onComplete { v =>
timeoutCancelable.cancel(true)
}(ThreadUtils.sameThread)
} catch {
case NonFatal(e) =>
onFailure(e)
}
/*
利用RpcTimeout中的addMessageIfTimeout的偏函数再去模式匹配一下产生的Throwable内容
如果是RpcTimeoutException 则 直接throw这个ex
如果是TimeoutException 则包装成RpcTimeoutException后再throw出去
*/
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
}
以上是关于Spark源代码::Spark多线程::NettyRpcEnv.ask解读的主要内容,如果未能解决你的问题,请参考以下文章
KafkaConsumer 在 spark 中创建和删除 globalTempView 时对多线程访问不安全