[Flink] Flink Source Code: RPC Invocation

Posted by 九师兄

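1. Overview

Reposted from: Flink 源码之RPC调用 (Flink Source Code: RPC Invocation)

2. Introduction

This article analyzes the execution flow and principles of Flink's underlying RPC calls. Flink RPC is built on the Akka Actor framework, so basic familiarity with Akka actors is recommended before reading.

To simplify working with actors, Flink wraps them in a set of abstractions. The most important interfaces and classes are: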

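  • RpcService: defines the behavior of an RPC service, which acts as both caller and callee. Remote calls require an address and a port. It covers starting and stopping the service and connecting to other remote RpcServices.

  • AkkaRpcService: the implementation of RpcService.

  • RpcServer: the RPC server. It is itself an RpcGateway.

  • AkkaInvocationHandler: the implementation of RpcServer. What an RpcService gets back when it connects to a remote RpcService is actually a proxy object, and the proxy's real execution logic lives in AkkaInvocationHandler.

  • RpcGateway: the RPC gateway, the entry point for calling another RpcService. The object returned when one RpcService connects to a remote RpcService is of type RpcGateway. Usually the gateway also implements another interface, the same one implemented by the remote callee. As noted above, the returned object is a proxy: when we call a method on it, the call goes through Akka to the method of the same name on the remote side.

  • RpcEndpoint: the RPC callee. It holds an RpcService and implements its own business-logic interface, which is executed when an RpcGateway calls it remotely.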
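The gateway-as-proxy idea described in the list above can be illustrated with a plain JDK dynamic proxy. This is only a minimal sketch, not Flink's implementation: GreeterGateway, GreeterEndpoint, ForwardingHandler and the connect helper are hypothetical names invented for the example, and the handler forwards the call in-process instead of sending a message to an Akka actor as AkkaInvocationHandler does.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;

// Hypothetical gateway interface shared by the caller and the callee.
interface GreeterGateway {
    CompletableFuture<String> greet(String name);
}

// Hypothetical endpoint holding the business logic that gets called "remotely".
class GreeterEndpoint implements GreeterGateway {
    @Override
    public CompletableFuture<String> greet(String name) {
        return CompletableFuture.completedFuture("hello " + name);
    }
}

// Stand-in for AkkaInvocationHandler (assumption: no actors, no network).
// Every call on the proxy arrives here and is forwarded to the same method on the endpoint.
class ForwardingHandler implements InvocationHandler {
    private final Object endpoint;

    ForwardingHandler(Object endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        return method.invoke(endpoint, args);
    }
}

class GatewayProxyDemo {
    // Plays the role of RpcService.connect(): returns a proxy typed as the gateway interface.
    static <C> C connect(Object endpoint, Class<C> gatewayClass) {
        Object proxy =
                Proxy.newProxyInstance(
                        gatewayClass.getClassLoader(),
                        new Class<?>[] {gatewayClass},
                        new ForwardingHandler(endpoint));
        return gatewayClass.cast(proxy);
    }

    public static void main(String[] args) {
        GreeterGateway gateway = connect(new GreeterEndpoint(), GreeterGateway.class);
        System.out.println(gateway.greet("flink").join()); // prints "hello flink"
    }
}
```

The caller only ever sees the gateway interface, so swapping the in-process forwarding for a real transport would not change the calling code.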


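After Flink's wrapping, Akka is very simple to use. As a reference, see the canRespondWithSerializedValueRemotely method in the Flink unit test RemoteAkkaRpcActorTest, which uses remoteGateway to remotely call the getSerializedValueSynchronously method of AkkaRpcActorTest.SerializedValueRespondingEndpoint. The code is shown below: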

@Test
public void canRespondWithSerializedValueRemotely() throws Exception {
    try (final AkkaRpcActorTest.SerializedValueRespondingEndpoint endpoint =
            new AkkaRpcActorTest.SerializedValueRespondingEndpoint(rpcService)) {
        endpoint.start();

        // Connect from a second RpcService; the result is a gateway proxy
        final AkkaRpcActorTest.SerializedValueRespondingGateway remoteGateway =
                otherRpcService
                        .connect(
                                endpoint.getAddress(),
                                AkkaRpcActorTest.SerializedValueRespondingGateway.class)
                        .join();

        // Synchronous remote call
        assertThat(
                remoteGateway.getSerializedValueSynchronously(),
                equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE));

        // Asynchronous remote call
        final CompletableFuture<SerializedValue<String>> responseFuture =
                remoteGateway.getSerializedValue();

        assertThat(
                responseFuture.get(),
                equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE));
    }
}

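How the unit test creates the RpcService is not shown here.

Starting from TaskManagerRunner creating its RpcService, this article analyzes how Flink wraps Akka actors and walks through the whole call flow.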

3. RpcService

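3.1 TaskManager creates the RpcService

TaskManagerRunner's createRpcService method creates an RpcService from the Flink configuration and the high-availability services.

The createRpcService method of TaskManagerRunner is as follows: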

@VisibleForTesting
static RpcService createRpcService(
        final Configuration configuration, final HighAvailabilityServices haServices)
        throws Exception {

    checkNotNull(configuration);
    checkNotNull(haServices);

    return AkkaRpcServiceUtils.createRemoteRpcService(
            configuration,
            determineTaskManagerBindAddress(configuration, haServices),
            configuration.getString(TaskManagerOptions.RPC_PORT),
            configuration.getString(TaskManagerOptions.BIND_HOST),
            configuration.getOptional(TaskManagerOptions.RPC_BIND_PORT));
}

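The logic that creates the RpcService lives in the AkkaRpcServiceUtils utility class. Next we look at RpcService itself and how it is created.

3.2 RpcService definition

RpcService is responsible for executing RPCs with Akka actors. It has one implementation, AkkaRpcService, and each process has one RpcService. It coordinates RPC calls: it either attaches to a local endpoint and starts a server so that remote peers can call it (the RpcEndpoint is the callee), or it connects to a remote RPC service and creates an RpcGateway (the caller side) through which the remote side can be called.

The RpcService interface is shown below: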

public interface RpcService {

    /**
     * Return the hostname or host address under which the rpc service can be reached. If the rpc
     * service cannot be contacted remotely, then it will return an empty string.
     *
     * @return Address of the rpc service or empty string if local rpc service
     */
    // Get the address of the RPC service
    // Returns an empty string for a local RPC service
    String getAddress();

    /**
     * Return the port under which the rpc service is reachable. If the rpc service cannot be
     * contacted remotely, then it will return -1.
     *
     * @return Port of the rpc service or -1 if local rpc service
     */
    // Get the port of the RPC service
    // Returns -1 for a local RPC service
    int getPort();

    /**
     * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can be
     * used to communicate with the rpc server. If the connection failed, then the returned future
     * is failed with a {@link RpcConnectionException}.
     *
     * @param address Address of the remote rpc server
     * @param clazz Class of the rpc gateway to return
     * @param <C> Type of the rpc gateway to return
     * @return Future containing the rpc gateway or an {@link RpcConnectionException} if the
     *     connection attempt failed
     */
    // Connect to the remote RPC service at the given address
    // Returns an RPC gateway of type C for talking to the remote side
    <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);

    /**
     * Connect to a remote fenced rpc server under the provided address. Returns a fenced rpc
     * gateway which can be used to communicate with the rpc server. If the connection failed, then
     * the returned future is failed with a {@link RpcConnectionException}.
     *
     * @param address Address of the remote rpc server
     * @param fencingToken Fencing token to be used when communicating with the server
     * @param clazz Class of the rpc gateway to return
     * @param <F> Type of the fencing token
     * @param <C> Type of the rpc gateway to return
     * @return Future containing the fenced rpc gateway or an {@link RpcConnectionException} if the
     *     connection attempt failed
     */
    // Create an RPC gateway with fencing
    // Fencing guards against split-brain; how it works is analyzed with AkkaRpcActor
    <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
            String address, F fencingToken, Class<C> clazz);

    /**
     * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.
     *
     * @param rpcEndpoint Rpc protocol to dispatch the rpcs to
     * @param <C> Type of the rpc endpoint
     * @return Self gateway to dispatch remote procedure calls to oneself
     */
    // Start the RPC server; incoming remote requests are dispatched to rpcEndpoint
    <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint);

    /**
     * Fence the given RpcServer with the given fencing token.
     *
     * <p>Fencing the RpcServer means that we fix the fencing token to the provided value. All RPCs
     * will then be enriched with this fencing token. This expects that the receiving RPC endpoint
     * extends {@link FencedRpcEndpoint}.
     *
     * @param rpcServer to fence with the given fencing token
     * @param fencingToken to fence the RpcServer with
     * @param <F> type of the fencing token
     * @return Fenced RpcServer
     */
    // Wraps an existing RpcServer so that its calls carry the fencing token (split-brain protection)
    <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken);

    /**
     * Stop the underlying rpc server of the provided self gateway.
     *
     * @param selfGateway Self gateway describing the underlying rpc server
     */
    // Stop the RPC server
    void stopServer(RpcServer selfGateway);

    /**
     * Trigger the asynchronous stopping of the {@link RpcService}.
     *
     * @return Future which is completed once the {@link RpcService} has been fully stopped.
     */
    // Stop the RPC service asynchronously
    CompletableFuture<Void> stopService();

    /**
     * Returns a future indicating when the RPC service has been shut down.
     *
     * @return Termination future
     */
    // Returns a CompletableFuture that completes once the RPC service has fully shut down
    CompletableFuture<Void> getTerminationFuture();

    /**
     * Gets the executor, provided by this RPC service. This executor can be used for example for
     * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @return The execution context provided by the RPC service
     */
    // Get the RPC service's executor, usable for async logic such as handleAsync
    Executor getExecutor();

    /**
     * Gets a scheduled executor from the RPC service. This executor can be used to schedule tasks
     * to be executed in the future.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @return The RPC service provided scheduled executor
     */
    // Get the scheduled executor (thread pool for delayed tasks)
    ScheduledExecutor getScheduledExecutor();

    /**
     * Execute the runnable in the execution context of this RPC Service, as returned by {@link
     * #getExecutor()}, after a scheduled delay.
     *
     * @param runnable Runnable to be executed
     * @param delay The delay after which the runnable will be executed
     */
    // Schedule a task to run after the given delay
    ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);

    /**
     * Execute the given runnable in the executor of the RPC service. This method can be used to run
     * code outside of the main thread of a {@link RpcEndpoint}.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @param runnable to execute
     */
    // Run the runnable in the RPC service's thread pool
    void execute(Runnable runnable);

    /**
     * Execute the given callable and return its result as a {@link CompletableFuture}. This method
     * can be used to run code outside of the main thread of a {@link RpcEndpoint}.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @param callable to execute
     * @param <T> is the return value type
     * @return Future containing the callable's future result
     */
    // Run the callable asynchronously in the RPC service's thread pool;
    // the result is returned as a CompletableFuture
    <T> CompletableFuture<T> execute(Callable<T> callable);
}
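The fenced variants of connect and fenceRpcServer attach a fencing token to every call, and the receiving side is expected to check it. The sketch below only illustrates that general idea under stated assumptions: FencedCounter and its methods are hypothetical, and the real check inside Flink's actor classes is not reproduced here.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical fenced endpoint: it accepts only calls that carry its current token.
class FencedCounter {
    private UUID fencingToken;
    private int value;

    FencedCounter(UUID initialToken) {
        this.fencingToken = initialToken;
    }

    // Assumption for this sketch: the token is replaced when leadership changes.
    void setFencingToken(UUID newToken) {
        this.fencingToken = newToken;
    }

    // A call from a caller that still holds an outdated token is rejected,
    // so a stale leader cannot modify state.
    int increment(UUID callerToken) {
        if (!Objects.equals(fencingToken, callerToken)) {
            throw new IllegalStateException("Fencing token mismatch, call rejected");
        }
        return ++value;
    }
}

class FencingDemo {
    public static void main(String[] args) {
        UUID oldLeader = UUID.randomUUID();
        UUID newLeader = UUID.randomUUID();

        FencedCounter counter = new FencedCounter(oldLeader);
        System.out.println(counter.increment(oldLeader)); // prints 1

        counter.setFencingToken(newLeader);
        try {
            counter.increment(oldLeader);
        } catch (IllegalStateException e) {
            System.out.println("stale caller rejected");
        }
        System.out.println(counter.increment(newLeader)); // prints 2
    }
}
```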

3.3 AkkaRpcServiceUtils

A utility class responsible for creating AkkaRpcService instances.

We continue the RpcService creation started in section 3.1. The createRemoteRpcService method of AkkaRpcServiceUtils is as follows:

public static AkkaRpcService createRemoteRpcService(
        Configuration configuration,
        @Nullable String externalAddress,
        String externalPortRange,
        @Nullable String bindAddress,
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType") Optional<Integer> bindPort)
        throws Exception {
    // Create a service builder
    final AkkaRpcServiceBuilder akkaRpcServiceBuilder =
            AkkaRpcServiceUtils.remoteServiceBuilder(
                    configuration, externalAddress, externalPortRange);

    // Pass in the bind address and bind port, if configured
    if (bindAddress != null) {
        akkaRpcServiceBuilder.withBindAddress(bindAddress);
    }

    bindPort.ifPresent(akkaRpcServiceBuilder::withBindPort);

    // Create and start the RpcService
    return akkaRpcServiceBuilder.createAndStart();
}

AkkaRpcService is constructed with the builder pattern. Once akkaRpcServiceBuilder has been given enough configuration, calling createAndStart creates the AkkaRpcService.

public AkkaRpcService createAndStart() throws Exception {
    // Read the thread pool parallelism configuration
    if (actorSystemExecutorConfiguration == null) {
        actorSystemExecutorConfiguration =
                BootstrapTools.ForkJoinExecutorConfiguration.fromConfiguration(
                        configuration);
    }

    final ActorSystem actorSystem;

    // Without an external address, create a local ActorSystem
    if (externalAddress == null) {
        // create local actor system
        actorSystem =
                BootstrapTools.startLocalActorSystem(
                        configuration,
                        actorSystemName,
                        logger,
                        actorSystemExecutorConfiguration,
                        customConfig);
    } else {
        // Otherwise create a remote ActorSystem
        // create remote actor system
        actorSystem =
                BootstrapTools.startRemoteActorSystem(
                        configuration,
                        actorSystemName,
                        externalAddress,
                        externalPortRange,
                        bindAddress,
                        Optional.ofNullable(bindPort),
                        logger,
                        actorSystemExecutorConfiguration,
                        customConfig);
    }

    // Return the AkkaRpcService instance
    // (analyzed in a later section)
    return new AkkaRpcService(
            actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
}

Next we analyze how the ActorSystem is created.

3.4 BootstrapTools

3.4.1 startLocalActorSystem

This method creates an ActorSystem for local calls.

The startLocalActorSystem method of BootstrapTools is as follows:

public static ActorSystem startLocalActorSystem(
        Configuration configuration,
        String actorSystemName,
        Logger logger,
        ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {

    logger.info("Trying to start local actor system");

    try {
        // Build the Akka config with empty externalAddress and bindAddress,
        // which yields the configuration of a local ActorSystem
        Config akkaConfig =
                AkkaUtils.getAkkaConfig(
                        configuration,
                        scala.Option.empty(),
                        scala.Option.empty(),
                        actorSystemExecutorConfiguration.getAkkaConfig());

        // If a custom config is given, merge it with the base config.
        // For duplicate keys the custom config wins; the base config is only the fallback
        if (customConfig != null) {
            akkaConfig = customConfig.withFallback(akkaConfig);
        }

        // Start the ActorSystem
        return startActorSystem(akkaConfig, actorSystemName, logger);
    } catch (Throwable t) {
        throw new Exception("Could not create actor system", t);
    }
}
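The direction of customConfig.withFallback(akkaConfig) is easy to misread: the receiver (customConfig) takes precedence, and the argument only supplies keys the receiver does not define. The sketch below mimics that precedence with plain maps. It does not use the Typesafe Config API; FallbackDemo and its withFallback helper are hypothetical, and the values are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

class FallbackDemo {
    // Mimics the precedence of custom.withFallback(base):
    // entries in `custom` win, `base` only fills in missing keys.
    static Map<String, String> withFallback(Map<String, String> custom, Map<String, String> base) {
        Map<String, String> merged = new HashMap<>(base);
        merged.putAll(custom);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> base = new HashMap<>();
        base.put("akka.loglevel", "INFO");
        base.put("akka.daemonic", "off");

        Map<String, String> custom = new HashMap<>();
        custom.put("akka.loglevel", "DEBUG");

        Map<String, String> merged = withFallback(custom, base);
        System.out.println(merged.get("akka.loglevel")); // prints "DEBUG" (custom wins)
        System.out.println(merged.get("akka.daemonic")); // prints "off" (taken from base)
    }
}
```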

3.4.2 startRemoteActorSystem

This method creates an ActorSystem that can be called remotely.

The startRemoteActorSystem method is as follows:

public static ActorSystem startRemoteActorSystem(
        Configuration configuration,
        String actorSystemName,
        String externalAddress,
        String externalPortRange,
        String bindAddress,
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType") Optional<Integer> bindPort,
        Logger logger,
        ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {

    // parse port range definition and create port iterator
    Iterator<Integer> portsIterator;
    try {
        // Parse the port range string into an iterator over integer ports
        portsIterator = NetUtils.getPortRangeFromString(externalPortRange);
    } catch (Exception e) {
        throw new IllegalArgumentException(
                "Invalid port range definition: " + externalPortRange);
    }

    while (portsIterator.hasNext()) {
        final int externalPort = portsIterator.next();

        // Try the ports in the range one by one until the ActorSystem starts
        try {
            return startRemoteActorSystem(
                    configuration,
                    actorSystemName,
                    externalAddress,
                    externalPort,
                    bindAddress,
                    bindPort.orElse(externalPort),
                    logger,
                    actorSystemExecutorConfiguration,
                    customConfig);
        } catch (Exception e) {
            // we can continue to try if this contains a netty channel exception
            Throwable cause = e.getCause();
            if (!(cause instanceof org.jboss.netty.channel.ChannelException
                    || cause instanceof java.net.BindException)) {
                throw e;
            } // else fall through the loop and try the next port
        }
    }

    // Reaching this point means no port in the range could be used
    // if we come here, we have exhausted the port range
    throw new BindException(
            "Could not start actor system on any port in port range " + externalPortRange);
}
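The try-each-port loop above can be reproduced in isolation with a plain ServerSocket. This is a minimal sketch under assumptions: PortRangeDemo, parseRange and bindFirstFree are hypothetical names, and parseRange handles only a single port or a "low-high" range, which is a simplification rather than the behavior of NetUtils.getPortRangeFromString.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.Iterator;
import java.util.stream.IntStream;

class PortRangeDemo {
    // Simplified range parser (assumption: only "port" or "low-high" is supported).
    static Iterator<Integer> parseRange(String range) {
        String[] parts = range.trim().split("-");
        int low = Integer.parseInt(parts[0].trim());
        int high = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : low;
        return IntStream.rangeClosed(low, high).iterator();
    }

    // Tries each port in order and returns the first socket that binds successfully.
    static ServerSocket bindFirstFree(String range) throws IOException {
        Iterator<Integer> ports = parseRange(range);
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                return new ServerSocket(port);
            } catch (BindException e) {
                // Port already in use: fall through and try the next one
            }
        }
        // The whole range is exhausted
        throw new BindException("Could not bind to any port in port range " + range);
    }

    public static void main(String[] args) throws IOException {
        // Occupy one free port, then ask for a range that starts at that port
        try (ServerSocket occupied = new ServerSocket(0)) {
            int first = occupied.getLocalPort();
            String range = first + "-" + Math.min(first + 10, 65535);
            try (ServerSocket bound = bindFirstFree(range)) {
                System.out.println("occupied=" + first + ", bound=" + bound.getLocalPort());
            }
        }
    }
}
```

Only bind failures move the loop on to the next port. Any other exception propagates immediately, which mirrors the way the Flink method rethrows everything except channel and bind exceptions.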

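Note:
Since Flink 1.11, the TaskManager and JobManager can use different local (bind) and remote (external) addresses and ports, which makes Docker and NAT port mapping possible. For details see FLINK-15911 and FLINK-15154.

Options for the externally reachable (remote) address and port: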

jobmanager.rpc.address
jobmanager.rpc.port
taskmanager.host
taskmanager.rpc.port
taskmanager.data.port

Options for the local bind address and port:

jobmanager.bind-host
jobmanager.rpc.bind-port
taskmanager.bind-host
taskmanager.rpc.bind-port
taskmanager.data.bind-port

The method above ends by calling an overloaded method, whose body is similar to startLocalActorSystem.

private static ActorSystem startRemoteActorSystem(
        Configuration configuration,
        String actorSystemName,
        String externalAddress,
        int externalPort,
        String bindAddress,
        int bindPort,
        Logger logger,
        ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {

    // Normalize the address and port pairs
    String externalHostPortUrl =
            NetUtils.unresolvedHostAndPortToNormalizedString(externalAddress, externalPort);
    String bindHostPortUrl =
            NetUtils.unresolvedHostAndPortToNormalizedString(bindAddress, bindPort);
    logger.info(
            "Trying to start actor system, external address {}, bind address {}.",
            externalHostPortUrl,
            bindHostPortUrl);

    try {
        // Same as startLocalActorSystem, except that the external address and port
        // and the bind address and port are passed in
        Config akkaConfig =
                AkkaUtils.getAkkaConfig(
                        configuration,
                        new Some<>(new Tuple2<>(externalAddress, externalPort)),
                        new Some<>(new Tuple2<>(bindAddress, bindPort)),
                        actorSystemExecutorConfiguration.getAkkaConfig());

        if (customConfig != null) {
            akkaConfig = customConfig.withFallback(akkaConfig);
        }

        return startActorSystem(akkaConfig, actorSystemName, logger);
    } catch (Throwable t) {
        if (t instanceof ChannelException) {
            Throwable cause = t.getCause();
            if (cause != null && t.getCause() instanceof BindException) {
                throw new IOException(
                        "Unable to create ActorSystem at address "
                                + bindHostPortUrl
                                + " : "
                                + cause.getMessage(),
                        t);
            }
        }
        throw new Exception("Could not create actor system", t);
    }
}

3.5 startActorSystem

Finally, the ActorSystem is created through AkkaUtils.

private static ActorSystem startActorSystem(
        Config akkaConfig, String actorSystemName, Logger logger) {
    logger.debug("Using akka configuration\n {}", akkaConfig);
    ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig);

    logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem));
    return actorSystem;
}

3.6 AkkaUtils

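AkkaUtils generates the Akka configuration and creates the ActorSystem. Here we focus on how it generates the Akka configuration.

Below is the getAkkaConfig method. Note that this class is written in Scala.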

@throws(classOf[UnknownHostException])
def getAkkaConfig(configuration: Configuration,
                  externalAddress: Option[(String, Int)],
                  bindAddress: Option[(String, Int)],
                  executorConfig: Config): Config = {
    // Get the basic Akka config, with the executor config as fallback
    val defaultConfig = getBasicAkkaConfig(configuration).withFallback(executorConfig)

    // Generate a different config depending on whether bindHostname, bindPort,
    // externalHostname and externalPort were passed in
    externalAddress match {

        case Some((externalHostname, externalPort))
