java netty tcp服务器线程阻塞问题定位解决

Posted 帝都码仔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java netty tcp服务器线程阻塞问题定位解决相关的知识,希望对你有一定的参考价值。

        Netty是由JBoss提供的一个Java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。很多著名的RPC框架都是基于Netty来做的,比如Google的gRPC(Java实现)、阿里的Dubbo、新浪微博的Motan等。总之,如果你想自己写一个网络服务器,不用太考虑选型问题,直接用Netty即可。

        今天要说的是使用netty写服务器的时候碰到的线程阻塞的问题。最近一个业务上用netty来写了一个简单的tcp服务,整个服务的逻辑很简单。就是用netty起一个服务,然后几十个tcp client请求连接服务,保持长连接,然后服务端和客户端通过长连接来进行通讯,服务端根据客户端的请求处理业务并返回给客户端。拿到这个需求二话不说拿起netty就开撸了,代码也很简单如下(注意这里是示例代码,不是线上代码):

1. HelloServer:用Netty的ServerBootstrap启动一个TCP监听服务,将业务处理交给HelloServerInHandler来处理:

public class HelloServer {

   // Logger logger = Logger.getLogger(SensorServer.class);


    public HelloServer(){


    }


    public void bind(int port) throws Exception{

        System.out.println("listening port:" + port);


        EventLoopGroup parentGroup = new NioEventLoopGroup();

        EventLoopGroup childGroup = new NioEventLoopGroup();

        try {

         /*

          * ServerBootstrap 是一个启动NIO服务的辅助启动类

          * 你可以在这个服务中直接使用Channel

          */

            ServerBootstrap bootstrap = new ServerBootstrap();

         /*

          * 这一步是必须的,如果没有设置group将会报java.lang.IllegalStateException: group not set异常

          */

            bootstrap.group(parentGroup, childGroup).channel(NioserverSocketChannel.class)

                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override

                        public void initChannel(SocketChannel ch)

                                throws Exception {

                            // 注册handler

                            ch.pipeline().addLast(new HelloServerInHandler());

                        }

                    }).option(ChannelOption.SO_BACKLOG, 128)

                    .childOption(ChannelOption.SO_KEEPALIVE, true);


            //绑定端口,同步等待成功

            ChannelFuture f = bootstrap.bind("127.0.0.1",port).sync();

            System.out.println("listening port:" + port);

            //等待服务端监听端口关闭

            f.channel().closeFuture().sync();


            //logger.info(">>>>>>>>>>>TCP SERVER STARTED...port:" + port);

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            parentGroup.shutdownGracefully();

            childGroup.shutdownGracefully();

        }

    }



    /**

     * @param args

     */

    public static void main(String[] args) {

        try {


                    try {

                        new HelloServer().bind(9001);

                    } catch (Exception e) {

                        e.printStackTrace();

                    }


        } catch (Exception e) {

            e.printStackTrace();

        }

    }


}


public   class HelloServerInHandler extends

        ChannelInboundHandlerAdapter {



    public void channelRead(ChannelHandlerContext ctx, Object msg)

            throws Exception {


        ByteBuf result = (ByteBuf) msg;

        byte[] result1 = new byte[result.readableBytes()];

        // msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中

        result.readBytes(result1);

        String resultStr = new String(result1);

        System.out.println("HelloServerInHandler.channelRead "+ resultStr);

        //这里处理逻辑,用doSomeThing来代替

        doSomeThing();

        // 接收并打印客户端的信息

        System.out.println("Client said:" + resultStr +" "+(new java.util.Date()).toLocaleString());

        result.release();

        // 向客户端发送消息

        String response = "recv "+resultStr+" and I am ok!";

        // 在当前场景下,发送的数据必须转换成ByteBuf数组

        ByteBuf encoded = ctx.alloc().buffer(4 * response.length());

        

        

        encoded.writeBytes(response.getBytes());

        ctx.write(encoded);

        ctx.flush();

    }

    @Override

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        ctx.flush();

    }

}

2.客户端也很简单,连接到服务器,然后把处理逻辑放到HelloClientIntHandler,代码如下:

public class HelloClient {
    public void connect(String host, int port,final int i) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    HelloClientIntHandler handler = new HelloClientIntHandler();
                    handler.index = i;
                    ch.pipeline().addLast(handler);
                }
            });

            // Start the client.
           
ChannelFuture f = b.connect(host, port);//.sync();

            // Wait until the connection is closed.
            //f.channel
().closeFuture();//.sync();
       
} finally {
            //workerGroup.shutdownGracefully();
       
}

    }

    public static void main(String[] args) throws Exception {
//模拟30个连接
       
for(int i=0;i<30;i++) {
            System.out.println(i);
            //Thread.sleep(1*1000);
           
HelloClient client = new HelloClient();
            client.connect("127.0.0.1", 9001,i);
       }

    }
}


package com.seanlu;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;


public class HelloClientIntHandler extends ChannelInboundHandlerAdapter {

    public int index=0;
    // 接收server端的消息,并打印出来
   
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //System.out.println("HelloClientIntHandler.channelRead");
       
ByteBuf result = (ByteBuf) msg;
        byte[] result1 = new byte[result.readableBytes()];
        result.readBytes(result1);
System.out.println("Server said:" + new String(result1)+" "+(new java.util.Date()).toLocaleString());
        result.release();
    }

    // 连接成功后,向server发送消息
   
@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //System.out.println("HelloClientIntHandler.channelActive");


           
String msg = "client " + this.index + " Are you ok? ";
            ByteBuf encoded = ctx.alloc().buffer(4 * msg.length());
            encoded.writeBytes(msg.getBytes());
            ctx.write(encoded);
            ctx.flush();


        //System.out.println("HelloClientIntHandler.channelActive");
   
}
}


好了,分别启动HelloServer和HelloClient,打印出的日志如下。HelloServer:


Client said:client 4 Are you ok2017-8-22 15:18:13

Client said:client 6 Are you ok2017-8-22 15:18:13

Client said:client 3 Are you ok2017-8-22 15:18:13

Client said:client 2 Are you ok2017-8-22 15:18:13

Client said:client 7 Are you ok2017-8-22 15:18:13

Client said:client 5 Are you ok2017-8-22 15:18:13

Client said:client 1 Are you ok2017-8-22 15:18:13

Client said:client 0 Are you ok2017-8-22 15:18:13

Client said:client 14 Are you ok2017-8-22 15:18:13

Client said:client 22 Are you ok2017-8-22 15:18:13

Client said:client 13 Are you ok2017-8-22 15:18:13

Client said:client 15 Are you ok2017-8-22 15:18:13

Client said:client 9 Are you ok2017-8-22 15:18:13

Client said:client 11 Are you ok2017-8-22 15:18:13

Client said:client 21 Are you ok2017-8-22 15:18:13

Client said:client 19 Are you ok2017-8-22 15:18:13

Client said:client 17 Are you ok2017-8-22 15:18:13

Client said:client 10 Are you ok2017-8-22 15:18:13

Client said:client 18 Are you ok2017-8-22 15:18:13

Client said:client 8 Are you ok2017-8-22 15:18:13

Client said:client 16 Are you ok2017-8-22 15:18:13

Client said:client 12 Are you ok2017-8-22 15:18:13

Client said:client 20 Are you ok2017-8-22 15:18:13

Client said:client 23 Are you ok2017-8-22 15:18:13

Client said:client 24 Are you ok2017-8-22 15:18:13

Client said:client 25 Are you ok2017-8-22 15:18:13

Client said:client 26 Are you ok2017-8-22 15:18:13

Client said:client 27 Are you ok2017-8-22 15:18:13

Client said:client 28 Are you ok2017-8-22 15:18:13

Client said:client 29 Are you ok2017-8-22 15:18:13


HelloClient:

Server said:recv client 4 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 19 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 15 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 3 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 9 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 10 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 1 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 2 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 0 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 18 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 5 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 7 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 11 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 6 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 13 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 14 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 8 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 16 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 21 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 17 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 22 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 12 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 20 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 24 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 23 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 25 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 26 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 27 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 28 Are you okand I am ok! 2017-8-22 15:18:13

Server said:recv client 29 Are you okand I am ok! 2017-8-22 15:18:13


可以看到服务端并发处理了客户端的请求并返回。


然而,我们的服务上线后却偶尔出现服务器不处理请求的问题。具体表现为:端口还是通的,但是服务不处理请求了,日志也没有输出。

于是我们开始排查问题,发现有可能是处理线程被阻塞住了。那么我们先看看Netty的线程模型是什么样的。


首先看看下面的代码:

     // parentGroup accepts incoming connections and hands each accepted connection to childGroup.
     EventLoopGroup parentGroup = new NioEventLoopGroup();
     // childGroup is the thread pool that actually runs HelloServerInHandler for accepted channels.
     EventLoopGroup childGroup = new NioEventLoopGroup();
     try {
/*
 * ServerBootstrap
 * is a helper class for starting an NIO server;
 *
 * channels can be used directly with it.
 */
        
ServerBootstrap bootstrap = new ServerBootstrap();
/*
 *
 * This step is mandatory: without setting the groups Netty throws java.lang.IllegalStateException: group not set
 */
        
bootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch)
                             throws Exception {
                         // Register the business handler on each new channel's pipeline.
                        
ch.pipeline().addLast(new HelloServerInHandler());
                     }
                 }).option(ChannelOption.SO_BACKLOG, 128)
                 .childOption(ChannelOption.SO_KEEPALIVE, true);


我们需要知道的是,parentGroup是用来accept连接的,然后把accept的连接丢给childGroup来处理,childGroup是真正执行HelloServerInHandler的线程池。那么Netty究竟起了多少个线程来处理呢?我们看下NioEventLoopGroup的父类MultithreadEventLoopGroup的定义:

// Default thread count for an event loop group. It is the system property
// io.netty.eventLoopThreads when set, otherwise 2 * NettyRuntime.availableProcessors(),
// and never less than 1.
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

// nThreads == 0 selects DEFAULT_EVENT_LOOP_THREADS; any other value is passed through unchanged.
// NOTE(review): a group created without an explicit thread count presumably passes 0 here; confirm.
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0?DEFAULT_EVENT_LOOP_THREADS:nThreads, executor, args);
}


如果没有定义,那么默认是SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)

NettyRuntime.availableProcessors()是机器的CPU核数。我们的机器是4核的,所以childGroup只起了8个线程来处理业务。而且每个连接在建立时会被固定分配给其中一个线程(EventLoop),该连接上的所有事件都由这个线程处理。如果业务处理慢把线程阻塞住了,那么分配到这个线程上的所有连接的后续请求都无法处理。到这里我们虽然很有把握了,但都还只是猜测。那么我们就模拟一下业务慢的情况,看看是不是这个问题。

首先我们把上面HelloServerInHandler中的doSomeThing()改成Thread.sleep(10*1000),也就是业务处理需要10秒钟,然后看看输出:

Client said:client 1 Are you ok?  2017-8-22 16:03:38

Client said:client 4 Are you ok?  2017-8-22 16:03:38

Client said:client 3 Are you ok?  2017-8-22 16:03:38

Client said:client 5 Are you ok?  2017-8-22 16:03:38

Client said:client 2 Are you ok?  2017-8-22 16:03:38

Client said:client 0 Are you ok?  2017-8-22 16:03:38

Client said:client 6 Are you ok?  2017-8-22 16:03:38

Client said:client 7 Are you ok?  2017-8-22 16:03:38

Client said:client 15 Are you ok?  2017-8-22 16:03:48

Client said:client 12 Are you ok?  2017-8-22 16:03:48

Client said:client 9 Are you ok?  2017-8-22 16:03:48

Client said:client 10 Are you ok?  2017-8-22 16:03:48

Client said:client 11 Are you ok?  2017-8-22 16:03:48

Client said:client 13 Are you ok?  2017-8-22 16:03:48

Client said:client 8 Are you ok?  2017-8-22 16:03:48

Client said:client 14 Are you ok?  2017-8-22 16:03:48

Client said:client 23 Are you ok?  2017-8-22 16:03:58

Client said:client 17 Are you ok?  2017-8-22 16:03:58

Client said:client 20 Are you ok?  2017-8-22 16:03:58

Client said:client 16 Are you ok?  2017-8-22 16:03:58

Client said:client 19 Are you ok?  2017-8-22 16:03:58

Client said:client 21 Are you ok?  2017-8-22 16:03:58

Client said:client 18 Are you ok?  2017-8-22 16:03:58

Client said:client 22 Are you ok?  2017-8-22 16:03:58

Client said:client 25 Are you ok?  2017-8-22 16:04:08

Client said:client 28 Are you ok?  2017-8-22 16:04:08

Client said:client 29 Are you ok?  2017-8-22 16:04:08

Client said:client 26 Are you ok?  2017-8-22 16:04:08

Client said:client 27 Are you ok?  2017-8-22 16:04:08

Client said:client 24 Are you ok?  2017-8-22 16:04:08


可以发现,服务器端每隔10秒才处理一批(8个)请求。这说明只要有8个耗时的业务请求,就会把处理线程全部占满,新的业务请求只能等待。这也证实了我们的猜想。

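那怎么解决问题呢?首先,缩短业务处理时间才是根本的解决办法。但是大部分情况下我们的业务处理并不慢,那么如何在偶尔有业务处理慢的时候不阻塞住新的请求呢?大家可能都想到了,那就是增加处理线程。有两种办法:一种是加大Netty的childGroup的线程个数,另一种是把业务处理放到一个单独的线程池中处理。(Netty还支持在pipeline.addLast时传入一个独立的EventExecutorGroup,例如DefaultEventExecutorGroup,让耗时的handler在该线程组中执行,同时保证同一连接内消息的处理顺序。)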

这里我们采用第二种方法来解决:

首先写一个线程处理类来处理业务(我们起了30个线程):

public class ChannelReadWorker {
    private static ExecutorService pool;
    private static ChannelReadWorker inStance = new ChannelReadWorker();
    private ChannelReadWorker(){
        pool = Executors.newFixedThreadPool(30);
    }
    public  static ChannelReadWorker getInstance(){
        return inStance;
    }
    public static void doWork(final ChannelHandlerContext ctx, final  Object msg){
        pool.execute(new Runnable() {

@Override
public void run() {
try {
                        ByteBuf result = (ByteBuf) msg;
                        byte[] result1 = new byte[result.readableBytes()];
                        // msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中
                       
result.readBytes(result1);
                        String resultStr = new String(result1);
                        System.out.println("HelloServerInHandler.channelRead "+ resultStr);

                        // 接收并打印客户端的信息
System.out.println("Client said:" + resultStr +" "+(new java.util.Date()).toLocaleString());


                        // 释放资源,这行很关键
                       
result.release();
                        Thread.sleep(10*1000);
                        // 向客户端发送消息
                       
String response = "recv "+resultStr+" and I am ok!";
                        // 在当前场景下,发送的数据必须转换成ByteBuf数组
                       
ByteBuf encoded = ctx.alloc().buffer(4 * response.length());
                        encoded.writeBytes(response.getBytes());
                        ctx.write(encoded);
                        ctx.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
});

    }
}


然后在HelloServerInHandler里面把处理工作丢到线程池处理


public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    //尝试用线程池来解决问题,用30个线程
        ChannelReadWorker worker = ChannelReadWorker.getInstance();
        worker.doWork(ctx, msg);
        return;

}


再运行程序,我们发现虽然业务处理还是需要10秒,但是服务器已经能同时处理所有请求了:


Client said:client 2 Are you ok2017-8-22 16:15:34

Client said:client 29 Are you ok2017-8-22 16:15:34

Client said:client 25 Are you ok2017-8-22 16:15:34

Client said:client 7 Are you ok2017-8-22 16:15:34

Client said:client 8 Are you ok2017-8-22 16:15:34

Client said:client 1 Are you ok2017-8-22 16:15:34

Client said:client 0 Are you ok2017-8-22 16:15:34

Client said:client 18 Are you ok2017-8-22 16:15:34

Client said:client 15 Are you ok2017-8-22 16:15:34

Client said:client 11 Are you ok2017-8-22 16:15:34

Client said:client 3 Are you ok2017-8-22 16:15:34

Client said:client 9 Are you ok2017-8-22 16:15:34

Client said:client 19 Are you ok2017-8-22 16:15:34

Client said:client 12 Are you ok2017-8-22 16:15:34

Client said:client 21 Are you ok2017-8-22 16:15:34

Client said:client 5 Are you ok2017-8-22 16:15:34

Client said:client 27 Are you ok2017-8-22 16:15:34

Client said:client 24 Are you ok2017-8-22 16:15:34

Client said:client 10 Are you ok2017-8-22 16:15:34

Client said:client 6 Are you ok2017-8-22 16:15:34

Client said:client 28 Are you ok2017-8-22 16:15:34

Client said:client 26 Are you ok2017-8-22 16:15:34

Client said:client 4 Are you ok2017-8-22 16:15:34

Client said:client 17 Are you ok2017-8-22 16:15:34

Client said:client 20 Are you ok2017-8-22 16:15:34

Client said:client 22 Are you ok2017-8-22 16:15:34

Client said:client 23 Are you ok2017-8-22 16:15:34

Client said:client 14 Are you ok2017-8-22 16:15:34

Client said:client 16 Are you ok2017-8-22 16:15:34

Client said:client 13 Are you ok2017-8-22 16:15:34


到这里问题解决



以上是关于java netty tcp服务器线程阻塞问题定位解决的主要内容,如果未能解决你的问题,请参考以下文章

JAVA BIO 编程

Netty原理-从NIO开始

Netty模型篇一:Netty 线程模型架构 & 工作原理 解读

Netty介绍及应用

万分之一错误率问题的分析及定位

从经典多线程到 java.nio 异步/非阻塞服务器