Netty框架之协议应用二(RPC开发实战之Dubbo)
Posted 木兮君
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty框架之协议应用二(RPC开发实战之Dubbo)相关的知识,希望对你有一定的参考价值。
前言
netty框架马上就进入尾声了,小编没有特别深入的讲解,第一是网络编程确实挺难的,第二用好netty其实是挺不容易的一件事情,尤其都是异步的情况下,今天小编继续为大家带来开发实战,上次分享了redis客户端和websocket弹幕功能的简单实现,这次为大家带来相对比较高档的rpc框架底层网络通信,今天主要以dubbo为例,希望大家有所收获。
RPC
定义
RPC为远程服务调用,即客户端远程调用服务端的方法,然后服务端返回响应或异常。常用的RPC解决方案有JAVA RMI,webService,Http Invoker,Dubbo,SpringCloud等等。
去中心化架构
传统集中式架构:
下图是小编理解的中心化架构
中心化架构其优点为:架构简单,客户端调用的时候可以跨语言,缺点的话所有的调用都会进过ngnix(这里就可以理解为中心,大家都得进过他,无论是调用还是返回响应),ngnix一旦挂了之后就会是服务挂了,当然如果ngnix部署集群也会让架构变的复杂。
去中心化架构
如下图:
这里的话客户端调用服务端不需要进过中心直连调用。
去中心化架构简单描述后,继续来看一下rpc框架组成。
框架组成
这个架构是不是似曾相识,这里如果看过小编的dubbo分享就觉得面熟了。
上面最小化实现rpc框架就是最下面的rpc协议就可以了,这样两个服务就可以通信即可。接着咱们来介绍一下rpc协议。
协议报文
这里小编直接用dubbo协议来说明报文:
上面请求头主要有16个字节,如果看过小编的前基本应用的使用后,看到这个就可以使用netty来进行消息的拆包以及编解码。那小编接下来继续说明编解码的过程。
概设过程
这边在写代码之前,小编先分享一下设计的思路,以及一些调用的逻辑。
首先是编解码:编解码占网络传输中必须且固定的,先看下图:
具体已经在上图解释清楚了,接下来看器调用过程即各个组件功能。
上图是比较简单的,接下来是至关重要的的从客户端到服务整个流程的调用过程
容小编解释一下:
- 从客户端到服务端的调用涉及到了四个线程,分别是客户端以及服务端的业务线程和IO线程
- 发起掉用写入消息体的内容是上面的Transfer,而编码request则为客户端发起的请求。而Transfer中的Request包含了接口,方法以及参数
- 写入到socket中的为bytebuf,其经过Bytebuf -> head -> unsafe -> nio socket (doWrite) -> java nio channel -> socket
- 写入到socket则到达服务端,服务端的io线程通过多路复用选择器select轮询,之后调用read
- 读取到内容后,这边的读取流程和上面相似 unsafe read -> pipeline fireChannelRead,拿到了ByteBuf,然后解码request,根据上面的解码工具类
- 从ByteBuf拿到Transfer,之后交给业务的handler,之后涉及到服务端业务线程的处理,业务处理后返回了结果或者报错信息(这里同上其实是Transfer)
- 之后又交还给io线程,再次将Response进行编码(服务端的响应),Bytebuf写到socket
- 回到客户端io线程后,再次有select进行轮询,读取到内容Bytebuf,解码成Transfer,Transfer中的response进行反序列化拿到结果填充回执,
- 客户端拿到回执,释放等待。
注意事项
第一:加入客户端A和B的请求,客户端怎样拿到服务端回来的A响应和B相应呢,这里就需要Transfer里面的id,即协议中的id,不过如果是协议中的id,那就需要做请求的时候放入到一个map中来保存。
第二:既然知道使用id来区分请求响应,那什么时候放入到map中, 怎么保证线程安全,那最好是线程安全的map,不过高并发的时候,对系统很不友好,所以放入到map的时候也在io线程中执行。
第三:如何在io线程放入map中,这里是用eventloop的submit,写入消息完成后监听并写入map
讲完理论小编不是纯粹的理论派,还是代码实战派
代码实战
编解码工具以及传输类
Transfer类
public class Transfer
public static final byte STATUS_ERROR = 0;
public static final byte STATUS_OK = 1;
public static final byte STATUS_ILLEGAL = 2;
public static final byte SERIALIZABLE_JAVA=1;
public static final byte SERIALIZABLE_HESSIAN2=2;
public static final byte SERIALIZABLE_JSON=3;
boolean request;
byte serializableId; // 1:java 2:hessian2 3:json
boolean twoWay;
boolean heartbeat;
long id;
byte status; // 1正常 0失败 2请求非法
Object target;
public Transfer(long id)
this.id = id;
编解码工具类
public class RpcCodec extends ByteToMessageCodec
private static final int HEADER_LENGTH = 16;
private static final short MAGIC = 0xdad;
private static final ByteBuf MAGIC_BUF = Unpooled.copyShort(MAGIC);
private static final byte FLAG_REQUEST = (byte) 0x80;//1000 0000
private static final byte FLAG_TWO_WAY = (byte) 0x40; //0100 0000
private static final byte FLAG_EVENT = (byte) 0x20; //0010 0000
private static final int SERIALIZATION_MASK = 0x1f; //0001 1111
// 编码
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
if (msg instanceof Transfer)
doEncode((Transfer) msg, out);
else
throw new IllegalArgumentException();
//解码
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out)
Transfer transfer = doDecode(in);
if (transfer != null)
out.add(transfer);
// 编码
protected void doEncode(Transfer data, ByteBuf buf)
byte[] header = new byte[HEADER_LENGTH];
Bytes.short2bytes(MAGIC, header);
header[2] = data.serializableId;
if (data.request) header[2] |= FLAG_REQUEST;
if (data.twoWay) header[2] |= FLAG_TWO_WAY;
if (data.heartbeat) header[2] |= FLAG_EVENT;
if (!data.request) header[3] = data.status;
Bytes.long2bytes(data.id, header, 4);// id 占8个字节
int len = 0;
byte[] body = new byte[0];
if (!data.heartbeat)
body = serialize(data.serializableId, data.target);
len = body.length;
Bytes.int2bytes(len, header, 12);
buf.writeBytes(header);
buf.writeBytes(body);
// 解码
protected Transfer doDecode(ByteBuf in)
int index = ByteBufUtil.indexOf(MAGIC_BUF, in);
//是否有魔数
if (index < 0)
return null;
//消息头是否完整
if (!in.isReadable(index + HEADER_LENGTH))
return null;
byte[] header = new byte[HEADER_LENGTH];
// in.getBytes(index, header);
ByteBuf slice = in.slice();
slice.readBytes(header);
int length = Bytes.bytes2int(header, 12);
//消息体是否完整
if (!in.isReadable(index + HEADER_LENGTH + length))
return null;//需要更多的字节
Transfer transfer = new Transfer(Bytes.bytes2long(header, 4));
transfer.heartbeat = (header[2] & FLAG_EVENT) != 0;
transfer.request = (header[2] & FLAG_REQUEST) != 0;
transfer.twoWay = (header[2] & FLAG_TWO_WAY) != 0;
transfer.serializableId = (byte) (header[2] & SERIALIZATION_MASK);
transfer.status = header[3];
if (!transfer.heartbeat)
byte[] content = new byte[length];
// in.getBytes(index + HEADER_LENGTH, bytes);
slice.readBytes(content);
transfer.target = deserialize(transfer.serializableId, content);
//跳过已经读取的
in.skipBytes(index + HEADER_LENGTH + length);
return transfer;
// 序列化
private byte[] serialize(byte serializableId, Object target)
if (serializableId == Transfer.SERIALIZABLE_JAVA) //JAVA
ByteArrayOutputStream out;
try
out = new ByteArrayOutputStream();
ObjectOutputStream stream = new ObjectOutputStream(out);
stream.writeObject(target);
catch (IOException e)
throw new RuntimeException(e);
return out.toByteArray();
else
throw new UnsupportedOperationException();
// 反序列化
private Object deserialize(byte serializableId, byte[] bytes)
if (serializableId == Transfer.SERIALIZABLE_JAVA) //JAVA
try
ObjectInputStream stream =
new ObjectInputStream(new ByteArrayInputStream(bytes));
return stream.readObject();
catch (IOException | ClassNotFoundException e)
throw new RuntimeException(e);
else
throw new UnsupportedOperationException();
客户端代码
public class RpcClient
static AtomicLong atomicLong = new AtomicLong(100);
private Channel channel;
private Map<Long, Promise<Response>> results = new HashMap<>();
public static long getNextId()
return atomicLong.getAndIncrement();
public void init(String address, int port) throws InterruptedException
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup(1))
.channel(NiosocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>()
@Override
protected void initChannel(Channel ch)
ch.pipeline().addLast("codec", new RpcCodec());
ch.pipeline().addLast("resultSet", new ResultFill());// 结果集填充
);
ChannelFuture connect = bootstrap.connect(address, port);
channel = connect.sync().channel();
System.out.println("连接成功");
//
// 每隔 两秒发送心跳
channel.eventLoop().scheduleWithFixedDelay(() ->
Transfer transfer=new Transfer(getNextId());
transfer.heartbeat=true;
channel.writeAndFlush(transfer);
,2000,2000,TimeUnit.MILLISECONDS);
public Response invokerRemote(Class serverInterface,
String methodDesc,
Object[] args) throws InterruptedException, ExecutionException, TimeoutException
Request request = new Request(serverInterface.getName(), methodDesc);
request.setArgs(args);
Transfer transfer = new Transfer(getNextId());
transfer.request=true;
transfer.serializableId=Transfer.SERIALIZABLE_JAVA;
transfer.target = request;
DefaultPromise<Response> resultPromise = new DefaultPromise(channel.eventLoop());
// 写入成功后添加 结果
channel.writeAndFlush(transfer).addListener(future ->
// IO线程
if (future.cause() != null) // 写入失败
resultPromise.setFailure(future.cause()); //写入失败必须处理
else // 写入成功
results.put(transfer.id, resultPromise);
);
return resultPromise.get(10000, TimeUnit.MILLISECONDS);
private class ResultFill extends SimpleChannelInboundHandler<Transfer>
@Override
protected void channelRead0(ChannelHandlerContext ctx, Transfer msg)
if (msg.heartbeat)
System.out.println(String.format("服务端心跳返回:%s",
ctx.channel().remoteAddress()));
else
Promise<Response> promise = results.remove(msg.id);
promise.setSuccess((Response) msg.target); // 填充结果
public <T> T getRemoteService(Class<T> serviceInterface)
assert serviceInterface.isInterface();
Object o = Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]serviceInterface, new InvocationHandler()
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Exception
if (Object.class.equals(method.getDeclaringClass()))
return method.invoke(this, args);
String methodDescriptor = method.getName()+Type.getMethodDescriptor(method);
Response response = invokerRemote(serviceInterface, methodDescriptor, args);
if (response.getError() != null)
throw new RuntimeException("远程服务调用异常:", response.getError());
return response.getResult();
);
return (T) o;
服务端代码:
public class RpcServer
ExecutorService threadPool = Executors.newFixedThreadPool(500);
private Map<String, ServiceBean> register = new HashMap<>();
public void start(int port) throws InterruptedException 以上是关于Netty框架之协议应用二(RPC开发实战之Dubbo)的主要内容,如果未能解决你的问题,请参考以下文章