[Flink] Flink Source Code: RPC Invocation
Posted by 九师兄
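1. Overview
2. Introduction
This article analyzes how Flink's low-level RPC calls are executed and the principles behind them. Flink RPC is built on the Akka Actor framework, so a basic understanding of Akka Actors is recommended before reading.
To simplify working with actors, Flink wraps them in several layers. It defines the following important interfaces and classes: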
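- RpcService: defines the behavior of an RPC service, which can act as both caller and callee. Remote calls require an address and a port. The interface covers starting and stopping the service and connecting to other, remote RpcServices.
- AkkaRpcService: the implementation of RpcService.
- RpcServer: the RPC server. It is itself an RpcGateway.
- AkkaInvocationHandler: the implementation of RpcServer. What an RpcService returns when it connects to a remote RpcService is actually a proxy object, and the real execution logic of that proxy lives in AkkaInvocationHandler.
- RpcGateway: the RPC gateway, the entry point for calling other RpcServices. The object returned when an RpcService connects to a remote RpcService is of type RpcGateway. Usually it also implements a second interface, the same one implemented by the remote callee. As noted above, the returned object is a proxy: when we call a method on it, Akka is used underneath to invoke the method of the same name on the remote side.
- RpcEndpoint: the callee side of an RPC. It holds an RpcService and implements its own business-logic interface, whose methods are executed when an RpcGateway calls them remotely.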
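The proxy idea in the list above can be illustrated with a plain JDK dynamic proxy. This is only a minimal sketch, not Flink's code: GreeterGateway, GreeterEndpoint, the in-memory REGISTRY and the address string are hypothetical names introduced for illustration, and a map lookup stands in for the Akka transport.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

public class GatewayProxySketch {

    /** Hypothetical business interface shared by the calling side and the called side. */
    public interface GreeterGateway {
        String greet(String name);
    }

    /** Hypothetical endpoint: the side that actually runs the business logic. */
    public static class GreeterEndpoint implements GreeterGateway {
        @Override
        public String greet(String name) {
            return "Hello, " + name;
        }
    }

    /** Stand-in for the transport: maps an address to the endpoint registered under it. */
    private static final Map<String, Object> REGISTRY = new HashMap<>();

    public static void register(String address, Object endpoint) {
        REGISTRY.put(address, endpoint);
    }

    /** Returns a proxy of the gateway type; every call is forwarded to the registered endpoint. */
    public static <C> C connect(String address, Class<C> gatewayClass) {
        InvocationHandler handler = (proxy, method, args) -> {
            Object target = REGISTRY.get(address);
            if (target == null) {
                throw new IllegalStateException("No endpoint registered under " + address);
            }
            // The endpoint implements the same interface, so the same method can be invoked on it.
            return method.invoke(target, args);
        };
        Object proxy = Proxy.newProxyInstance(
                gatewayClass.getClassLoader(), new Class<?>[] {gatewayClass}, handler);
        return gatewayClass.cast(proxy);
    }

    public static void main(String[] args) {
        register("local://greeter", new GreeterEndpoint());
        GreeterGateway gateway = connect("local://greeter", GreeterGateway.class);
        System.out.println(gateway.greet("Flink")); // prints "Hello, Flink"
    }
}
```

The caller only ever sees the gateway interface; where and how the call is executed is hidden behind the invocation handler, which is the role the article assigns to AkkaInvocationHandler.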
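Akka as wrapped by Flink is very simple to use. As a reference, look at the canRespondWithSerializedValueRemotely method of Flink's unit test RemoteAkkaRpcActorTest, which uses remoteGateway to remotely call the getSerializedValueSynchronously method of AkkaRpcActorTest.SerializedValueRespondingEndpoint. The code is as follows: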
@Test
public void canRespondWithSerializedValueRemotely() throws Exception {
    try (final AkkaRpcActorTest.SerializedValueRespondingEndpoint endpoint =
            new AkkaRpcActorTest.SerializedValueRespondingEndpoint(rpcService)) {
        endpoint.start();
        final AkkaRpcActorTest.SerializedValueRespondingGateway remoteGateway =
                otherRpcService
                        .connect(
                                endpoint.getAddress(),
                                AkkaRpcActorTest.SerializedValueRespondingGateway.class)
                        .join();
        assertThat(
                remoteGateway.getSerializedValueSynchronously(),
                equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE));
        final CompletableFuture<SerializedValue<String>> responseFuture =
                remoteGateway.getSerializedValue();
        assertThat(
                responseFuture.get(),
                equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE));
    }
}
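How the unit test creates its RpcService instances is not shown here.
Starting from the point where TaskManagerRunner creates its RpcService, this article analyzes how Flink wraps Akka Actors and walks through the whole call flow.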
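The unit test in this section calls one blocking method (getSerializedValueSynchronously) and one future-returning method (getSerializedValue) on the same gateway. A proxy can serve both styles by looking at the declared return type of the invoked method. The following is a hedged sketch of that idea only: ValueGateway and connect are hypothetical, the remote call is simulated with supplyAsync, and the 5 second timeout is an arbitrary assumption.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class SyncAsyncGatewaySketch {

    /** Hypothetical gateway with one blocking and one future-returning method. */
    public interface ValueGateway {
        String getValueSynchronously();

        CompletableFuture<String> getValue();
    }

    /** Builds a proxy whose simulated "remote" side always answers with remoteValue. */
    public static ValueGateway connect(String remoteValue) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Simulated remote call: the answer arrives asynchronously on another thread.
            CompletableFuture<String> response =
                    CompletableFuture.supplyAsync(() -> remoteValue);
            if (method.getReturnType() == CompletableFuture.class) {
                // Asynchronous style: hand the future straight back to the caller.
                return response;
            }
            // Synchronous style: block the caller until the answer arrives or the timeout expires.
            return response.get(5, TimeUnit.SECONDS);
        };
        return (ValueGateway) Proxy.newProxyInstance(
                ValueGateway.class.getClassLoader(),
                new Class<?>[] {ValueGateway.class},
                handler);
    }

    public static void main(String[] args) {
        ValueGateway gateway = connect("serialized-value");
        System.out.println(gateway.getValueSynchronously());
        System.out.println(gateway.getValue().join());
    }
}
```

Returning a CompletableFuture keeps the caller non-blocking, which matters when the caller is itself an endpoint that must not stall its own message processing.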
3. RpcService
3.1 TaskManager creates the RpcService
The createRpcService method of TaskManagerRunner creates an RpcService from the Flink configuration and the high-availability services.
The method is as follows:
@VisibleForTesting
static RpcService createRpcService(
        final Configuration configuration, final HighAvailabilityServices haServices)
        throws Exception {
    checkNotNull(configuration);
    checkNotNull(haServices);
    return AkkaRpcServiceUtils.createRemoteRpcService(
            configuration,
            determineTaskManagerBindAddress(configuration, haServices),
            configuration.getString(TaskManagerOptions.RPC_PORT),
            configuration.getString(TaskManagerOptions.BIND_HOST),
            configuration.getOptional(TaskManagerOptions.RPC_BIND_PORT));
}
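The logic that creates the RpcService lives in the AkkaRpcServiceUtils utility class. Next we look at RpcService itself and at how it is created.
3.2 RpcService definition
RpcService is responsible for executing RPCs through Akka Actors. It has one implementation, AkkaRpcService, and a process has one RpcService. It coordinates RPC calls in two directions: it can start a server for a local RpcEndpoint (the callee) so that remote peers can call it, or it can connect to a remote RPC service and create an RpcGateway (the caller side) through which the remote peer can be called.
The RpcService interface is shown below: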
public interface RpcService {
    /**
     * Return the hostname or host address under which the rpc service can be reached. If the rpc
     * service cannot be contacted remotely, then it will return an empty string.
     *
     * @return Address of the rpc service or empty string if local rpc service
     */
    // Returns the address of the RPC service
    // For a local RPC service an empty string is returned
    String getAddress();
    /**
     * Return the port under which the rpc service is reachable. If the rpc service cannot be
     * contacted remotely, then it will return -1.
     *
     * @return Port of the rpc service or -1 if local rpc service
     */
    // Returns the port of the RPC service
    // For a local RPC service -1 is returned
    int getPort();
    /**
     * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can be
     * used to communicate with the rpc server. If the connection failed, then the returned future
     * is failed with a {@link RpcConnectionException}.
     *
     * @param address Address of the remote rpc server
     * @param clazz Class of the rpc gateway to return
     * @param <C> Type of the rpc gateway to return
     * @return Future containing the rpc gateway or an {@link RpcConnectionException} if the
     *     connection attempt failed
     */
    // Connects to the remote RPC service at the given address
    // Returns an RPC gateway of type C that is used to talk to the remote side
    <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
    /**
     * Connect to a remote fenced rpc server under the provided address. Returns a fenced rpc
     * gateway which can be used to communicate with the rpc server. If the connection failed, then
     * the returned future is failed with a {@link RpcConnectionException}.
     *
     * @param address Address of the remote rpc server
     * @param fencingToken Fencing token to be used when communicating with the server
     * @param clazz Class of the rpc gateway to return
     * @param <F> Type of the fencing token
     * @param <C> Type of the rpc gateway to return
     * @return Future containing the fenced rpc gateway or an {@link RpcConnectionException} if the
     *     connection attempt failed
     */
    // Creates an RPC gateway with fencing support
    // Fencing is a mechanism that protects against split-brain; how it works is analyzed together with AkkaRpcActor
    <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
            String address, F fencingToken, Class<C> clazz);
    /**
     * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.
     *
     * @param rpcEndpoint Rpc protocol to dispatch the rpcs to
     * @param <C> Type of the rpc endpoint
     * @return Self gateway to dispatch remote procedure calls to oneself
     */
    // Starts the RPC server, which forwards incoming remote requests to rpcEndpoint
    <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint);
    /**
     * Fence the given RpcServer with the given fencing token.
     *
     * <p>Fencing the RpcServer means that we fix the fencing token to the provided value. All RPCs
     * will then be enriched with this fencing token. This expects that the receiving RPC endpoint
     * extends {@link FencedRpcEndpoint}.
     *
     * @param rpcServer to fence with the given fencing token
     * @param fencingToken to fence the RpcServer with
     * @param <F> type of the fencing token
     * @return Fenced RpcServer
     */
    // Fences an existing RpcServer: all of its RPCs carry the given fencing token (split-brain protection)
    <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken);
    /**
     * Stop the underlying rpc server of the provided self gateway.
     *
     * @param selfGateway Self gateway describing the underlying rpc server
     */
    // Stops the RPC server
    void stopServer(RpcServer selfGateway);
    /**
     * Trigger the asynchronous stopping of the {@link RpcService}.
     *
     * @return Future which is completed once the {@link RpcService} has been fully stopped.
     */
    // Stops the RPC service asynchronously
    CompletableFuture<Void> stopService();
    /**
     * Returns a future indicating when the RPC service has been shut down.
     *
     * @return Termination future
     */
    // Returns a CompletableFuture that completes once the RPC service has been fully shut down
    CompletableFuture<Void> getTerminationFuture();
    /**
     * Gets the executor, provided by this RPC service. This executor can be used for example for
     * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @return The execution context provided by the RPC service
     */
    // Returns the executor of the RPC service, usable for asynchronous logic such as handleAsync
    Executor getExecutor();
    /**
     * Gets a scheduled executor from the RPC service. This executor can be used to schedule tasks
     * to be executed in the future.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @return The RPC service provided scheduled executor
     */
    // Returns the thread pool for scheduled tasks
    ScheduledExecutor getScheduledExecutor();
    /**
     * Execute the runnable in the execution context of this RPC Service, as returned by {@link
     * #getExecutor()}, after a scheduled delay.
     *
     * @param runnable Runnable to be executed
     * @param delay The delay after which the runnable will be executed
     */
    // Schedules a task that runs in the ScheduledExecutor after the given delay
    ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
    /**
     * Execute the given runnable in the executor of the RPC service. This method can be used to run
     * code outside of the main thread of a {@link RpcEndpoint}.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @param runnable to execute
     */
    // Runs the runnable in the thread pool of the RPC service
    void execute(Runnable runnable);
    /**
     * Execute the given callable and return its result as a {@link CompletableFuture}. This method
     * can be used to run code outside of the main thread of a {@link RpcEndpoint}.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @param callable to execute
     * @param <T> is the return value type
     * @return Future containing the callable's future result
     */
    // Runs the callable asynchronously in the thread pool of the RPC service and returns the result as a CompletableFuture
    <T> CompletableFuture<T> execute(Callable<T> callable);
}
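The IMPORTANT notes in the Javadoc above distinguish the shared executor of the RPC service from the main-thread executor of an endpoint. The following sketch shows why that distinction matters, using only JDK classes. CounterEndpoint and its methods are hypothetical, and a single-threaded executor is assumed here as a stand-in for an endpoint's main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadExecutorSketch {

    /** Hypothetical endpoint whose state may only be touched by its own single thread. */
    public static class CounterEndpoint {
        private final ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        private int counter; // not thread-safe on purpose; confined to mainThreadExecutor

        /** Applies the result of an asynchronous computation on the endpoint's main thread. */
        CompletableFuture<Void> addWhenDone(CompletableFuture<Integer> result) {
            return result.thenAcceptAsync(value -> counter += value, mainThreadExecutor);
        }

        /** Reads the state on the main thread as well, so no locking is needed. */
        CompletableFuture<Integer> getCounter() {
            return CompletableFuture.supplyAsync(() -> counter, mainThreadExecutor);
        }

        void close() {
            mainThreadExecutor.shutdown();
        }
    }

    /** Runs the given number of computations on a shared pool and sums them in the endpoint. */
    public static int run(int tasks) {
        ExecutorService sharedExecutor = Executors.newFixedThreadPool(4);
        CounterEndpoint endpoint = new CounterEndpoint();
        try {
            CompletableFuture<?>[] updates = new CompletableFuture<?>[tasks];
            for (int i = 0; i < tasks; i++) {
                // The computation may run concurrently on the shared pool ...
                CompletableFuture<Integer> computed =
                        CompletableFuture.supplyAsync(() -> 1, sharedExecutor);
                // ... but the state change is handed over to the endpoint's main thread.
                updates[i] = endpoint.addWhenDone(computed);
            }
            CompletableFuture.allOf(updates).join();
            return endpoint.getCounter().join();
        } finally {
            sharedExecutor.shutdown();
            endpoint.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(1000)); // prints 1000
    }
}
```

If the `counter += value` callback were run on the shared pool instead, concurrent updates could be lost, which is the situation the Javadoc warns about.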
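3.3 AkkaRpcServiceUtils
A utility class responsible for creating AkkaRpcService instances.
We now continue with the RpcService creation that started in section 3.1. The createRemoteRpcService method of AkkaRpcServiceUtils looks like this: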
public static AkkaRpcService createRemoteRpcService(
        Configuration configuration,
        @Nullable String externalAddress,
        String externalPortRange,
        @Nullable String bindAddress,
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType") Optional<Integer> bindPort)
        throws Exception {
    // Create a service builder
    final AkkaRpcServiceBuilder akkaRpcServiceBuilder =
            AkkaRpcServiceUtils.remoteServiceBuilder(
                    configuration, externalAddress, externalPortRange);
    // Pass in the bind address and bind port settings
    if (bindAddress != null) {
        akkaRpcServiceBuilder.withBindAddress(bindAddress);
    }
    bindPort.ifPresent(akkaRpcServiceBuilder::withBindPort);
    // Create and start the RpcService
    return akkaRpcServiceBuilder.createAndStart();
}
AkkaRpcService is constructed with the builder pattern: once akkaRpcServiceBuilder has been given enough configuration, calling createAndStart creates the AkkaRpcService.
public AkkaRpcService createAndStart() throws Exception {
    // Read the thread pool parallelism settings
    if (actorSystemExecutorConfiguration == null) {
        actorSystemExecutorConfiguration =
                BootstrapTools.ForkJoinExecutorConfiguration.fromConfiguration(
                        configuration);
    }
    final ActorSystem actorSystem;
    // If no external address is configured, create a local ActorSystem
    if (externalAddress == null) {
        // create local actor system
        actorSystem =
                BootstrapTools.startLocalActorSystem(
                        configuration,
                        actorSystemName,
                        logger,
                        actorSystemExecutorConfiguration,
                        customConfig);
    } else {
        // create remote actor system
        actorSystem =
                BootstrapTools.startRemoteActorSystem(
                        configuration,
                        actorSystemName,
                        externalAddress,
                        externalPortRange,
                        bindAddress,
                        Optional.ofNullable(bindPort),
                        logger,
                        actorSystemExecutorConfiguration,
                        customConfig);
    }
    // Return the AkkaRpcService instance, which is analyzed in a later section
    return new AkkaRpcService(
            actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
}
Next we analyze how the ActorSystem is created.
3.4 BootstrapTools
3.4.1 startLocalActorSystem
This method creates an ActorSystem for local calls only.
The startLocalActorSystem method of BootstrapTools is as follows:
public static ActorSystem startLocalActorSystem(
        Configuration configuration,
        String actorSystemName,
        Logger logger,
        ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {
    logger.info("Trying to start local actor system");
    try {
        // Build the Akka config; externalAddress and bindAddress are empty,
        // which corresponds to a local ActorSystem configuration
        Config akkaConfig =
                AkkaUtils.getAkkaConfig(
                        configuration,
                        scala.Option.empty(),
                        scala.Option.empty(),
                        actorSystemExecutorConfiguration.getAkkaConfig());
        // If there is a custom config, merge it with the basic config;
        // for duplicate keys the custom config wins and the basic config is only the fallback
        if (customConfig != null) {
            akkaConfig = customConfig.withFallback(akkaConfig);
        }
        // Start the ActorSystem
        return startActorSystem(akkaConfig, actorSystemName, logger);
    } catch (Throwable t) {
        throw new Exception("Could not create actor system", t);
    }
}
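In the listing above, customConfig.withFallback(akkaConfig) gives priority to the values in customConfig and consults akkaConfig only for keys that customConfig does not define. The precedence rule can be mimicked with plain maps. This is a simplified sketch, not the Typesafe Config implementation: it works on flat string keys only, and the keys and values used in main are purely illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class FallbackMergeSketch {

    /** Merges two flat configs: entries of primary win, fallback only fills the gaps. */
    public static Map<String, String> withFallback(
            Map<String, String> primary, Map<String, String> fallback) {
        Map<String, String> merged = new HashMap<>(fallback);
        merged.putAll(primary); // overwrites every key that both maps define
        return merged;
    }

    public static void main(String[] args) {
        // Illustrative keys and values, not taken from Flink's actual Akka configuration.
        Map<String, String> basic = Map.of("loglevel", "INFO", "provider", "local");
        Map<String, String> custom = Map.of("loglevel", "DEBUG");

        Map<String, String> merged = withFallback(custom, basic);
        System.out.println(merged.get("loglevel")); // prints "DEBUG"
        System.out.println(merged.get("provider")); // prints "local"
    }
}
```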
3.4.2 startRemoteActorSystem
This method creates an ActorSystem that can be called remotely.
The startRemoteActorSystem method is as follows:
public static ActorSystem startRemoteActorSystem(
        Configuration configuration,
        String actorSystemName,
        String externalAddress,
        String externalPortRange,
        String bindAddress,
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType") Optional<Integer> bindPort,
        Logger logger,
        ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {
    // parse port range definition and create port iterator
    Iterator<Integer> portsIterator;
    try {
        // Parse the port range string into an iterator over integer ports
        portsIterator = NetUtils.getPortRangeFromString(externalPortRange);
    } catch (Exception e) {
        throw new IllegalArgumentException(
                "Invalid port range definition: " + externalPortRange);
    }
    while (portsIterator.hasNext()) {
        final int externalPort = portsIterator.next();
        // Try the ports in the range one by one until the ActorSystem starts successfully
        try {
            return startRemoteActorSystem(
                    configuration,
                    actorSystemName,
                    externalAddress,
                    externalPort,
                    bindAddress,
                    bindPort.orElse(externalPort),
                    logger,
                    actorSystemExecutorConfiguration,
                    customConfig);
        } catch (Exception e) {
            // we can continue to try if this contains a netty channel exception
            Throwable cause = e.getCause();
            if (!(cause instanceof org.jboss.netty.channel.ChannelException
                    || cause instanceof java.net.BindException)) {
                throw e;
            } // else fall through the loop and try the next port
        }
    }
    // if we come here, we have exhausted the port range: no port in it could be used
    throw new BindException(
            "Could not start actor system on any port in port range " + externalPortRange);
}
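Note:
Since Flink 1.11, TaskManager and JobManager can use different addresses and ports for local binding and for remote access, which makes Docker and NAT port-mapping setups possible. See FLINK-15911 and FLINK-15154 for details.
Options for the externally reachable address and port:
jobmanager.rpc.address
jobmanager.rpc.port
taskmanager.host
taskmanager.rpc.port
taskmanager.data.port
Options for the local bind address and port:
jobmanager.bind-host
jobmanager.rpc.bind-port
taskmanager.bind-host
taskmanager.rpc.bind-port
taskmanager.data.bind-port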
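Returning to the port loop in the public startRemoteActorSystem method shown before this note: the pattern is "parse a port range, try each port in turn, give up only when the range is exhausted". Below is a minimal sketch of that pattern with a plain ServerSocket instead of an ActorSystem. The parsePortRange helper is hypothetical, and the accepted syntax (single ports, dash ranges, comma-separated lists) is an assumption of this sketch rather than a statement about NetUtils.getPortRangeFromString.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

public class PortRangeSketch {

    /** Parses strings such as "50100", "50100-50102" or "50100,50200-50201" into a port list. */
    public static List<Integer> parsePortRange(String range) {
        List<Integer> ports = new ArrayList<>();
        for (String part : range.split(",")) {
            String trimmed = part.trim();
            int dash = trimmed.indexOf('-');
            if (dash < 0) {
                ports.add(Integer.parseInt(trimmed));
            } else {
                int start = Integer.parseInt(trimmed.substring(0, dash).trim());
                int end = Integer.parseInt(trimmed.substring(dash + 1).trim());
                if (start > end) {
                    throw new IllegalArgumentException("Invalid port range: " + trimmed);
                }
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    /** Tries every port of the range in order and returns the first socket that binds. */
    public static ServerSocket bindFirstFree(String range) throws IOException {
        for (int port : parsePortRange(range)) {
            try {
                return new ServerSocket(port);
            } catch (BindException e) {
                // Port is taken: fall through and try the next one.
            }
        }
        // Reaching this point means the whole range is exhausted.
        throw new BindException("Could not bind to any port in port range " + range);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(parsePortRange("50100-50102,50200"));
        try (ServerSocket socket = bindFirstFree("50100-50110")) {
            System.out.println("Bound to port " + socket.getLocalPort());
        }
    }
}
```

As in the Flink method, only bind failures cause the next port to be tried; any other exception propagates to the caller immediately.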
The public startRemoteActorSystem method ends by calling a private overload, whose body is similar to startLocalActorSystem.
private static ActorSystem startRemoteActorSystem(
        Configuration configuration,
        String actorSystemName,
        String externalAddress,
        int externalPort,
        String bindAddress,
        int bindPort,
        Logger logger,
        ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {
    // Normalize the address and port pairs
    String externalHostPortUrl =
            NetUtils.unresolvedHostAndPortToNormalizedString(externalAddress, externalPort);
    String bindHostPortUrl =
            NetUtils.unresolvedHostAndPortToNormalizedString(bindAddress, bindPort);
    logger.info(
            "Trying to start actor system, external address {}, bind address {}.",
            externalHostPortUrl,
            bindHostPortUrl);
    try {
        // Same as startLocalActorSystem, except that the external address and port
        // and the bind address and port are passed in as well
        Config akkaConfig =
                AkkaUtils.getAkkaConfig(
                        configuration,
                        new Some<>(new Tuple2<>(externalAddress, externalPort)),
                        new Some<>(new Tuple2<>(bindAddress, bindPort)),
                        actorSystemExecutorConfiguration.getAkkaConfig());
        if (customConfig != null) {
            akkaConfig = customConfig.withFallback(akkaConfig);
        }
        return startActorSystem(akkaConfig, actorSystemName, logger);
    } catch (Throwable t) {
        if (t instanceof ChannelException) {
            Throwable cause = t.getCause();
            if (cause != null && t.getCause() instanceof BindException) {
                throw new IOException(
                        "Unable to create ActorSystem at address "
                                + bindHostPortUrl
                                + " : "
                                + cause.getMessage(),
                        t);
            }
        }
        throw new Exception("Could not create actor system", t);
    }
}
3.5 startActorSystem
Finally, the ActorSystem is created through AkkaUtils.
private static ActorSystem startActorSystem(
        Config akkaConfig, String actorSystemName, Logger logger) {
    logger.debug("Using akka configuration\n {}", akkaConfig);
    ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig);
    logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem));
    return actorSystem;
}
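3.6 AkkaUtils
AkkaUtils is responsible for generating the Akka configuration and creating the ActorSystem. Here we focus on how it generates the Akka configuration.
Below is the getAkkaConfig method. Note that this class is written in Scala.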
@throws(classOf[UnknownHostException])
def getAkkaConfig(configuration: Configuration,
                  externalAddress: Option[(String, Int)],
                  bindAddress: Option[(String, Int)],
                  executorConfig: Config): Config = {
  // Get the basic Akka configuration together with the executor configuration
  val defaultConfig = getBasicAkkaConfig(configuration).withFallback(executorConfig)
  // Generate a different configuration depending on whether bindHostname, bindPort, externalHostname and externalPort were passed in
  externalAddress match {
    case Some((externalHostname, externalPort))