netty 的流量整形写测试

Posted 10年 Java程序员,硬核人生!勇往直前,永不退缩!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty 的流量整形写测试相关的知识,希望对你有一定的参考价值。

 如何测试netty 的流量整形呢? 因为 其实服务端、客户端都可以限速,读写也可以限速,有多个组合,所以比较复杂;
 
考虑一种最简单的情况,就是单独对客户端写进行限速,如下:
 
服务端等待客户端连接;接受客户端请求之后,打印每秒收到的数据有多少, 然后简单打印一下速率;怎么做呢? 服务端对每一个客户端的channel保存一个计数器用于统计收到的总流量大小consumeMsgLength,然后启动一个定时器,每秒执行一次,任务是把consumeMsgLength 打印一下,然后置为0 (相当于是重置);
 
客户端连接到服务端,然后发送请求,比如是每秒随机写10M,也就是 10M/s 的写(当然也可以 设置一个范围,如 1~10 M/s ,客户端固定速度的写是为了 测试方便起见, 实际情况可能非常不同),但是限速是每秒1M,也就是 rate = 1M/s;那么 客户端理论上发送的数据 会有多少, 应该是1M/s, 多余的数据到哪里去了? 其实控制好就不会有这种问题。因为客户端应该进行控制,一定要根据 客制化的可写性判断,允许写才写!
 
下面是我测试通过的代码以及说明。
 
服务端关键代码
    ServerBootstrap sbs = new ServerBootstrap();
            // 配置nio服务参数
            sbs.group(bossGroup, workerGroup)
               .channel(NioserverSocketChannel.class) // 说明一个新的Channel如何接收进来的连接
               .option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
               .childOption(ChannelOption.SO_KEEPALIVE, true) //保持连接
               .handler(new LoggingHandler(LogLevel.INFO)) // 打印日志级别
               .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 处理接收到的请求
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 基于换行符号
//                        pipeline.addLast(new LineBasedFrameDecoder(1024)); // 这个会导致 channelRead0 没有执行,因为 数据一直在接收之中.. 等待一个换行
                        // 解码转String,注意调整自己的编码格式GBK、UTF-8
                        pipeline.addLast(new StringDecoder(Charset.forName("GBK")));
                        // 解码转String,注意调整自己的编码格式GBK、UTF-8
                        pipeline.addLast(new StringEncoder(Charset.forName("GBK")));

                        //流量整形
//                        pipeline.addLast(new ChannelTrafficShapingHandler(10, 10));
                        pipeline.addLast(new MyServerCommonHandler( ));

管道处理器:

import io.netty.channel.*;

import java.util.Date;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class MyServerCommonHandler extends ChannelInboundHandlerAdapter {

    protected final int KB = 1024;//  * 1024;
    protected String tempStr;
    protected AtomicLong consumeMsgLength;
    protected Runnable counterTask;
    protected boolean sentFlag;

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if(ctx.channel().isWritable() && !sentFlag) {
            System.out.println(" ###### 重新开始写数据 ######");
        } else {
            System.out.println(" ===== 写暂停 =====");
        }
    }
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("MyServerCommonHandler.handlerAdded");
        consumeMsgLength = new AtomicLong();
        counterTask = () -> {
          while (true) {
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {

              }

              long length = consumeMsgLength.getAndSet(0);
              System.out.println("*** " + ctx.channel().remoteAddress() + " rate(KB/S):" + (length) + "  " + new Date());
          }
        };
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < KB; i++) {
            builder.append("abcdefghijklmnopqrstuvwxyz");
        }
        tempStr = builder.toString();
        super.handlerAdded(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        new Thread(counterTask).start();
    }


    protected abstract void sentData(ChannelHandlerContext ctx);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof String) {
            int length = ((String) msg).length();
            // // 为什么这里 是默认 15s 一次, 因为io.netty.handler.traffic.AbstractTrafficShapingHandler.DEFAULT_MAX_TIME = 15 000
            long l = consumeMsgLength.addAndGet(length);// 为什么每次都是 10240 ?  因为客户端写过来的数据就是这么多
            System.out.println("===== receive client " + " " + " size:" + l);
        }
//        System.out.println("===== receive client msg : " + msg + " \\n  " + l);
        super.channelRead(ctx, msg);
    }
}

 

客户端
   @Override
              protected void initChannel(SocketChannel socketChannel) throws Exception {                  
                    // 处理来自服务端的响应信息
                  ChannelPipeline pipeline = socketChannel.pipeline();
                  ChannelTrafficShapingHandler channelTrafficShapingHandler = new MyChannelTrafficShapingHandler((long) (0.001 * KB), 1 * KB);
                  // 解码转String,注意调整自己的编码格式GBK、UTF-8
                  pipeline.addLast(new StringDecoder(Charset.forName("GBK")));
                  // 解码转String,注意调整自己的编码格式GBK、UTF-8
                  pipeline.addLast(new StringEncoder(Charset.forName("GBK")));
                  pipeline
                    .addLast("channelTrafficShapingHandler",channelTrafficShapingHandler)
                    .addLast(new ClientOutHandler())

什么设置:writeLimit = 0.001 * KB,也就是 每秒最多写 0.001 * KB,

管道处理器:

import com.zx.sms.testNetty.Student;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;

import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.Date;

/**
 * 读取服务器返回的响应信息
 * @author luo
 *
 */
public class ClientOutHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("write = " + new Date() + " ctx.channel().isWritable()  " + ctx.channel().isWritable());
        super.write(ctx, msg, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("ClientOutHandler.disconnect" + new Date());
        super.disconnect(ctx, promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ClientOutHandler.read" + new Date());
        super.read(ctx);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("ClientOutHandler.close" + new Date());
        super.close(ctx, promise);
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        System.out.println("ctx = [" + ctx + "], remoteAddress = [" + remoteAddress + "], localAddress = [" + localAddress + "], promise = [" + promise + "]" + new Date());
        super.connect(ctx, remoteAddress, localAddress, promise);

        new Thread() {
            @Override
            public void run() {
                StringBuilder builder = new StringBuilder();
                for (int i = 0; i < 1024*1; i++) {
                    builder.append("abcdefghij");
                }
                int cnt = 0;
                String o = builder.toString();
                while (cnt < 1000) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    boolean writable = ctx.channel().isWritable();
                    /*
                        因为io.netty.handler.traffic.AbstractTrafficShapingHandler.DEFAULT_MAX_TIME
                        所以 无论, 都会写一次。
                     */
                    if (writable) {
                        ctx.channel().writeAndFlush(o);
                    } else{
//                        System.out.println("ClientOutHandler.run ctx.channel().isWritable() no");
                    }

                    cnt++;
                }
            }
        }.start();
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        super.flush(ctx);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("ctx = [" + ctx + "], promise = [" + promise + "]    " + new Date() + " ctx.channel().isWritable()  " + ctx.channel().isWritable());
        super.deregister(ctx, promise);
    }


    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ctx = [" + ctx + "] " + new Date());
        super.handlerAdded(ctx);
    }
}

 

 
和guava 的RateLimiter 不同,netty的 TrafficShapingHandler也是 先等待(最久等待默认是15s ),然后 时间到了,然后读或者写, 不管数据量有多大。
 
也就是说, 如果 rate = 1M/s, 第一次写100M数据,按理说是需要 100s, 但是实际上只会等待15s, 然后就把100 m, 一次性写过去。 也就是说, 存在突刺的情况!
 
当然, 其实现原理也是差别很大。
 
测试,服务端日志:
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:21 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:22 CST 2021
===== receive client   size:10240
MyServerCommonHandler.channelReadComplete     Sat May 29 20:38:22 CST 2021
*** /127.0.0.1:33533 rate(KB/S):10240  Sat May 29 20:38:23 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:24 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:25 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:26 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:27 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:28 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:29 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:30 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:31 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:32 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:33 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:34 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:35 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:36 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:37 CST 2021
===== receive client   size:10240
MyServerCommonHandler.channelReadComplete     Sat May 29 20:38:37 CST 2021
*** /127.0.0.1:33533 rate(KB/S):10240  Sat May 29 20:38:38 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:39 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:40 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:41 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:42 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:43 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:44 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:45 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:46 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:47 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:48 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:49 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:50 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:51 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:52 CST 2021
===== receive client   size:10240
MyServerCommonHandler.channelReadComplete     Sat May 29 20:38:52 CST 2021
*** /127.0.0.1:33533 rate(KB/S):10240  Sat May 29 20:38:53 CST 2021
*** /127.0.0.1:33533 rate(KB/S):0  Sat May 29 20:38:54 CST 2021
 
客户端打印:
write = Sat May 29 20:37:37 CST 2021 ctx.channel().isWritable()  true
write = Sat May 29 20:37:52 CST 2021 ctx.channel().isWritable()  true
write = Sat May 29 20:38:07 CST 2021 ctx.channel().isWritable()  true
write = Sat May 29 20:38:22 CST 2021 ctx.channel().isWritable()  true
write = Sat May 29 20:38:37 CST 2021 ctx.channel().isWritable()  true
write = Sat May 29 20:38:52 CST 2021 ctx.channel().isWritable()  true
write = Sat May 29 20:39:07 CST 2021 ctx.channel().isWritable()  true

 

可见, 每秒统计速度,都是0,但是每15秒就写一次。 receive client size:10240 打印的就是 实际接受的数据, 符合预期!
 
 
需要注意的是, 客户端写限速之后,客户端执行ctx.channel().writeAndFlush(o) 方法需要先判断一下 是否可写。否则限速就完全无效!!!
 
因为 ChannelTrafficShapingHandler 实际上不做限制,而是 仅仅给对应的通道做一个标记!
 
如果改一下, 把
ClientOutHandler#connect 的 builder.append("abcdefghij"); 改为 builder.append("a");
new MyChannelTrafficShapingHandler((long) (0.001 * KB), 1 * KB); 改为 new MyChannelTrafficShapingHandler((long) (1 * KB), 1 * KB);
 
也就是 每次写 1024 B,每秒10次即每秒写 10240 B即 1KB, 限速也是 1 KB, 那么, 应该来说,是每秒 只能写一次,观察 客户端日志:
write =  Sat May 29 23:14:34 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:34 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:34 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:39 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:39 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:39 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:39 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:39 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:44 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:44 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:44 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:44 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:44 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:49 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:49 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:49 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:49 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:49 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:54 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:54 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:54 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:54 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:54 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:59 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:59 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:59 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:59 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:14:59 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:15:04 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:15:04 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:15:04 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:15:04 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:15:04 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:15:09 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:15:09 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:15:09 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:15:09 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:15:09 CST 2021 ctx.channel().isWritable()  true
write =  Sat May 29 23:15:14 CST 2021 ctx.channel().isWritable()  流量整形(GTS和LR)

微服务技术栈:流量整形算法,服务熔断与降级

教程篇(6.4) 04. 流量整形 ❀ SD-WAN ❀ Fortinet 网络安全架构师 NSE7

举一反三:TCP协议是如何实现网络级的流控的?

服务访问质量(QoS)——流量整形与拥塞管理

OPNsense设置带宽整形教程