flink rpc的应用示例
Posted 架构师前线
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink rpc的应用示例相关的知识,希望对你有一定的参考价值。
代码示例
flink 内部 RPC 框架的使用示例:
package com.flink.rpc.remote.sample;

import akka.actor.ActorSystem;
import akka.actor.Terminated;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

18public class TestFlinkRpc {
19
20 private static final Time TIMEOUT = Time.seconds(10L);
21 private static ActorSystem actorSystem = null;
22 private static RpcService rpcService = null;
23
24 // 定义2个通信协议Test0Gateway和Test1Gateway
25 public interface Test0Gateway extends RpcGateway {
26 String test0();
27 }
28
29 public interface Test1Gateway extends RpcGateway {
30 String test1();
31 }
32
33 // 具体实现2个远程服务Test0RpcEndpoint、Test1RpcEndpoint
34 public static class Test0RpcEndpoint extends RpcEndpoint implements Test0Gateway {
35 protected Test0RpcEndpoint(RpcService rpcService) {
36 super(rpcService);
37 }
38
39 @Override
40 public String test0() {
41 return "test0";
42 }
43 }
44
45 public static class Test1RpcEndpoint extends RpcEndpoint implements Test1Gateway {
46 protected Test1RpcEndpoint(RpcService rpcService) {
47 super(rpcService);
48 }
49
50 @Override
51 public String test1() {
52 return "test1";
53 }
54 }
55
56 public static void main(String[] args) throws Exception {
57 actorSystem = AkkaUtils.createDefaultActorSystem();
58 // 创建 RpcService, 基于 AKKA 的实现
59 rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
60
61 //将2个服务注册到rpcService中
62 Test0RpcEndpoint test0RpcEndpoint = new Test0RpcEndpoint(rpcService);
63 Test1RpcEndpoint test1RpcEndpoint = new Test1RpcEndpoint(rpcService);
64
65 test0RpcEndpoint.start();
66 //获取 endpoint 的 self gateway
67 Test0Gateway helloRpcServer = test0RpcEndpoint.getSelfGateway(Test0Gateway.class);
68 String test0 = helloRpcServer.test0();
69 System.out.println("test="+test0);
70
71 test1RpcEndpoint.start();
72 // 通过 endpoint 的地址获得代理
73 Test1Gateway test1Gateway = rpcService.connect(test1RpcEndpoint.getAddress(),Test1Gateway.class).get();
74 String test1 = test1Gateway.test1();
75 System.out.println("test="+test1);
76
77 final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService();
78 final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
79
80 FutureUtils
81 .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
82 .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
83 }
84}
基本流程:
1、定义继承 RpcGateway 的 RPC 接口协议(如 Test0Gateway、Test1Gateway),再继承 RpcEndpoint 并实现该接口来提供服务;创建 RpcEndpoint 时传入 RpcService,将服务注册到 RpcService 中,并调用 start() 启动服务。
2、通过 RpcEndpoint 的 getSelfGateway 获得该服务自身的代理对象,或通过 RpcService 的 connect(服务地址, 接口类型) 获得该服务的代理对象,然后调用 RPC 方法。
该示例较为清楚地说明了 flink rpc 几个顶层类(RpcService、RpcEndpoint、RpcGateway、RpcServer)之间的关系和作用。
以上是关于flink rpc的应用示例的主要内容,如果未能解决你的问题,请参考以下文章