Netty-UDP与HTTPS协议

Posted 追求无悔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty-UDP与HTTPS协议相关的知识,希望对你有一定的参考价值。

写在前面
最近在使用netty进行数据采集的接收入口,协议分别使用了HTTPS协议和UDP协议,特来总结一下:
一、UDP协议
1. netty服务端代码
先进行处理器配置和端口绑定:处理器里就是具体的对接收到的数据进行业务处理。

public void startReceiveProcess() {
        LOGGER.info("UdpReceiver start receive process...");
        acceptGroup = new NioEventLoopGroup(1000);
        bootstrap = new Bootstrap();
        bootstrap.group(acceptGroup)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator())
                // 指定处理器
                .handler(new ChannelInitializer<DatagramChannel>() {
                    @Override
                    protected void initChannel(DatagramChannel datagramChannel) throws Exception {
                        ChannelPipeline pipeline = datagramChannel.pipeline();
                        pipeline.addLast(new UdpHandler());
                    }
                });
        try {
            int port = baseSource.getPort();
            LOGGER.info("UdpReceiver netty port is: {}", port);
            // 绑定端口
            udpChanel = bootstrap.bind(port).sync().channel();
            udpChanel.closeFuture().await(1000);
        } catch (InterruptedException ex) {
            LOGGER.error("udp netty exception: ", ex);
            closeSocket();
        }
    }

然后具体的处理逻辑如下:

private class UdpHandler extends SimpleChannelInboundHandler<DatagramPacket> {
        @Override
        protected void channelRead0(ChannelHandlerContext context, DatagramPacket msg) {
            if (Objects.nonNull(msg)) {
                ByteBuf content = msg.content();
                byte[] msgByte = ByteUtil.byteBuf2Byte(content);
                String receive = new String(msgByte);
                LOGGER.info("udp netty server receive data: {}", receive);
                // TODO 进行数据具体处理
                baseSource.offerSource(msgByte);
                // TODO 响应信息
                context.writeAndFlush("");
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
            LOGGER.error("metadata server catch exception: ", cause);
            Channel channel = context.channel();
            if (channel != null) {
                channel.close();
            }
            context.close();
        }
    }

2. 特别注意
此处的DatagramPacket是io.netty里面的类,而不是java.net里面的类
曾遇到了一个很容易忽略的问题就是因为包引入错误。
我曾按照此方式写了一个UDP的客户端来进行数据发送,但是服务端一直没有打印接收到的数据,也没有错误信息,只有日志里面出现:

DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message DatagramPacket(/127.0.0.1:64692 => /0:0:0:0:0:0:0:0:30204, PooledUnsafeDirectByteBuf(ridx: 0, widx: 323, cap: 2048)) that reached at the tail of the pipeline. Please check your pipeline configuration.
DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [UdpReceiver$UdpHandler#0, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0x18250f38, L:/0:0:0:0:0:0:0:0:30204].

看上去收到了,但又感觉没收到。收到的长度确实也是对的,但没有打印具体的消息。

原来就是因为包引入错误,导致不会去处理消息,打断点也发现不会进入SimpleChannelInboundHandler的channelRead方法里去,引入io.netty.channel.socket.DatagramPacket后,就可以正常接收了。
这两者的写法有细微的差别:

io.netty.channel.socket.DatagramPacket
// 接收到的数据想处理的话,需要转换,因为获取到的为ByteBuf
如下:
ByteBuf content = packet.content();
// 需要通过byteBuf转化为byte数组,然后才能获取到具体字符串格式的信息。
String clientMessage = new String(bytebuf2Byte(content),"UTF-8");

public static byte[] bytebuf2Byte(ByteBuf in) {
    int len = in.readableBytes();
    byte[] arr = new byte[len];
    in.getBytes(0, arr);
    return arr;
}

如果是java.net.DatagramPacket

byte[] msg = packet.getData()
String msgStr = new String(msg, "utf-8");
// 获取到的直接是byte数组。

3. 客户端测试
用jmeter装插件也是可以发UDP请求的,但由于公司网络限制,无法下载就jmeter相关插件,所以只能通过自己写客户端程序进行请求发送。
核心代码为:

ByteBuf dataBuf = Unpooled.copiedBuffer(msg, Charset.forName("UTF-8"));
DatagramPacket datagramPacket = new DatagramPacket(dataBuf, inetSocketAddress)

channel.writeAndFlush(datagramPacket).addListener(new GenericFutureListener<ChannelFuture>() {
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
         boolean success = future.isSuccess();
         System.out.println("Sender datagramPacket result : " + success);
     }
});

二、HTTPS协议
1. HTTPS协议代码:

public void start() {
        bootstrap = new ServerBootstrap();
        bootstrap.channel(NioserverSocketChannel.class);
        bootstrap.option(NioChannelOption.SO_BACKLOG, RestfulParam.NIO_SO_BACKLOG_LENGTH);
        bootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
        bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
        workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
        businessGroup = new UnorderedThreadPoolEventExecutor(200, new DefaultThreadFactory("business"));
        try {
            bootstrap.group(bossGroup, workGroup);
            // TODO 稍后分析此部的作用
            SslContext sslContext = ContextSslFactory.setJksToNetty();
            metricsHandler = new MetricsHandler();
            bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast("metricHandler", metricsHandler);
                    pipeline.addLast("idleHandler", new ServerIdleCheckHandler());
                    pipeline.addLast("ssl", sslContext.newHandler(ch.alloc()));
                    pipeline.addLast("decoder", new HttpRequestDecoder(4096, 8192, 2097152));
                    pipeline.addLast("encoder", new HttpResponseEncoder());
                    pipeline.addLast("aggregator", new HttpObjectAggregator(2097152));
                    pipeline.addLast(businessGroup, new HttpRequestHandler());
                }
            });
            channelFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
        } catch (Exception exception) {
            LOGGER.error("set jks to netty error!!!", exception);
        }
    }

同样的,HttpRequestHandler是具体的业务处理实现:

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
            String uri = request.uri();
            // 创建http响应
            FullHttpResponse response;
            if (sourceConfig.getActiveApis().contains(uri)) {
                // 具体业务处理
                requestHandlerMap.get(uri).logMsgReceived(request, ctx);
                response = new DefaultFullHttpResponse(
                        HttpVersion.HTTP_1_1,
                        HttpResponseStatus.OK);
            } else {
                response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
            }
            // 设置头信息
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
            // 状态回写到客户端
            if (ctx.channel().isActive() && ctx.channel().isWritable()) {
                ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // ......
        }
    }

服务端和UDP协议的是同样的套路,只是接收数据的类型不一样。
2. 额外小知识:
对于HTTPS协议来说,通常会要求进行认证,因为客户端对于服务端来说是否可信,很重要,所以客户端需要有服务端提供的证书,来保证自己发送的请求对于服务端来说是可信的,然后才会被服务端接收。
所以会在Chandler中加了:pipeline.addLast("ssl", sslContext.newHandler(ch.alloc()));
SslContext sslContext = ContextSslFactory.setJksToNetty();
至于这里面具体做了些什么,主要就是读取证书,用来校验客户端连接的。

public static SslContext setJksToNetty() throws Exception {
    LOGGER.info("Begin to set JKS to netty !");
    SslContext sslContext;
    String normalPath = Normalizer.normalize(“身份证书路径”, Normalizer.Form.NFKC);
    String jksCanonicalPath = new File(normalPath).getCanonicalPath();
    try (InputStream is = FileUtils.openInputStream(new File(jksCanonicalPath))) {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        keyStore.load(is, “证书密码”);

        KeyManagerFactory managerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        managerFactory.init(keyStore, pwd.toCharArray());

        sslContext = SslContextBuilder.forServer(managerFactory)
            .protocols("TLSv1.2")
            .trustManager(new File(“信任证书路径”))
            .clientAuth(ClientAuth.REQUIRE)
           // 加密协议 .ciphers("TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256")
            .build();
    }
    return sslContext;
}

3. 如何测试 ?
这时候就可以用jmeter来上报数据了,大概配置就是:

对于用了证书的jmeter该怎么配置呢?
区别在于启动方式,不用证书的情况下可以直接使用jmeter.bat启动,使用证书的情况下,启动为:

jmeter -Djavax.net.ssl.trustStore=D:\\DailyDev\\jmeter\\Network_Product_CA.cer -Djavax.net.ssl.keyStorePassword=密码 -Djavax.net.ssl.keyStore=D:\\DailyDev\\jmeter\\keystore.jks

当然也可以在jmeter.properties里面配置好证书路径和密码,然后直接启动:

三、总结
其实关于netty的常用代码大概就是这么多了,但是关于netty的底层知识,还是有很多内容的,毕竟很强大,不然也不会有那么多三方件都用netty来进行底层的网络交互了,像kafka等那些,在引入jar的时候,都可以看到里面包含了netty的相关依赖。

以上是关于Netty-UDP与HTTPS协议的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

git项目初次push提示error: failed to push some refs to https://gitee.com/xxxx/gittest.git’解决方案 --九五小庞(代码片段

这两个代码片段有啥区别?

PHP代码-psysh调试代码片段工具

1.HTTP与HTTPS区别

typescript Angular 2测试片段。代码库https://developers.livechatinc.com/blog/category/programming/angular-2/