Dubbo源码学习系列 整合Netty
Posted Dream_it_possible!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo源码学习系列 整合Netty相关的知识,希望对你有一定的参考价值。
前言
上篇文章写到了利用zookeeper的特性实现缓存服务地址列表,接下来我们可以借助Netty的优点对程序进行改造,使其即支持Http容器的tomcat,又支持Dubbo协议的Netty, 实际上Dubbo也是支持两种协议,一种是Dubbo协议,一种是Http, 其中dubbo协议就是借助Netty实现的。
Netty客户端与服务端交互流程
1) Netty客户端通过IP和端口绑定,准备好JSON数据包。
2) Netty client 将要发送的数据包通过一系列的encoder, 将数据加密,然后发送给socket,由socket 发送给服务端。
3) Netty server 接收到Netty client 发送过来的数据后,然后通过一系列的解码器,将数据进行解码,解码完毕后,可以自定义serverHandler处理响应逻辑,最后将响应的数据通过一系列的编码器编码,然后发送到socket中,最终发生给客户端。
4) Netty client 收到服务端发过来的消息后,又先会通过一系列的解码器将消息解码,然后将解码后的消息转交给clientHandler处理,处理完后,再将消息加密,通过socket返回给服务端。
1. 写一个NettyServer
在写NettyServer之前,我们需要先去学习Netty相关的网络基础知识,然后再去写Server。
NettyServer的启动流程如下:
1) 绑定ip,监听端口。
2) 创建一个Nio事件组,该事件组是IO多路复用的体现。
3) NioserverSocketChannel 注册selector事件处理器, 准备接收连接。
4) 初始化Channel信道, 数据是通过Channel来传输的。
5) 使用serverHandler 处理业务逻辑。
package com.luban.protocol.dubbo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class NettyServer {
public void start(String hostName, int port) {
try {
final ServerBootstrap bootstrap = new ServerBootstrap();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
bootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers
.weakCachingConcurrentResolver(this.getClass()
.getClassLoader())));
pipeline.addLast("encoder", new ObjectEncoder());
pipeline.addLast("handler", new NettyServerHandler());
}
});
bootstrap.bind(hostName, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
NettyServer server = new NettyServer();
server.start("127.0.0.1", 8001);
}
}
注: encoder和decoder的位置可以互换,但是都要写在handler前面,否则在调用的时候会出现有一端有TimeoutException
启动NettyServer:
启动成功后,接着写NettyClient!
2. 写一个NettyClient
1) NettyClient和NettyServer的连接方式是TCP, TCP连接的建立是比较耗时的一个操作,因此最好使用多线程技术的线程池去管理多连接,刚开始初始化的时候会要点时间,连接用完了会归还到线程池里,然后再次使用只需要从线程池去选择一个连接使用即可。
2) 我们可以选择使用CachedThreadPool, CachedThreadPool线程池是可以伸缩的,它会根据需要创建新的线程,然后再重用之前可使用的已创建的线程。
首先看一下一个关键的接口ChannelHandler,提供信道初始化的实现。
ChannelHandler
它有2个关键的实现类ChannelInboundHandlerAdapter和ChannelOutBoundHandlerAdapter,他们都可以向Pipeline大管道里读写数据, 区别是一个进入,一个是出去,ChannelInBoundHandlerAdapter在下文会用到。
ChannelInboundHandlerAdapter
方法一: channelActive(ChannelHandlerContext ctx); 在管道激活的时候初始化ChannelHandlerContext, 可用ChannelHandlerContext的wirteAndFlush()方法去向Pipeline写入数据, 实现ChannelInboundHandlerAdapter抽象类是必须重写此方法,要不然无法使用ctx去写数据。
方法二: chnnelRead(ChannelHandlerContext ctx, Object msg); 这个方法用来读取pipeline管道流过来的数据, msg为数据包。
ChannelOutBoundHandlerAdapter
主要方法:
方法一: bind(ChannelHandlerContext ctx, SocketAddress socketAddress, ChannelPromise channelPromise); 绑定IP, 监听端口。
另外他们都实现了Callable接口, 此接口里只有一个方法call() , 因此我们可以在线程执行时,将客户端发送消息的逻辑放到call()方法里。
package com.example.dubbo.protocol.dubbo;
import com.example.dubbo.framework.Invocation;
import com.example.dubbo.framework.URL;
import com.example.dubbo.protocol.http.HttpClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NettyClient {
public NettyClientHandler client = null;
private URL url;
// 使用线程池开启线程
private static ExecutorService executorService = Executors.newCachedThreadPool();
public NettyClientHandler start() {
client = new NettyClientHandler();
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast("encoder", new ObjectEncoder());
pipeline.addLast("handler", client);
}
});
// 连接socket
try {
bootstrap.connect(url.getHost(), url.getPort()).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
return client;
}
// 通过线程来发送信息给Server
public Object send(URL url, Invocation invocation) {
this.url = url;
if (client == null) {
client = start();
}
client.setInvocation(invocation);
try {
Object obj = executorService.submit(client).get();
return obj;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
//测试nettyClient往nettyServer发送message
Invocation invocation = new Invocation("com.example.dubbo.provider.api.UserInterface", "sayHello", new Object[]{"bingbing"}, new Class[]{String.class});
NettyClient client = new NettyClient();
URL url = new URL("localhost", 8001);
client.send(url, invocation);
}
}
3. 用NettyClient测试NettyServer
public static void main(String[] args) {
//测试nettyClient往nettyServer发送message
Invocation invocation = new Invocation("com.example.dubbo.provider.api.UserInterface", "sayHello", new Object[]{"bingbing"}, new Class[]{String.class});
NettyClient client = new NettyClient();
URL url = new URL("localhost", 8001);
client.send(url, invocation);
}
消费方:
服务方:
注: 坑来了!
1. 根据如上consumer控制台打印结果发现我在服务端已经将执行结果回写到了pipeline里,但消费方读取到的结果为null! 是什么原因呢?
经过一番分析后,原因是result还没来得及被赋值就给返回出去了。发现是因为在执行call()方法时,当前线程没有等待result赋值后就返回了, 所以返回的result为null。
解决方法
在发送消息后,用wait()阻塞一下,等待ChannelRead()读取到服务端发送过来的数据赋值给Result, 然后再返回result。
@Override
public Object call() throws Exception {
// 在Call 方法执行逻辑
System.out.println("向服务端发送消息...");
ctx.writeAndFlush(invocation);
wait();
return result;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//读取消息
this.result = msg;
notify();
}
2. 重新启动consumer, consumer控制台报错:
查阅官网相关资料后,发现出现此错误的原因主要包含了以下几种情况:
当前线程不含有对象锁资源的时候去调用了wait()方法。
当前线程不含有对象锁资源的时候去调用了notify()方法。
当前线程不含有对象锁资源的时候去调用了notifyAll()方法。
2. 在高并发情况下,如果有2个线程同时进入到call()方法里,会不会出现上述的报错问题?
答案是肯定的,有可能会存在如下情况,上一个线程拿到对象资源锁执行wait(),但它还没执行完,下一个线程就又进入到call()方法去拿wait(),没有拿到资源锁就wait(), 所以报错。
解决方法
加锁,在call()方法和channelRead()方法上添加synchronized关键字。
重写启动后,观察结果!
程序调通了后,就可以把netty客户端和服务端整合到之前的项目里。
4. 整合netty
consumer:
provider:
启动nettyServer
重新启动观察结果:
借助Netty实现远程调用成功,至此,整合Netty完毕!
5. failed to create a child event loop 报错问题
稳定运行一段时间后,发现开始出现问题了,问题如下:
Exception in thread "main" java.lang.IllegalStateException: failed to create a child event loop
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:68)
at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:49)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:61)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:52)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:44)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:36)
at com.example.dubbo.protocol.dubbo.NettyClient.start(NettyClient.java:36)
at com.example.dubbo.protocol.dubbo.NettyClient.send(NettyClient.java:62)
at com.example.dubbo.framework.ProxyFactory$1.invoke(ProxyFactory.java:40)
at org.springframework.cglib.proxy.Proxy$ProxyImpl$$EnhancerByCGLIB$$2c24aae9.sayHello(<generated>)
at com.example.dubbo.consumer.ConsumerApplication.main(ConsumerApplication.java:25)
Caused by: io.netty.channel.ChannelException: failed to open a new selector
at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:128)
at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:120)
at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:87)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:64)
... 10 more
Caused by: java.io.IOException: Unable to establish loopback connection
at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:94)
at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:61)
at java.security.AccessController.doPrivileged(Native Method)
at sun.nio.ch.PipeImpl.<init>(PipeImpl.java:171)
at sun.nio.ch.SelectorProviderImpl.openPipe(SelectorProviderImpl.java:50)
at java.nio.channels.Pipe.open(Pipe.java:155)
at sun.nio.ch.WindowsSelectorImpl.<init>(WindowsSelectorImpl.java:127)
at sun.nio.ch.WindowsSelectorProvider.openSelector(WindowsSelectorProvider.java:44)
at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:126)
... 13 more
Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): connect
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:454)
at sun.nio.ch.Net.connect(Net.java:446)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
at java.nio.channels.SocketChannel.open(SocketChannel.java:189)
at sun.nio.ch.PipeImpl$Initializer$LoopbackConnector.run(PipeImpl.java:127)
at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:76)
... 21 more
发现最终的问题是由于 no buffer space available (maximum connections reached?): connect
用netstat -ano 命令查看连接情况, 发现有大量的9090端口的连接没有被释放。
发现是生成了大量的TCP连接。
使用jvisualvm查看内存使用情况, 发现内存几乎用满。
在资源有限的情况下CashedThreadPool 有可能会创建大量的线程来不及回收,默认回收时间是60s, 最终导致OOM。
可以通过Semaphore 进行加锁,相当于限流的作用。
源代码: https://gitee.com/bingbing-123456/dubbo-rpc.git
以上是关于Dubbo源码学习系列 整合Netty的主要内容,如果未能解决你的问题,请参考以下文章
Dubbo源码学习系列 整合zookeeper注册中心并提供watcher机制