flink rpc的应用示例

Posted 架构师前线

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink rpc的应用示例相关的知识,希望对你有一定的参考价值。

代码示例

flink 内部 RPC 框架的使用示例:

 1package com.flink.rpc.remote.sample;
2
3import akka.actor.ActorSystem;
4import akka.actor.Terminated;
5import org.apache.flink.api.common.time.Time;
6import org.apache.flink.runtime.akka.AkkaUtils;
7import org.apache.flink.runtime.concurrent.FutureUtils;
8import org.apache.flink.runtime.rpc.RpcEndpoint;
9import org.apache.flink.runtime.rpc.RpcGateway;
10import org.apache.flink.runtime.rpc.RpcService;
11import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
12import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
13
14import java.util.Arrays;
15import java.util.concurrent.CompletableFuture;
16import java.util.concurrent.TimeUnit;
17
18public class TestFlinkRpc {
19
20    private static final Time TIMEOUT = Time.seconds(10L);
21    private static ActorSystem actorSystem = null;
22    private static RpcService rpcService = null;
23
24    // 定义2个通信协议Test0Gateway和Test1Gateway
25    public interface Test0Gateway extends RpcGateway {
26        String test0();
27    }
28
29    public interface Test1Gateway extends RpcGateway {
30        String test1();
31    }
32
33    // 具体实现2个远程服务Test0RpcEndpoint、Test1RpcEndpoint
34    public static class Test0RpcEndpoint extends RpcEndpoint implements Test0Gateway {
35        protected Test0RpcEndpoint(RpcService rpcService) {
36            super(rpcService);
37        }
38
39        @Override
40        public String test0() {
41            return "test0";
42        }
43    }
44
45    public static class Test1RpcEndpoint extends RpcEndpoint implements Test1Gateway {
46        protected Test1RpcEndpoint(RpcService rpcService) {
47            super(rpcService);
48        }
49
50        @Override
51        public String test1() {
52            return "test1";
53        }
54    }
55
56    public static void main(String[] args) throws Exception {
57        actorSystem = AkkaUtils.createDefaultActorSystem();
58        // 创建 RpcService, 基于 AKKA 的实现
59        rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
60
61        //将2个服务注册到rpcService中
62        Test0RpcEndpoint test0RpcEndpoint = new Test0RpcEndpoint(rpcService);
63        Test1RpcEndpoint test1RpcEndpoint = new Test1RpcEndpoint(rpcService);
64
65        test0RpcEndpoint.start();
66        //获取 endpoint 的 self gateway
67        Test0Gateway helloRpcServer = test0RpcEndpoint.getSelfGateway(Test0Gateway.class);
68        String test0 = helloRpcServer.test0();
69        System.out.println("test="+test0);
70
71        test1RpcEndpoint.start();
72        // 通过 endpoint 的地址获得代理
73        Test1Gateway test1Gateway = rpcService.connect(test1RpcEndpoint.getAddress(),Test1Gateway.class).get();
74        String test1 = test1Gateway.test1();
75        System.out.println("test="+test1);
76
77        final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService();
78        final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
79
80        FutureUtils
81                .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
82                .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
83    }
84}

基本流程:
1、定义*RpcGateway RPC接口协议,基于接口实现服务:RpcEndpoint,且将服务注册到RpcService中。
2、通过Endpoint的getSelfGateway,获得RpcEndpoint服务的代理对象,调用 RPC 方法。

该示例较能说明清楚flink rpc几个顶层类
RpcService,RpcEndpoint,RpcGateway,RpcServer的关系和作用。

以上是关于flink rpc的应用示例的主要内容,如果未能解决你的问题,请参考以下文章

详解Flink组件通信——RPC协议

Flink内核原理学习组件通信RPC

Flink内核原理学习组件通信RPC

FLinkFlink 源码阅读笔记- RPC

Flink BLOB架构

GAE Go Json-RPC 调用示例