netty 的流量整形写测试
Posted 10年 Java程序员,硬核人生!勇往直前,永不退缩!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty 的流量整形写测试相关的知识,希望对你有一定的参考价值。
如何测试netty 的流量整形呢? 因为 其实服务端、客户端都可以限速,读写也可以限速,有多个组合,所以比较复杂;
考虑一种最简单的情况,就是单独对客户端写进行限速,如下:
服务端等待客户端连接;接受客户端请求之后,打印每秒收到的数据有多少, 然后简单打印一下速率;怎么做呢? 服务端对每一个客户端的channel保存一个计数器用于统计收到的总流量大小consumeMsgLength,然后启动一个定时器,每秒执行一次,任务是把consumeMsgLength 打印一下,然后置为0 (相当于是重置);
客户端连接到服务端,然后发送请求,比如是每秒随机写10M,也就是 10M/s 的写(当然也可以 设置一个范围,如 1~10 M/s ,客户端固定速度的写是为了 测试方便起见, 实际情况可能非常不同),但是限速是每秒1M,也就是 rate = 1M/s;那么 客户端理论上发送的数据 会有多少, 应该是1M/s, 多余的数据到哪里去了? 其实控制好就不会有这种问题。因为客户端应该进行控制,一定要根据 客制化的可写性判断,允许写才写!
下面是我测试通过的代码以及说明。
服务端关键代码
ServerBootstrap sbs = new ServerBootstrap(); // 配置nio服务参数 sbs.group(bossGroup, workerGroup) .channel(NioserverSocketChannel.class) // 说明一个新的Channel如何接收进来的连接 .option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //保持连接 .handler(new LoggingHandler(LogLevel.INFO)) // 打印日志级别 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 处理接收到的请求 ChannelPipeline pipeline = socketChannel.pipeline(); // 基于换行符号 // pipeline.addLast(new LineBasedFrameDecoder(1024)); // 这个会导致 channelRead0 没有执行,因为 数据一直在接收之中.. 等待一个换行 // 解码转String,注意调整自己的编码格式GBK、UTF-8 pipeline.addLast(new StringDecoder(Charset.forName("GBK"))); // 解码转String,注意调整自己的编码格式GBK、UTF-8 pipeline.addLast(new StringEncoder(Charset.forName("GBK"))); //流量整形 // pipeline.addLast(new ChannelTrafficShapingHandler(10, 10)); pipeline.addLast(new MyServerCommonHandler( ));
管道处理器:
import io.netty.channel.*; import java.util.Date; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; public class MyServerCommonHandler extends ChannelInboundHandlerAdapter { protected final int KB = 1024;// * 1024; protected String tempStr; protected AtomicLong consumeMsgLength; protected Runnable counterTask; protected boolean sentFlag; @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { if(ctx.channel().isWritable() && !sentFlag) { System.out.println(" ###### 重新开始写数据 ######"); } else { System.out.println(" ===== 写暂停 ====="); } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("MyServerCommonHandler.handlerAdded"); consumeMsgLength = new AtomicLong(); counterTask = () -> { while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { } long length = consumeMsgLength.getAndSet(0); System.out.println("*** " + ctx.channel().remoteAddress() + " rate(KB/S):" + (length) + " " + new Date()); } }; StringBuilder builder = new StringBuilder(); for (int i = 0; i < KB; i++) { builder.append("abcdefghijklmnopqrstuvwxyz"); } tempStr = builder.toString(); super.handlerAdded(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { new Thread(counterTask).start(); } protected abstract void sentData(ChannelHandlerContext ctx); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof String) { int length = ((String) msg).length(); // // 为什么这里 是默认 15s 一次, 因为io.netty.handler.traffic.AbstractTrafficShapingHandler.DEFAULT_MAX_TIME = 15 000 long l = consumeMsgLength.addAndGet(length);// 为什么每次都是 10240 ? 因为客户端写过来的数据就是这么多 System.out.println("===== receive client " + " " + " size:" + l); } // System.out.println("===== receive client msg : " + msg + " \\n " + l); super.channelRead(ctx, msg); } }
客户端
@Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 处理来自服务端的响应信息 ChannelPipeline pipeline = socketChannel.pipeline(); ChannelTrafficShapingHandler channelTrafficShapingHandler = new MyChannelTrafficShapingHandler((long) (0.001 * KB), 1 * KB); // 解码转String,注意调整自己的编码格式GBK、UTF-8 pipeline.addLast(new StringDecoder(Charset.forName("GBK"))); // 解码转String,注意调整自己的编码格式GBK、UTF-8 pipeline.addLast(new StringEncoder(Charset.forName("GBK"))); pipeline .addLast("channelTrafficShapingHandler",channelTrafficShapingHandler) .addLast(new ClientOutHandler())
什么设置:writeLimit = 0.001 * KB,也就是 每秒最多写 0.001 * KB,
管道处理器:
import com.zx.sms.testNetty.Student; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.util.ReferenceCountUtil; import java.net.SocketAddress; import java.nio.charset.Charset; import java.util.Date; /** * 读取服务器返回的响应信息 * @author luo * */ public class ClientOutHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("write = " + new Date() + " ctx.channel().isWritable() " + ctx.channel().isWritable()); super.write(ctx, msg, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("ClientOutHandler.disconnect" + new Date()); super.disconnect(ctx, promise); } @Override public void read(ChannelHandlerContext ctx) throws Exception { System.out.println("ClientOutHandler.read" + new Date()); super.read(ctx); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("ClientOutHandler.close" + new Date()); super.close(ctx, promise); } @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { System.out.println("ctx = [" + ctx + "], remoteAddress = [" + remoteAddress + "], localAddress = [" + localAddress + "], promise = [" + promise + "]" + new Date()); super.connect(ctx, remoteAddress, localAddress, promise); new Thread() { @Override public void run() { StringBuilder builder = new StringBuilder(); for (int i = 0; i < 1024*1; i++) { builder.append("abcdefghij"); } int cnt = 0; String o = builder.toString(); while (cnt < 1000) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } boolean writable = ctx.channel().isWritable(); /* 因为io.netty.handler.traffic.AbstractTrafficShapingHandler.DEFAULT_MAX_TIME 所以 无论, 都会写一次。 */ if (writable) { ctx.channel().writeAndFlush(o); } else{ // System.out.println("ClientOutHandler.run ctx.channel().isWritable() no"); } cnt++; } } }.start(); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { super.flush(ctx); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("ctx = [" + ctx + "], promise = [" + promise + "] " + new Date() + " ctx.channel().isWritable() " + ctx.channel().isWritable()); super.deregister(ctx, promise); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("ctx = [" + ctx + "] " + new Date()); super.handlerAdded(ctx); } }
和guava 的RateLimiter 不同,netty的 TrafficShapingHandler也是 先等待(最久等待默认是15s ),然后 时间到了,然后读或者写, 不管数据量有多大。
也就是说, 如果 rate = 1M/s, 第一次写100M数据,按理说是需要 100s, 但是实际上只会等待15s, 然后就把100 m, 一次性写过去。 也就是说, 存在突刺的情况!
当然, 其实现原理也是差别很大。
测试,服务端日志:
*** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:21 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:22 CST 2021 ===== receive client size:10240 MyServerCommonHandler.channelReadComplete Sat May 29 20:38:22 CST 2021 *** /127.0.0.1:33533 rate(KB/S):10240 Sat May 29 20:38:23 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:24 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:25 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:26 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:27 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:28 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:29 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:30 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:31 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:32 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:33 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:34 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:35 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:36 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:37 CST 2021 ===== receive client size:10240 MyServerCommonHandler.channelReadComplete Sat May 29 20:38:37 CST 2021 *** /127.0.0.1:33533 rate(KB/S):10240 Sat May 29 20:38:38 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:39 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:40 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:41 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:42 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:43 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:44 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:45 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:46 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:47 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:48 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:49 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:50 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:51 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:52 CST 2021 ===== receive client size:10240 MyServerCommonHandler.channelReadComplete Sat May 29 20:38:52 CST 2021 *** /127.0.0.1:33533 rate(KB/S):10240 Sat May 29 20:38:53 CST 2021 *** /127.0.0.1:33533 rate(KB/S):0 Sat May 29 20:38:54 CST 2021
客户端打印:
write = Sat May 29 20:37:37 CST 2021 ctx.channel().isWritable() true write = Sat May 29 20:37:52 CST 2021 ctx.channel().isWritable() true write = Sat May 29 20:38:07 CST 2021 ctx.channel().isWritable() true write = Sat May 29 20:38:22 CST 2021 ctx.channel().isWritable() true write = Sat May 29 20:38:37 CST 2021 ctx.channel().isWritable() true write = Sat May 29 20:38:52 CST 2021 ctx.channel().isWritable() true write = Sat May 29 20:39:07 CST 2021 ctx.channel().isWritable() true
可见, 每秒统计速度,都是0,但是每15秒就写一次。 receive client size:10240 打印的就是 实际接受的数据, 符合预期!
需要注意的是, 客户端写限速之后,客户端执行ctx.channel().writeAndFlush(o) 方法需要先判断一下 是否可写。否则限速就完全无效!!!
因为 ChannelTrafficShapingHandler 实际上不做限制,而是 仅仅给对应的通道做一个标记!
如果改一下, 把
ClientOutHandler#connect 的 builder.append("abcdefghij"); 改为 builder.append("a");
new MyChannelTrafficShapingHandler((long) (0.001 * KB), 1 * KB); 改为 new MyChannelTrafficShapingHandler((long) (1 * KB), 1 * KB);
也就是 每次写 1024 B,每秒10次即每秒写 10240 B即 1KB, 限速也是 1 KB, 那么, 应该来说,是每秒 只能写一次,观察 客户端日志:
write = Sat May 29 23:14:34 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:34 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:34 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:39 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:39 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:39 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:39 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:39 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:44 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:44 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:44 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:44 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:44 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:49 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:49 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:49 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:49 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:49 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:54 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:54 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:54 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:54 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:54 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:59 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:59 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:59 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:59 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:14:59 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:15:04 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:15:04 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:15:04 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:15:04 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:15:04 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:15:09 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:15:09 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:15:09 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:15:09 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:15:09 CST 2021 ctx.channel().isWritable() true write = Sat May 29 23:15:14 CST 2021 ctx.channel().isWritable() 流量整形(GTS和LR)