从微博motan看rpc基于netty4远程通讯设计2-客户端
Posted winter分享汇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从微博motan看rpc基于netty4远程通讯设计2-客户端相关的知识,希望对你有一定的参考价值。
从微博motan看rpc基于netty4远程通讯设计2-客户端
本系列文章-连载中
从微博motan看rpc基于netty4远程通讯设计2-客户端
从微博motan看rpc基于netty4远程通讯设计3-业务线程模型设计
从微博motan看rpc基于netty4远程通讯设计4-同步异步返回结果处理
从微博motan看rpc基于netty4远程通讯设计5-协议编解码与序列化
客户端设计概述
客户端类图
NettyClient 是客户端节点对外包装,所有对服务端交互的操作在NettyChannel
一个NettyClient包含多个NettyChannel,多个NettyChannel对应一个NettyServer
NettyClient open() 启动服务主要做了以下事情:
创建NettyDecoder解码器
创建NettyEncoder编码器
创建NettyChannelHandler,实现客户端MessageHandler进行服务端消息响应后续处理
初始化客户端连接池
客户端连接管理
AbstractSharedPoolClient的channels集合存储所有channel
初始化连接池:根据Channel管理工厂创建channel对象
protected void initPool() {
factory = createChannelFactory();
channels = new ArrayList<>(connections);
for (int i = 0; i < connections; i++) {
channels.add((Channel) factory.makeObject());
}
initConnections(url.getBooleanParameter(URLParamType.asyncInitConnection.getName(), URLParamType.asyncInitConnection.getBooleanValue()));
}
初始化与服务端连接,如果是异步则丢到线程池里面处理
protected void initConnections(boolean async) {
if (async) {
executor.execute(new Runnable() {
@Override
public void run() {
createConnections();
}
});
} else {
createConnections();
}
}
private void createConnections() {
for (Channel channel : channels) {
try {
channel.open();
} catch (Exception e) {
LoggerUtil.error("NettyClient init pool create connect Error: url=" + url.getUri(), e);
}
}
}
可以看到创建连接初始化最后还是调用的NettyChannel的open方法,里面调用Bootstrap连接服务端
@Override
public synchronized boolean open() {
if (isAvailable()) {
LoggerUtil.warn("the channel already open, local: " + localAddress + " remote: " + remoteAddress + " url: " + nettyClient.getUrl().getUri());
return true;
}
ChannelFuture channelFuture = null;
try {
long start = System.currentTimeMillis();
channelFuture = nettyClient.getBootstrap().connect(remoteAddress);
既然客户端连接是通过连接池管理的,我们发送消息的时候具体是用哪个channel呢?
答案在NettyClient的request方法里的getChannel()实现:
每次获取调用次数都会+1,调用次数跟channel个数取模,获取可用channel
protected Channel getChannel() throws MotanServiceException {
int index = MathUtil.getNonNegative(idx.getAndIncrement());
Channel channel;
for (int i = index; i < connections + index; i++) {
channel = channels.get(i % connections);
if (channel.isAvailable()) {
return channel;
} else {
factory.rebuildObject(channel);
}
}
String errorMsg = this.getClass().getSimpleName() + " getChannel Error: url=" + url.getUri();
LoggerUtil.error(errorMsg);
throw new MotanServiceException(errorMsg);
}
如果获取channel的时候,channel不可用怎么办?则该channel进行异步重连
@Override
public boolean rebuildObject(NettyChannel nettyChannel) {
ReentrantLock lock = nettyChannel.getLock();
if (lock.tryLock()) {
try {
if (!nettyChannel.isAvailable() && !nettyChannel.isReconnect()) {
nettyChannel.reconnect();
rebuildExecutorService.submit(new RebuildTask(nettyChannel));
}
} finally {
lock.unlock();
}
return true;
}
return false;
}
下一篇:从微博motan看rpc基于netty4远程通讯设计3-业务线程模型设计
以上是关于从微博motan看rpc基于netty4远程通讯设计2-客户端的主要内容,如果未能解决你的问题,请参考以下文章
微博开源的Motan RPC最新进展:新增跨语言及服务治理支持