Netty框架之协议应用二(RPC开发实战之Dubbo)

Posted 木兮君

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty框架之协议应用二(RPC开发实战之Dubbo)相关的知识,希望对你有一定的参考价值。

前言

netty框架马上就进入尾声了,小编没有特别深入的讲解,第一是网络编程确实挺难的,第二用好netty其实是挺不容易的一件事情,尤其都是异步的情况下,今天小编继续为大家带来开发实战,上次分享了redis客户端和websocket弹幕功能的简单实现,这次为大家带来相对比较高档的rpc框架底层网络通信,今天主要以dubbo为例,希望大家有所收获。

RPC

定义

RPC为远程服务调用,即客户端远程调用服务端的方法,然后服务端返回响应或异常。常用的RPC解决方案有JAVA RMI,webService,Http Invoker,Dubbo,SpringCloud等等。

去中心化架构

传统集中式架构:
下图是小编理解的中心化架构

中心化架构其优点为:架构简单,客户端调用的时候可以跨语言,缺点的话所有的调用都会进过ngnix(这里就可以理解为中心,大家都得进过他,无论是调用还是返回响应),ngnix一旦挂了之后就会是服务挂了,当然如果ngnix部署集群也会让架构变的复杂。
去中心化架构
如下图:


这里的话客户端调用服务端不需要进过中心直连调用。
去中心化架构简单描述后,继续来看一下rpc框架组成。

框架组成


这个架构是不是似曾相识,这里如果看过小编的dubbo分享就觉得面熟了。
上面最小化实现rpc框架就是最下面的rpc协议就可以了,这样两个服务就可以通信即可。接着咱们来介绍一下rpc协议。

协议报文

这里小编直接用dubbo协议来说明报文:


上面请求头主要有16个字节,如果看过小编的前基本应用的使用后,看到这个就可以使用netty来进行消息的拆包以及编解码。那小编接下来继续说明编解码的过程。

概设过程

这边在写代码之前,小编先分享一下设计的思路,以及一些调用的逻辑。
首先是编解码:编解码占网络传输中必须且固定的,先看下图:


具体已经在上图解释清楚了,接下来看器调用过程即各个组件功能。


上图是比较简单的,接下来是至关重要的的从客户端到服务整个流程的调用过程


容小编解释一下:

  • 从客户端到服务端的调用涉及到了四个线程,分别是客户端以及服务端的业务线程和IO线程
  • 发起掉用写入消息体的内容是上面的Transfer,而编码request则为客户端发起的请求。而Transfer中的Request包含了接口,方法以及参数
  • 写入到socket中的为bytebuf,其经过Bytebuf -> head -> unsafe -> nio socket (doWrite) -> java nio channel -> socket
  • 写入到socket则到达服务端,服务端的io线程通过多路复用选择器select轮询,之后调用read
  • 读取到内容后,这边的读取流程和上面相似 unsafe read -> pipeline fireChannelRead,拿到了ByteBuf,然后解码request,根据上面的解码工具类
  • 从ByteBuf拿到Transfer,之后交给业务的handler,之后涉及到服务端业务线程的处理,业务处理后返回了结果或者报错信息(这里同上其实是Transfer)
  • 之后又交还给io线程,再次将Response进行编码(服务端的响应),Bytebuf写到socket
  • 回到客户端io线程后,再次有select进行轮询,读取到内容Bytebuf,解码成Transfer,Transfer中的response进行反序列化拿到结果填充回执,
  • 客户端拿到回执,释放等待。

注意事项
第一:加入客户端A和B的请求,客户端怎样拿到服务端回来的A响应和B相应呢,这里就需要Transfer里面的id,即协议中的id,不过如果是协议中的id,那就需要做请求的时候放入到一个map中来保存。
第二:既然知道使用id来区分请求响应,那什么时候放入到map中, 怎么保证线程安全,那最好是线程安全的map,不过高并发的时候,对系统很不友好,所以放入到map的时候也在io线程中执行。
第三:如何在io线程放入map中,这里是用eventloop的submit,写入消息完成后监听并写入map

讲完理论小编不是纯粹的理论派,还是代码实战派

代码实战

编解码工具以及传输类

Transfer类

public class Transfer 
   public static final byte STATUS_ERROR = 0;
   public static final byte STATUS_OK = 1;
   public static final byte STATUS_ILLEGAL = 2;
   public static final byte SERIALIZABLE_JAVA=1;
   public static final byte SERIALIZABLE_HESSIAN2=2;
   public static final byte SERIALIZABLE_JSON=3;

    boolean request;
    byte serializableId; // 1:java 2:hessian2 3:json
    boolean twoWay;
    boolean heartbeat;
    long id;
    byte status;    // 1正常 0失败 2请求非法
    Object target;

    public Transfer(long id) 
        this.id = id;
    
    

编解码工具类

public class RpcCodec extends ByteToMessageCodec 
    private static final int HEADER_LENGTH = 16;
    private static final short MAGIC = 0xdad;
    private static final ByteBuf MAGIC_BUF = Unpooled.copyShort(MAGIC);
    private static final byte FLAG_REQUEST = (byte) 0x80;//1000 0000
    private static final byte FLAG_TWO_WAY = (byte) 0x40; //0100 0000
    private static final byte FLAG_EVENT = (byte) 0x20;  //0010 0000
    private static final int SERIALIZATION_MASK = 0x1f;  //0001 1111

    // 编码
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) 
        if (msg instanceof Transfer) 
            doEncode((Transfer) msg, out);
         else 
            throw new IllegalArgumentException();
        
    

    //解码
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) 
        Transfer transfer = doDecode(in);
        if (transfer != null) 
            out.add(transfer);
        
    

    // 编码
    protected void doEncode(Transfer data, ByteBuf buf) 
        byte[] header = new byte[HEADER_LENGTH];
        Bytes.short2bytes(MAGIC, header);

        header[2] = data.serializableId;
        if (data.request) header[2] |= FLAG_REQUEST;
        if (data.twoWay) header[2] |= FLAG_TWO_WAY;
        if (data.heartbeat) header[2] |= FLAG_EVENT;
        if (!data.request) header[3] = data.status;

        Bytes.long2bytes(data.id, header, 4);// id 占8个字节
        int len = 0;
        byte[] body = new byte[0];
        if (!data.heartbeat) 
            body = serialize(data.serializableId, data.target);
            len = body.length;
        
        Bytes.int2bytes(len, header, 12);
        buf.writeBytes(header);
        buf.writeBytes(body);
    

    // 解码
    protected Transfer doDecode(ByteBuf in) 
        int index = ByteBufUtil.indexOf(MAGIC_BUF, in);
        //是否有魔数
        if (index < 0) 
            return null;
        
        //消息头是否完整
        if (!in.isReadable(index + HEADER_LENGTH)) 
            return null;
        
        byte[] header = new byte[HEADER_LENGTH];
//      in.getBytes(index, header);
        ByteBuf slice = in.slice();
        slice.readBytes(header);
        int length = Bytes.bytes2int(header, 12);
        //消息体是否完整
        if (!in.isReadable(index + HEADER_LENGTH + length)) 
            return null;//需要更多的字节
        
        Transfer transfer = new Transfer(Bytes.bytes2long(header, 4));
        transfer.heartbeat = (header[2] & FLAG_EVENT) != 0;
        transfer.request = (header[2] & FLAG_REQUEST) != 0;
        transfer.twoWay = (header[2] & FLAG_TWO_WAY) != 0;
        transfer.serializableId = (byte) (header[2] & SERIALIZATION_MASK);
        transfer.status = header[3];
        if (!transfer.heartbeat) 
            byte[] content = new byte[length];
//          in.getBytes(index + HEADER_LENGTH, bytes);
            slice.readBytes(content);
            transfer.target = deserialize(transfer.serializableId, content);
        
        //跳过已经读取的
        in.skipBytes(index + HEADER_LENGTH + length);

        return transfer;
    

    // 序列化
    private byte[] serialize(byte serializableId, Object target) 

        if (serializableId == Transfer.SERIALIZABLE_JAVA)  //JAVA
            ByteArrayOutputStream out;
            try 
                out = new ByteArrayOutputStream();
                ObjectOutputStream stream = new ObjectOutputStream(out);
                stream.writeObject(target);
             catch (IOException e) 
                throw new RuntimeException(e);
            
            return out.toByteArray();
         else 
            throw new UnsupportedOperationException();
        
    

    // 反序列化
    private Object deserialize(byte serializableId, byte[] bytes) 
        if (serializableId == Transfer.SERIALIZABLE_JAVA)  //JAVA
            try 
                ObjectInputStream stream =
                        new ObjectInputStream(new ByteArrayInputStream(bytes));
                return stream.readObject();
             catch (IOException | ClassNotFoundException e) 
                throw new RuntimeException(e);
            
         else 
            throw new UnsupportedOperationException();
        
    

客户端代码

public class RpcClient 
    static AtomicLong atomicLong = new AtomicLong(100);
    private Channel channel;
    private Map<Long, Promise<Response>> results = new HashMap<>();

    public static long getNextId() 
        return atomicLong.getAndIncrement();
    

    public void init(String address, int port) throws InterruptedException 
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup(1))
                .channel(NiosocketChannel.class);
        bootstrap.handler(new ChannelInitializer<Channel>() 
            @Override
            protected void initChannel(Channel ch) 
                ch.pipeline().addLast("codec", new RpcCodec());
                ch.pipeline().addLast("resultSet", new ResultFill());// 结果集填充
            
        );
        ChannelFuture connect = bootstrap.connect(address, port);
        channel = connect.sync().channel();
        System.out.println("连接成功");
        //
        // 每隔 两秒发送心跳
        channel.eventLoop().scheduleWithFixedDelay(() -> 
            Transfer transfer=new Transfer(getNextId());
            transfer.heartbeat=true;
            channel.writeAndFlush(transfer);
        ,2000,2000,TimeUnit.MILLISECONDS);
    

    public Response invokerRemote(Class serverInterface,
                                  String methodDesc,
                                  Object[] args) throws InterruptedException, ExecutionException, TimeoutException 
        Request request = new Request(serverInterface.getName(), methodDesc);
        request.setArgs(args);
        Transfer transfer = new Transfer(getNextId());
        transfer.request=true;
        transfer.serializableId=Transfer.SERIALIZABLE_JAVA;
        transfer.target = request;
        DefaultPromise<Response> resultPromise = new DefaultPromise(channel.eventLoop());
        // 写入成功后添加 结果
        channel.writeAndFlush(transfer).addListener(future ->
                // IO线程
                    if (future.cause() != null) // 写入失败
                        resultPromise.setFailure(future.cause()); //写入失败必须处理
                     else     // 写入成功
                        results.put(transfer.id, resultPromise);
                    
                
        );

        return resultPromise.get(10000, TimeUnit.MILLISECONDS);
    

    private class ResultFill extends SimpleChannelInboundHandler<Transfer> 
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Transfer msg) 
            if (msg.heartbeat) 
                System.out.println(String.format("服务端心跳返回:%s",
                        ctx.channel().remoteAddress()));
             else 
                Promise<Response> promise = results.remove(msg.id);
                promise.setSuccess((Response) msg.target); // 填充结果
            
        
    

    public <T> T getRemoteService(Class<T> serviceInterface) 
        assert serviceInterface.isInterface();
        Object o = Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]serviceInterface, new InvocationHandler() 
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Exception 
                if (Object.class.equals(method.getDeclaringClass())) 
                    return method.invoke(this, args);
                

                String methodDescriptor = method.getName()+Type.getMethodDescriptor(method);
                Response response = invokerRemote(serviceInterface, methodDescriptor, args);
                if (response.getError() != null) 
                    throw new RuntimeException("远程服务调用异常:", response.getError());
                
                return response.getResult();
            
        );
        return (T) o;
    



服务端代码:

public class RpcServer 
    ExecutorService threadPool = Executors.newFixedThreadPool(500);
    private Map<String, ServiceBean> register = new HashMap<>();

    public void start(int port) throws InterruptedException 以上是关于Netty框架之协议应用二(RPC开发实战之Dubbo)的主要内容,如果未能解决你的问题,请参考以下文章

Netty框架之协议应用二(RPC开发实战之Dubbo)

Netty(RPC高性能之道)原理剖析

RPC远程协议之Thrift入门

Netty实战一之异步和事件驱动

Netty实战二之自己的Netty应用程序

Netty框架之编解码机制二(自定义协议)