FLinkFlink 源码阅读笔记- RPC
Posted 九师兄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLinkFlink 源码阅读笔记- RPC相关的知识,希望对你有一定的参考价值。
文章目录
1.概述
相关文章:
作为一个分布式系统,Flink 内部不同组件之间通信依赖于 RPC 机制。这篇文章将对 Flink 的 RPC 框架加以分析。
2.例子
先来看一个简单的例子,了解 Flink 内部的 RPC 框架是如何使用的。
public class RpcTest
private static final Time TIMEOUT = Time.seconds(10L);
private static ActorSystem actorSystem = null;
private static RpcService rpcService = null;
// 定义通信协议
public interface HelloGateway extends RpcGateway
String hello();
public interface HiGateway extends RpcGateway
String hi();
// 具体实现
public static class HelloRpcEndpoint extends RpcEndpoint implements HelloGateway
protected HelloRpcEndpoint(RpcService rpcService)
super(rpcService);
@Override
public String hello()
return "hello";
public static class HiRpcEndpoint extends RpcEndpoint implements HiGateway
protected HiRpcEndpoint(RpcService rpcService)
super(rpcService);
@Override
public String hi()
return "hi";
@BeforeClass
public static void setup()
actorSystem = AkkaUtils.createDefaultActorSystem();
// 创建 RpcService, 基于 AKKA 的实现
rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
@AfterClass
public static void teardown() throws Exception
final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService();
final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
FutureUtils
.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
@Test
public void test() throws Exception
HelloRpcEndpoint helloEndpoint = new HelloRpcEndpoint(rpcService);
HiRpcEndpoint hiEndpoint = new HiRpcEndpoint(rpcService);
helloEndpoint.start();
//获取 endpoint 的 self gateway
HelloGateway helloGateway = helloEndpoint.getSelfGateway(HelloGateway.class);
String hello = helloGateway.hello();
assertEquals("hello", hello);
hiEndpoint.start();
// 通过 endpoint 的地址获得代理
HiGateway hiGateway = rpcService.connect(hiEndpoint.getAddress(),HiGateway.class).get();
String hi = hiGateway.hi();
assertEquals("hi", hi);
基本的使用流程就是1)定义协议,提供 RPC 方法的实现;2)获得服务对象的代理对象,调用 RPC 方法。
3.主要抽象
RpcEndpoint
是对 RPC 框架中提供具体服务的实体的抽象,所有提供远程调用方法的组件都需要继承该抽象类
。另外,对于同一个 RpcEndpoint 的所有 RPC 调用都会在同一个线程(RpcEndpoint 的“主线程”)
中执行,因此无需担心并发执行的线程安全问题。
RpcGateway
接口是用于远程调用的代理接口
。 RpcGateway 提供了获取其所代理的 RpcEndpoint 的地址的方法。在实现一个提供 RPC 调用的组件时,通常需要先定一个接口,该接口继承 RpcGateway 并约定好提供的远程调用的方法。
RpcService
是 RpcEndpoint 的运行时环境
, RpcService
提供了启动 RpcEndpoint
, 连接到远端 RpcEndpoint
并返回远端 RpcEndpoint
的代理对象等方法。此外, RpcService
还提供了某些异步任务或者周期性调度任务
的方法。
RpcServer
相当于 RpcEndpoint
自身的的代理对象(self gateway)。RpcServer
是 RpcService
在启动了 RpcEndpoint
之后返回的对象,每一个 RpcEndpoint
对象内部都有一个 RpcServer
的成员变量,通过 getSelfGateway
方法就可以获得自身的代理,然后调用该Endpoint 提供的服务。
FencedRpcEndpoint
和 FencedRpcGate
要求在调用 RPC 方法时携带 token 信息,只有当调用方提供了 token 和 endpoint 的 token 一致时才允许调用。
4.基于 Akka 的 RPC 实现
前面介绍了 Flink 内部 RPC 框架的基本抽象,主要就是 RpcService, RpcEndpoint, RpcGateway, RpcServer 等接口。至于具体的实现,则可以有多种不同的方式,如 Akka, Netty 等。Flink 目前提供了一套基于 Akka 的实现。
4.1 启动 RpcEndpoint
AkkaRpcService
实现了 RpcService
接口, AkkaRpcService
会启动 Akka actor 来接收来自 RpcGateway
的 RPC 调用。
首先,在 RpcEndpoint
的构造函数中,会调用 AkkaRpcService#startServer
方法来初始化服务,AkkaRpcService#startServer
的主要工作包括: - 创建一个 Akka actor (AkkaRpcActor 或 FencedAkkaRpcActor)
- 通过动态代理创建代理对象
class AkkaRpcService
@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint)
checkNotNull(rpcEndpoint, "rpc endpoint");
CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
final Props akkaRpcActorProps;
if (rpcEndpoint instanceof FencedRpcEndpoint)
akkaRpcActorProps = Props.create(
FencedAkkaRpcActor.class,
rpcEndpoint,
terminationFuture,
getVersion(),
configuration.getMaximumFramesize());
else
akkaRpcActorProps = Props.create(
AkkaRpcActor.class,
rpcEndpoint,
terminationFuture,
getVersion(),
configuration.getMaximumFramesize());
ActorRef actorRef;
// 创建 Akka actor
synchronized (lock)
checkState(!stopped, "RpcService is stopped");
actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
actors.put(actorRef, rpcEndpoint);
LOG.info("Starting RPC endpoint for at .", rpcEndpoint.getClass().getName(), actorRef.path());
final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
final String hostname;
Option<String> host = actorRef.path().address().host();
if (host.isEmpty())
hostname = "localhost";
else
hostname = host.get();
// 代理的接口
Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
implementedRpcGateways.add(RpcServer.class);
implementedRpcGateways.add(AkkaBasedEndpoint.class);
final InvocationHandler akkaInvocationHandler;
//创建 InvocationHandler
if (rpcEndpoint instanceof FencedRpcEndpoint)
// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
akkaAddress,
hostname,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
terminationFuture,
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
implementedRpcGateways.add(FencedMainThreadExecutable.class);
else
akkaInvocationHandler = new AkkaInvocationHandler(
akkaAddress,
hostname,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
terminationFuture);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
ClassLoader classLoader = getClass().getClassLoader();
//通过动态代理创建代理对象
@SuppressWarnings("unchecked")
RpcServer server = (RpcServer) Proxy.newProxyInstance(
classLoader,
implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
akkaInvocationHandler);
return server;
在 RpcEndpoint
对象创建后,下一步操作是启动它,实际上调用的是 RpcServer.start()
方法。RpcServer
是通过 AkkaInvocationHandler
创建的动态代理对象:
class AkkaInvocationHandler
private final ActorRef rpcEndpoint;
public void start()
//向 Akka actor 发送 START 消息
rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
所以启动 RpcEndpoint 实际上就是向当前 endpoint 绑定的 Actor 发送一条 START 消息,通知服务启动。
4.2 获取 RpcEndpoint 的代理对象
在 RpcEndpoint
创建的过程中,实际上已经通过动态代理生成了一个可供本地使用的代理对象,通过 RpcEndpoint#getSelfGateway
方法可以直接获取。
class RpcEndpoint
public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType)
//rpcServer 是通过动态代理创建的
if (selfGatewayType.isInstance(rpcServer))
@SuppressWarnings("unchecked")
C selfGateway = ((C) rpcServer);
return selfGateway;
else
throw new RuntimeException("RpcEndpoint does not implement the RpcGateway interface of type " + selfGatewayType + '.');
如果需要获取一个远程 RpcEndpoint
的代理,就需要通过 RpcService#connect
方法,需要提供远程 endpoint 的地址:
class AkkaRpcService
private <C extends RpcGateway> CompletableFuture<C> connectInternal(
final String address,
final Class<C> clazz,
Function<ActorRef, InvocationHandler> invocationHandlerFactory)
checkState(!stopped, "RpcService is stopped");
LOG.debug("Try to connect to remote RPC endpoint with address . Returning a gateway.",
address, clazz.getName());
final ActorSelection actorSel = actorSystem.actorSelection(address);
final Future<ActorIdentity> identify = Patterns
.ask(actorSel, new Identify(42), configuration.getTimeout().toMilliseconds())
.<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class));
final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);
//获取 actor 的引用 ActorRef
final CompletableFuture<ActorRef> actorRefFuture = identifyFuture.thenApply(
(ActorIdentity actorIdentity) ->
if (actorIdentity.getRef() == null)
throw new CompletionException(new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.'));
else
return actorIdentity.getRef();
);
//发送握手消息
final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
(ActorRef actorRef) -> FutureUtils.toJava(
Patterns
.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds())
.<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));
// 创建 InvocationHandler,并通过动态代理生成代理对象
return actorRefFuture.thenCombineAsync(
handshakeFuture,
(ActorRef actorRef, HandshakeSuccessMessage ignored) ->
InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
ClassLoader classLoader = getClass().getClassLoader();
@SuppressWarnings("unchecked")
C proxy = (C) Proxy.newProxyInstance(
classLoader,
new Class<?>[]clazz,
invocationHandler);
return proxy;
,
actorSystem.dispatcher());
上述方法主要的功能包括:
- 通过地址获取
RpcEndpoint
绑定的 actor 的引用ActorRef
- 向对应的
AkkaRpcActor
发送握手消息 - 握手成功之后,创建
AkkaInvocationHandler
对象,并通过动态代理生成代理对象
4.3 Rpc 调用
在获取了本地或者远端 RpcEndpoint
的代理对象后,就可以通过代理对象发起 RPC 调用了。由于代理对象是通过动态代理创建的,因而所以的方法都会转化为 AkkaInvocationHandler#invoke
方法,并传入 RPC 调用的方法以及参数信息。
class AkkaInvocationHandler
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
Class<?> declaringClass = method.getDeclaringClass();
Object result;
if (declaringClass.equals(AkkaBasedEndpoint.class) ||
declaringClass.equals(Object.class) ||
declaringClass.equals(RpcGateway.class) ||
declaringClass.equals(StartStoppable.class) ||
declaringClass.equals(MainThreadExecutable.class) ||
declaringClass.equals(RpcServer.class))
result = method.invoke(this, args);
else if (declaringClass.equals(FencedRpcGateway.class))
throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
"fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
"retrieve a properly FencedRpcGateway.");
else
result = invokeRpc(method, args);
return result;
private Object invokeRpc(Method method, Object[] args) throws Exception
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
//将 RPC 调用封装为 RpcInvocation(会根据RpcEndpoint是本地还是远程的)
final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args);
Class<?> returnType = method.getReturnType();
以上是关于FLinkFlink 源码阅读笔记- RPC的主要内容,如果未能解决你的问题,请参考以下文章
FlinkFlink 源码阅读笔记(18)- Flink SQL 中的流和动态表
FlinkFlink 源码阅读笔记(16)- Flink SQL 的元数据管理
FlinkFlink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型