Netty实现一个简单的 RPC

Posted Java研发军团

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty实现一个简单的 RPC相关的知识,希望对你有一定的参考价值。

Netty实现一个简单的 RPC

链接:http://thinkinjava.cn/article/80



目录:


  1. 需求

  2. 设计

  3. 实现

    1. 创建 maven 项目,导入 Netty 4.1.16。

    2. 项目目录结构

    3. 设计接口

    4. 提供者相关实现

    5. 消费者相关实现

    6. 测试结果

  4. 总结

前言

众所周知,dubbo 底层使用了 Netty 作为网络通讯框架,而 Netty 的高性能我们之前也分析过源码,对他也算还是比较了解了。今天我们就自己用 Netty 实现一个简单的 RPC 框架。


1. 需求

模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.16。


2. 设计
  1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。

  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。

  3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据。


3. 实现

1. 创建 maven 项目,导入 Netty 4.1.16。


<groupId>cn.thinkinjava</groupId>
 <artifactId>rpc-demo</artifactId>
 <version>1.0-SNAPSHOT</version>

 <dependencies>
   <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.16.Final</version>
   </dependency>



2. 项目目录结构如下:


Netty实现一个简单的 RPC


3. 设计接口


一个简单的 hello world:

public interface HelloService {  String hello(String msg);
}


4. 提供者相关实现

4.1. 首先实现约定接口,用于返回客户端数据:

/**
* 实现类
*/
public class HelloServiceImpl implements HelloService {  public String hello(String msg) {    return msg != null ? msg + " -----> I am fine." : "I am fine.";
 }
}


4.2. 实现 Netty 服务端和自定义 handler

启动 Netty Server 代码:

private static void startServer0(String hostName, int port) {    try {
     ServerBootstrap bootstrap = new ServerBootstrap();
     NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
     bootstrap.group(eventLoopGroup)
         .channel(NioserverSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {            @Override
           protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             p.addLast(new StringDecoder());
             p.addLast(new StringEncoder());
             p.addLast(new HelloServerHandler());
           }
         });
     bootstrap.bind(hostName, port).sync();
   } catch (InterruptedException e) {
     e.printStackTrace();
   }
 }


上面的代码中添加了 String类型的编解码 handler,添加了一个自定义 handler。


自定义 handler 逻辑如下:

/**
* 用于处理请求数据
*/
public class HelloServerHandler extends ChannelInboundHandlerAdapter {  @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {    // 如何符合约定,则调用本地方法,返回数据
   if (msg.toString().startsWith(ClientBootstrap.providerName)) {
     String result = new HelloServiceImpl()
         .hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
     ctx.writeAndFlush(result);
   }
 }
}


这里显示判断了是否符合约定(并没有使用复杂的协议,只是一个字符串判断),然后创建一个具体实现类,并调用方法写回客户端。


还需要一个启动类:

public class ServerBootstrap {  public static void main(String[] args) {
   NettyServer.startServer("localhost", 8088);
 }
}

好,关于提供者的代码就写完了,主要就是创建一个 netty 服务端,实现一个自定义的 handler,自定义 handler 判断是否符合之间的约定(算是协议吧),如果符合,就创建一个接口的实现类,并调用他的方法返回字符串。


5. 消费者相关实现


消费者有一个需要注意的地方,就是调用需要透明,也就是说,框架使用者不用关心底层的网络实现。这里我们可以使用 JDK 的动态代理来实现这个目的。


思路:客户端调用代理方法,返回一个实现了 HelloService 接口的代理对象,调用代理对象的方法,返回结果。


我们需要在代理中做手脚,当调用代理方法的时候,我们需要初始化 Netty 客户端,还需要向服务端请求数据,并返回数据。


5.1. 首先创建代理相关的类

public class RpcConsumer {  private static ExecutorService executor = Executors
     .newFixedThreadPool(Runtime.getRuntime().availableProcessors());  private static HelloClientHandler client;  /**
  * 创建一个代理对象
  */

 public Object createProxy(final Class<?> serviceClass,      final String providerName) {    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),        new Class<?>[]{serviceClass}, (proxy, method, args) -> {          if (client == null) {
           initClient();
         }          // 设置参数
         client.setPara(providerName + args[0]);          return executor.submit(client).get();
       });
 }  /**
  * 初始化客户端
  */

 private static void initClient() {
   client = new HelloClientHandler();
   EventLoopGroup group = new NioEventLoopGroup();
   Bootstrap b = new Bootstrap();
   b.group(group)
       .channel(NioSocketChannel.class)
       .option(ChannelOption.TCP_NODELAY, true)
       .handler(new ChannelInitializer<SocketChannel>() {          @Override
         public void initChannel(SocketChannel ch) throws Exception
{
           ChannelPipeline p = ch.pipeline();
           p.addLast(new StringDecoder());
           p.addLast(new StringEncoder());
           p.addLast(client);
         }
       });    try {
     b.connect("localhost", 8088).sync();
   } catch (InterruptedException e) {
     e.printStackTrace();
   }
 }
}

该类有 2 个方法,创建代理和初始化客户端。


初始化客户端逻辑: 创建一个 Netty 的客户端,并连接提供者,并设置一个自定义 handler,和一些 String 类型的编解码器。


创建代理逻辑:使用 JDK 的动态代理技术,代理对象中的 invoke 方法实现如下: 如果 client 没有初始化,则初始化 client,这个 client 既是 handler ,也是一个 Callback。将参数设置进 client ,使用线程池调用 client 的 call 方法并阻塞等待数据返回。


看看 HelloClientHandler 的实现:

public class HelloClientHandler extends ChannelInboundHandlerAdapter implements Callable {  private ChannelHandlerContext context;  private String result;  private String para;  @Override
 public void channelActive(ChannelHandlerContext ctx) {
   context = ctx;
 }  /**
  * 收到服务端数据,唤醒等待线程
  */

 @Override
 public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
   result = msg.toString();
   notify();
 }  /**
  * 写出数据,开始等待唤醒
  */

 @Override
 public synchronized Object call() throws InterruptedException {
   context.writeAndFlush(para);
   wait();    return result;
 }  void setPara(String para) {    this.para = para;
 }
}

该类缓存了 ChannelHandlerContext,用于下次使用,有两个属性:返回结果和请求参数。


当成功连接后,缓存 ChannelHandlerContext,当调用 call 方法的时候,将请求参数发送到服务端,等待。当服务端收到并返回数据后,调用 channelRead 方法,将返回值赋值个 result,并唤醒等待在 call 方法上的线程。此时,代理对象返回数据。


再看看设计的测试类:

public class ClientBootstrap {  public static final String providerName = "HelloService#hello#";  public static void main(String[] args) throws InterruptedException {

   RpcConsumer consumer = new RpcConsumer();    // 创建一个代理对象
   HelloService service = (HelloService) consumer
       .createProxy(HelloService.class, providerName);    for (; ; ) {
     Thread.sleep(1000);
     System.out.println(service.hello("are you ok ?"));
   }
 }
}

测试类首先创建了一个代理对象,然后每隔一秒钟调用代理的 hello 方法,并打印服务端返回的结果。


测试结果

Netty实现一个简单的 RPC

成功打印。

总结

看了这么久的 Netty 源码,我们终于实现了一个自己的 Netty 应用,虽然这个应用很简单,甚至代码写的有些粗糙,但功能还是实现了,RPC 的目的就是允许像调用本地服务一样调用远程服务,需要对使用者透明,于是我们使用了动态代理。并使用 Netty 的 handler 发送数据和响应数据,完成了一次简单的 RPC 调用。


当然,还是那句话,代码比较简单,主要是思路,以及了解 RPC 底层的实现。



更多高质量视频资源请查看《



Netty实现一个简单的 RPC


下面到了发资源的时刻“Oracle视频教程”加小编吧

老铁们应该都知道在哪里获取了

课程目录

1-Oracle数据库的安装和配置.wmv

2-Oracle数据库管理.wmv

3-sql-第1节_基本的SQL-SELECT语句.wmv

4-sql-第1节课后练习.wmv

5-sql-第2节-过滤和排序数据.wmv

6-sql-第2节课后练习.wmv

7-sql-第3节_单行函数1.wmv

8-sql-第3节_单行函数2.wmv

9-sql-第3节课后练习.wmv

10-sql-第4节_多表查询.wmv

11-sql-第4节课后练习.wmv

12-sql-第5节_分组函数.wmv

13-sql-第5节课后练习.wmv

14-sql-第6节_子查询.wmv

15-sql-第6节课后练习1.wmv

16-sql-第6节课后练习2.wmv

17-sql-第7节_创建和管理表.wmv

18-sql-第7节课后练习.wmv

19-sql-第8节_数据处理1.wmv

20-sql-第8节_数据处理2.wmv

21-sql-第8节课后练习.wmv

22-sql-第9节_约束1.wmv 

23-sql-第9节_约束2.wmv

24-sql-第9节课后练习.wmv

25-sql-第10节_视图.wmv

26-sql-第10节课后练习.wmv

27-sql-第11节_其它数据库对象.wmv

28-sql-第11节课后练习.wmv

29-sql-第12节_控制用户权限及练习.wmv

30-sql-第13节_SET运算符及练习.wmv

31-sql-第14节_高级子查询1.wmv

32-sql-第14节_高级子查询2.wmv

33-sql-第14节课后练习.wmv

34-plsql(轻量版)_基本语法.wmv

35-plsql(轻量版)_记录类型1.wmv

36-plsql(轻量版)_复习_记录类型2.wmv

37-plsql(轻量版)_流程控制.wmv

38-plsql(轻量版)_游标的使用1.wmv

39-plsql(轻量版)_游标的使用2.wmv

40-plsql(轻量版)_异常处理机制.wmv

41-plsql(轻量版)-存储函数&存储过程.wmv

42-plsql(轻量版)_触发器.wmv

oracle_sql_plsql课件_章节练习_资料.zip


是不是很棒,分享一下吧!!!


END


热门阅读:

1. 

2. 

3. 

4. 

    5.


扫码关注   有捣乱者绕道
长按,识别二维码,加关注

不存在任何培训机构招生信息!!

以上是关于Netty实现一个简单的 RPC的主要内容,如果未能解决你的问题,请参考以下文章

13.RPC的socket实现(阻塞式)与netty实现(非阻塞式)

13.RPC的socket实现(阻塞式)与netty实现(非阻塞式)

java 从零开始手写 RPC (02)-netty4 实现客户端和服务端

十三.Netty入门到超神系列-手撸简单版RPC框架(仿Dubbo)

netty实现rpc框架

Java进阶:Netty实现RPC的代码