java netty tcp服务器线程阻塞问题定位解决
Posted 帝都码仔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java netty tcp服务器线程阻塞问题定位解决相关的知识,希望对你有一定的参考价值。
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序.很多著名的rpc框架都是基于netty来做的,比如google的grpc,阿里的dubbo,新浪微博的motan等都是基于netty.总之如果你想自己写一个网络服务器,不用太考虑选型问题,直接用netty吧.
今天要说的是使用netty写服务器的时候碰到的线程阻塞的问题。最近一个业务上用netty来写了一个简单的tcp服务,整个服务的逻辑很简单。就是用netty起一个服务,然后几十个tcp client请求连接服务,保持长连接,然后服务端和客户端通过长连接来进行通讯,服务端根据客户端的请求处理业务并返回给客户端。拿到这个需求二话不说拿起netty就开撸了,代码也很简单如下(注意这里是示例代码,不是线上代码):
1.HelloServer,用netty的ServerBootstrap启动一个tcp监听服务,将业务处理交给HelloServerHandler来处理:
public class HelloServer {
// Logger logger = Logger.getLogger(SensorServer.class);
public HelloServer(){
}
public void bind(int port) throws Exception{
System.out.println("listening port:" + port);
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
/*
* ServerBootstrap 是一个启动NIO服务的辅助启动类
* 你可以在这个服务中直接使用Channel
*/
ServerBootstrap bootstrap = new ServerBootstrap();
/*
* 这一步是必须的,如果没有设置group将会报java.lang.IllegalStateException: group not set异常
*/
bootstrap.group(parentGroup, childGroup).channel(NioserverSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
// 注册handler
ch.pipeline().addLast(new HelloServerInHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
//绑定端口,同步等待成功
ChannelFuture f = bootstrap.bind("127.0.0.1",port).sync();
System.out.println("listening port:" + port);
//等待服务端监听端口关闭
f.channel().closeFuture().sync();
//logger.info(">>>>>>>>>>>TCP SERVER STARTED...port:" + port);
} catch (Exception e) {
e.printStackTrace();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
/**
* @param args
*/
public static void main(String[] args) {
try {
try {
new HelloServer().bind(9001);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class HelloServerInHandler extends
ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
// msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中
result.readBytes(result1);
String resultStr = new String(result1);
System.out.println("HelloServerInHandler.channelRead "+ resultStr);
//这里处理逻辑,用doSomeThing来代替
doSomeThing();
// 接收并打印客户端的信息
System.out.println("Client said:" + resultStr +" "+(new java.util.Date()).toLocaleString());
result.release();
// 向客户端发送消息
String response = "recv "+resultStr+" and I am ok!";
// 在当前场景下,发送的数据必须转换成ByteBuf数组
ByteBuf encoded = ctx.alloc().buffer(4 * response.length());
encoded.writeBytes(response.getBytes());
ctx.write(encoded);
ctx.flush();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
2.客户端也很简单,连接到服务器,然后把处理逻辑放到HelloClientIntHandler,代码如下:
public class HelloClient {
public void connect(String host, int port,final int i) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
HelloClientIntHandler handler = new HelloClientIntHandler();
handler.index = i;
ch.pipeline().addLast(handler);
}
});
// Start the client.
ChannelFuture f = b.connect(host, port);//.sync();
// Wait until the connection is closed.
//f.channel().closeFuture();//.sync();
} finally {
//workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
//模拟30个连接
for(int i=0;i<30;i++) {
System.out.println(i);
//Thread.sleep(1*1000);
HelloClient client = new HelloClient();
client.connect("127.0.0.1", 9001,i);
}
}
}
package com.seanlu;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class HelloClientIntHandler extends ChannelInboundHandlerAdapter {
public int index=0;
// 接收server端的消息,并打印出来
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//System.out.println("HelloClientIntHandler.channelRead");
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
result.readBytes(result1);
System.out.println("Server said:" + new String(result1)+" "+(new java.util.Date()).toLocaleString());
result.release();
}
// 连接成功后,向server发送消息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//System.out.println("HelloClientIntHandler.channelActive");
String msg = "client " + this.index + " Are you ok? ";
ByteBuf encoded = ctx.alloc().buffer(4 * msg.length());
encoded.writeBytes(msg.getBytes());
ctx.write(encoded);
ctx.flush();
//System.out.println("HelloClientIntHandler.channelActive");
}
}
好了,启动HelloServer和ClientServer,分别打出来HelloServer:
Client said:client 4 Are you ok? 2017-8-22 15:18:13
Client said:client 6 Are you ok? 2017-8-22 15:18:13
Client said:client 3 Are you ok? 2017-8-22 15:18:13
Client said:client 2 Are you ok? 2017-8-22 15:18:13
Client said:client 7 Are you ok? 2017-8-22 15:18:13
Client said:client 5 Are you ok? 2017-8-22 15:18:13
Client said:client 1 Are you ok? 2017-8-22 15:18:13
Client said:client 0 Are you ok? 2017-8-22 15:18:13
Client said:client 14 Are you ok? 2017-8-22 15:18:13
Client said:client 22 Are you ok? 2017-8-22 15:18:13
Client said:client 13 Are you ok? 2017-8-22 15:18:13
Client said:client 15 Are you ok? 2017-8-22 15:18:13
Client said:client 9 Are you ok? 2017-8-22 15:18:13
Client said:client 11 Are you ok? 2017-8-22 15:18:13
Client said:client 21 Are you ok? 2017-8-22 15:18:13
Client said:client 19 Are you ok? 2017-8-22 15:18:13
Client said:client 17 Are you ok? 2017-8-22 15:18:13
Client said:client 10 Are you ok? 2017-8-22 15:18:13
Client said:client 18 Are you ok? 2017-8-22 15:18:13
Client said:client 8 Are you ok? 2017-8-22 15:18:13
Client said:client 16 Are you ok? 2017-8-22 15:18:13
Client said:client 12 Are you ok? 2017-8-22 15:18:13
Client said:client 20 Are you ok? 2017-8-22 15:18:13
Client said:client 23 Are you ok? 2017-8-22 15:18:13
Client said:client 24 Are you ok? 2017-8-22 15:18:13
Client said:client 25 Are you ok? 2017-8-22 15:18:13
Client said:client 26 Are you ok? 2017-8-22 15:18:13
Client said:client 27 Are you ok? 2017-8-22 15:18:13
Client said:client 28 Are you ok? 2017-8-22 15:18:13
Client said:client 29 Are you ok? 2017-8-22 15:18:13
ClientServer:
Server said:recv client 4 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 19 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 15 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 3 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 9 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 10 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 1 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 2 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 0 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 18 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 5 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 7 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 11 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 6 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 13 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 14 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 8 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 16 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 21 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 17 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 22 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 12 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 20 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 24 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 23 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 25 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 26 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 27 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 28 Are you ok? and I am ok! 2017-8-22 15:18:13
Server said:recv client 29 Are you ok? and I am ok! 2017-8-22 15:18:13
可以看到服务端并发处理了客户端的请求并返回。
然而,我们的服务上线后确偶尔出现了服务器不处理请求的问题,具体表现在,端口还是通的,但是服务不出来请求了。日志也没有输出。
于是我们开始查找问题,发现有可能是处理线程被阻塞住了。那么我们先看看netty的线程模型是什么样的:
首先看看下面的代码:
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
/*
* ServerBootstrap 是一个启动NIO服务的辅助启动类
* 你可以在这个服务中直接使用Channel
*/
ServerBootstrap bootstrap = new ServerBootstrap();
/*
* 这一步是必须的,如果没有设置group将会报java.lang.IllegalStateException: group not set异常
*/
bootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
// 注册handler
ch.pipeline().addLast(new HelloServerInHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
我们需要知道的是,parentGroup是用来accept连接的,然后把accept的联接丢给 childGroup来处理,childGroup是真正执行HelloServerInHandler的线程池,那么netty究竟起了多少个线程来处理呢,我们看下EventLoopGroup的父类MultithreadEventLoopGroup的定义:
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0?DEFAULT_EVENT_LOOP_THREADS:nThreads, executor, args);
}
如果没有定义,那么默认是SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)
NettyRuntime.availableProcessors()是你的机器的核数,对于我们的机器是一个4核的机器,所以应该只起了8个线程来处理业务,如果业务慢把线程阻塞住了,那么后面的请求根本无法处理。到这里我们虽然很有把握了,但都还只是猜测。那么我们就模拟一下业务慢的时候来看看是不是这个问题。
首先我们把上面的HelloServerInHandler的dosomething改成Thread.sleep(10*1000),也就是业务处理需要10秒钟,然后看看输出:
Client said:client 1 Are you ok? 2017-8-22 16:03:38
Client said:client 4 Are you ok? 2017-8-22 16:03:38
Client said:client 3 Are you ok? 2017-8-22 16:03:38
Client said:client 5 Are you ok? 2017-8-22 16:03:38
Client said:client 2 Are you ok? 2017-8-22 16:03:38
Client said:client 0 Are you ok? 2017-8-22 16:03:38
Client said:client 6 Are you ok? 2017-8-22 16:03:38
Client said:client 7 Are you ok? 2017-8-22 16:03:38
Client said:client 15 Are you ok? 2017-8-22 16:03:48
Client said:client 12 Are you ok? 2017-8-22 16:03:48
Client said:client 9 Are you ok? 2017-8-22 16:03:48
Client said:client 10 Are you ok? 2017-8-22 16:03:48
Client said:client 11 Are you ok? 2017-8-22 16:03:48
Client said:client 13 Are you ok? 2017-8-22 16:03:48
Client said:client 8 Are you ok? 2017-8-22 16:03:48
Client said:client 14 Are you ok? 2017-8-22 16:03:48
Client said:client 23 Are you ok? 2017-8-22 16:03:58
Client said:client 17 Are you ok? 2017-8-22 16:03:58
Client said:client 20 Are you ok? 2017-8-22 16:03:58
Client said:client 16 Are you ok? 2017-8-22 16:03:58
Client said:client 19 Are you ok? 2017-8-22 16:03:58
Client said:client 21 Are you ok? 2017-8-22 16:03:58
Client said:client 18 Are you ok? 2017-8-22 16:03:58
Client said:client 22 Are you ok? 2017-8-22 16:03:58
Client said:client 25 Are you ok? 2017-8-22 16:04:08
Client said:client 28 Are you ok? 2017-8-22 16:04:08
Client said:client 29 Are you ok? 2017-8-22 16:04:08
Client said:client 26 Are you ok? 2017-8-22 16:04:08
Client said:client 27 Are you ok? 2017-8-22 16:04:08
Client said:client 24 Are you ok? 2017-8-22 16:04:08
发现服务器端是每隔10秒才输出响应的,说明如果有8个业务就会把处理线程用完,并且不处理新的业务请求了。这也证实了我们的猜想。
那怎么解决问题呢,首先提高业务处理时间是要从根本上解决的。但是大部分情况下我们业务处理并不慢,那么我们如何在偶尔有业务处理慢的时候不会阻塞住
新的请求呢,大家可能都想到了那就是加大线程,有2种办法,一种是加大netty的childGroup的线程个数,另外一种是把业务处理放到一个新的线程池处理。
这里我们采用第二种方法来解决:
首先写一个线程处理类来处理业务(我们起了30个线程):
public class ChannelReadWorker {
private static ExecutorService pool;
private static ChannelReadWorker inStance = new ChannelReadWorker();
private ChannelReadWorker(){
pool = Executors.newFixedThreadPool(30);
}
public static ChannelReadWorker getInstance(){
return inStance;
}
public static void doWork(final ChannelHandlerContext ctx, final Object msg){
pool.execute(new Runnable() {
@Override
public void run() {
try {
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
// msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中
result.readBytes(result1);
String resultStr = new String(result1);
System.out.println("HelloServerInHandler.channelRead "+ resultStr);
// 接收并打印客户端的信息
System.out.println("Client said:" + resultStr +" "+(new java.util.Date()).toLocaleString());
// 释放资源,这行很关键
result.release();
Thread.sleep(10*1000);
// 向客户端发送消息
String response = "recv "+resultStr+" and I am ok!";
// 在当前场景下,发送的数据必须转换成ByteBuf数组
ByteBuf encoded = ctx.alloc().buffer(4 * response.length());
encoded.writeBytes(response.getBytes());
ctx.write(encoded);
ctx.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
然后在HelloServerInHandler里面把处理工作丢到线程池处理
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
//尝试用线程池来解决问题,用30个线程
ChannelReadWorker worker = ChannelReadWorker.getInstance();
worker.doWork(ctx, msg);
return;
}
在运行程序,我们发现虽然业务处理还是10秒,但是服务器是同时响应的了:
Client said:client 2 Are you ok? 2017-8-22 16:15:34
Client said:client 29 Are you ok? 2017-8-22 16:15:34
Client said:client 25 Are you ok? 2017-8-22 16:15:34
Client said:client 7 Are you ok? 2017-8-22 16:15:34
Client said:client 8 Are you ok? 2017-8-22 16:15:34
Client said:client 1 Are you ok? 2017-8-22 16:15:34
Client said:client 0 Are you ok? 2017-8-22 16:15:34
Client said:client 18 Are you ok? 2017-8-22 16:15:34
Client said:client 15 Are you ok? 2017-8-22 16:15:34
Client said:client 11 Are you ok? 2017-8-22 16:15:34
Client said:client 3 Are you ok? 2017-8-22 16:15:34
Client said:client 9 Are you ok? 2017-8-22 16:15:34
Client said:client 19 Are you ok? 2017-8-22 16:15:34
Client said:client 12 Are you ok? 2017-8-22 16:15:34
Client said:client 21 Are you ok? 2017-8-22 16:15:34
Client said:client 5 Are you ok? 2017-8-22 16:15:34
Client said:client 27 Are you ok? 2017-8-22 16:15:34
Client said:client 24 Are you ok? 2017-8-22 16:15:34
Client said:client 10 Are you ok? 2017-8-22 16:15:34
Client said:client 6 Are you ok? 2017-8-22 16:15:34
Client said:client 28 Are you ok? 2017-8-22 16:15:34
Client said:client 26 Are you ok? 2017-8-22 16:15:34
Client said:client 4 Are you ok? 2017-8-22 16:15:34
Client said:client 17 Are you ok? 2017-8-22 16:15:34
Client said:client 20 Are you ok? 2017-8-22 16:15:34
Client said:client 22 Are you ok? 2017-8-22 16:15:34
Client said:client 23 Are you ok? 2017-8-22 16:15:34
Client said:client 14 Are you ok? 2017-8-22 16:15:34
Client said:client 16 Are you ok? 2017-8-22 16:15:34
Client said:client 13 Are you ok? 2017-8-22 16:15:34
到这里问题解决
以上是关于java netty tcp服务器线程阻塞问题定位解决的主要内容,如果未能解决你的问题,请参考以下文章