Day480.Netty手写dubboRPC框架 -netty

Posted 阿昌喜欢吃黄桃

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day480.Netty手写dubboRPC框架 -netty相关的知识,希望对你有一定的参考价值。

Netty手写dubboRPC框架

一、RPC 基本介绍

rpc是远程调用的一种行为,在数据传输过程中涉及到传输协议,http就是一种传输协议

RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通信协议。

  • 该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程

  • 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样

  • 常见的 RPC 框架有:
    • 阿里的Dubbo
    • google的gRPC
    • Go语言的rpc
    • Apache的thrift
    • Spring旗下的 Spring Cloud。

二、 RPC 调用流程

说明

  1. 服务消费方(client)以本地调用方式调用服务
  2. 【client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码】
  9. 服务消费方(client)得到结果

RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用


三、实现 dubbo RPC(基于 Netty)

1、需求说明

  1. dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架

  2. 模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.20

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.20.Final</version>
</dependency>

2、设计说明

  1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据


3、代码实现

  • 项目结构

  • com.achang.netty.dubboRPC.consumer.ClientBootstrap

客户端启动器

public class ClientBootstrap 
    //定义协议头
    public static final String providerName = "HelloService#hello";
    
    public static void main(String[] args) throws InterruptedException 
        NettyClient client = new NettyClient();
        HelloService serviceProxy = (HelloService) client.getBean(HelloService.class, providerName);//拿到代理对象
        //        for (; ; ) 
        //调用客户端的方法
        //            Thread.sleep(2000);
        String result = serviceProxy.hello("阿昌来也");
        System.out.println("客户端调用服务端,结果为:" + result);
        //        
    

  • com.achang.netty.dubboRPC.provider.ServerBootstrap

服务端启动器

public class ServerBootstrap 
    public static void main(String[] args) throws InterruptedException 
        NettyServer.startServer("127.0.0.1",7000);
    

  • com.achang.netty.dubboRPC.netty.NettyClient

客户端初始化类

/******
 @author 阿昌
 @create 2021-12-17 21:32
 ******* Netty客户端
 */
public class NettyClient 
    //创建线程池
    private static ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static NettyClientHandler nettyClientHandler;

    /**
     * 编写方式使用代理模式,获取一个代理对象
     * @param serviceClass service类
     * @param providerName 协议头
     * @return 代理对象
     */
    public Object getBean(final Class<?> serviceClass,final String providerName)
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                                      new Class<?>[]serviceClass,
                                      ((proxy, method, args) -> 
                                          //客户端每调用一次就会进入该代码块
                                          //第一次调用
                                          if (nettyClientHandler==null)
                                              startClient0("127.0.0.1",7000);
                                          

                                          //设置要发送给服务器的信息
                                          //providerName协议头,args传入的参数
                                          nettyClientHandler.setParam(providerName+args[0]);
                                          return executors.submit(nettyClientHandler).get();
                                      
                                      ));
    

    //初始化客户端
    private static void startClient0(String ipaddr,Integer port)
        nettyClientHandler = new NettyClientHandler();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try 
            Bootstrap bootstrap = new Bootstrap();
            Bootstrap clientBootstrap = bootstrap.group(workerGroup)
                .channel(NiosocketChannel.class)
                .option(ChannelOption.TCP_NODELAY,true)
                .handler(new ChannelInitializer<SocketChannel>() 
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception 
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(nettyClientHandler);
                    
                );
            clientBootstrap.connect(ipaddr,port).sync();
        catch (InterruptedException e) 
            e.printStackTrace();
        
    


  • com.achang.netty.dubboRPC.netty.NettyClientHandler

客户端处理器

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable 
    private ChannelHandlerContext channelHandlerContext;//上下文
    private String result;//调用的返回结果
    private String param;//客户端调用方法时的参数

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        cause.printStackTrace();
        ctx.close();
    

    //收到服务器的数据后就会被调用
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        System.out.println("channelRead");
        result = msg.toString();
        notify();//唤醒等待的线程
    

    //与服务器连接成功后就会被调用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        System.out.println("channelActive");
        channelHandlerContext = ctx;
    

    //被代理对象调用,异步发送数据给服务器,然后阻塞,会等待被唤醒
    @Override
    public synchronized Object call() throws Exception 
        System.out.println("call1");
        channelHandlerContext.writeAndFlush(param);
        //进行wait阻塞
        wait();
        System.out.println("call2");

        return result;
    


    //设置发送的数据
    void setParam(String msg)
        System.out.println("setParam");
        this.param = msg;
    
    

  • com.achang.netty.dubboRPC.netty.NettyServer

服务端初始化类

public class NettyServer 

    public static  void startServer(String hostname,Integer port) throws InterruptedException 
        startServer0(hostname,port);
    


    private static void startServer0(String hostname,Integer port) throws InterruptedException 
        NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try 
            ServerBootstrap bootstrap = new ServerBootstrap();
            ServerBootstrap serverBootstrap = bootstrap.group(boosGroup, workerGroup)
                //                    .handler(new LoggingHandler())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() 
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception 
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new NettyServerHandler());
                    
                );
            System.out.println("服务端启动成功....端口:"+port);
            ChannelFuture cf = serverBootstrap.bind(hostname, port).sync();
            cf.channel().closeFuture().sync();
        finally 
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        
    
    

  • com.achang.netty.dubboRPC.netty.NettyServerHandler

服务端处理器

public class NettyServerHandler extends ChannelInboundHandlerAdapter 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        //获取客户端发送来的消息,并调用服务
        System.out.println("msg="+msg);

        //客户端想要调用服务器的api时,想要满足一定协议的要求才能调用
        //比如,我们这里要求,每次发送消息时,都必须要求以"HelloService#hello开头"
        if (msg.toString().startsWith("HelloService#hello"))
            String result = new HelloServiceImpl().hello(msg.toString().split("HelloService#hello")[1]);
            ctx.writeAndFlush(result);
        
    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        cause.printStackTrace();
        ctx.close();
    

  • com.achang.netty.dubboRPC.provider.HelloServiceImpl

客户端接口的真正实现Impl

public class HelloServiceImpl implements HelloService 
    private static int count = 0;
    @Override
    public String hello(String message) 
        System.out.println("客户端发来的消息为:【"+message+"】");
        if (message!=null)
            return "你好客户端,服务端已经收到了消息"+"调用次数为:【"+(++count)+"】";
        else 
            return "消息不能为空";
        
    

  • com.achang.netty.dubboRPC.publicinterface.HelloService

服务提供方 和 服务消费放 公共部分,约定的接口规范

public interface HelloService 
    String hello(String message);


4、测试

完结撒花!!!
源代码地址:
https://github.com/qq995931576/netty

以上是关于Day480.Netty手写dubboRPC框架 -netty的主要内容,如果未能解决你的问题,请参考以下文章

Golang手写RPC框架(day1)

Golang手写RPC框架(day1)

手写RPC Day3 服务注册

手写RPC Day3 服务注册

手写RPC Day2 高性能客户端

手写RPC Day2 高性能客户端