聊聊Dotnetty

Posted aofengdaxia

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊Dotnetty相关的知识,希望对你有一定的参考价值。

以前,我曾经写过一个C/S的期货交易系统。C/S就绕不开通信,我大概的了解了.net 的通信机制后,选择用TCP长连接实现了通信,客户端可以调用服务器端,服务器端可以主动推送消息到客户端。实现是实现了,但是代码笨重而且低效。近来又要解决一个C/S的通信问题。感觉到自己以前写的通信继续复用会触犯自己的洁癖,同时深感自己的精力大不如以前,重新写一个优雅而且高效的通信的中间件没有任何信心。于是就在网络上找到了Dotnetty的通信中间件。


Dotnetty是Netty的.net版本。而且是微软的azure团队实现的编码,所以质量和效率上应该没得说,最起码比我自己码出来的要高级很多。就它了!~


如果你只是在项目中使用dotnetty,在Nuget中去应用相关的类库就可以了。如果要了解dotnetty的例子和源码。可以在git上下载。传输门


dotnetty的实现是完全参考了Netty,类库,使用方法都差不多。dotnetty的学习资料非常少,所以可以看着netty的教程去使用dotnetty,除了部分语法不完全一致,直接使用没有障碍。关于netty的学习有一本电子书传输门

来段例子大概的做一下概念的了解

server端:

        static async Task RunServerAsync()
        
            //重要的概念 IEventLoopGroup 代表了处理网络的线程,server端用两个,client端一个就够了。boss处理网络连接的概念,client处理数据概念。两个IEventLoopGroup不代表两个线程,而是代表了两组线程
            IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(1);
            IEventLoopGroup workerGroup= new MultithreadEventLoopGroup();

            //如果通信需要加密的话,使用证书
            X509Certificate2 tlsCertificate = null;
            if (ServerSettings.IsSsl)
            
                tlsCertificate = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password");
            
            try
            
                var bootstrap = new ServerBootstrap();
                bootstrap.Group(bossGroup, workerGroup);
                bootstrap.Channel<TcpServerSocketChannel>();
                bootstrap
                    .Option(ChannelOption.SoBacklog, 100)
                    .Handler(new LoggingHandler("SRV-LSTN"))
                    .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                    
                        //重要概念 增加Handler和Decoder,Encoder到pipeline中去
                        IChannelPipeline pipeline = channel.Pipeline;
                        if (tlsCertificate != null)
                        
                            pipeline.AddLast("tls", TlsHandler.Server(tlsCertificate));
                        
                        pipeline.AddLast(new LoggingHandler("SRV-CONN"));
                        pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
                        pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
                        //自定义的handler
                        pipeline.AddLast("echo", new EchoServerHandler());
                    ));

                IChannel boundChannel = await bootstrap.BindAsync(ServerSettings.Port);
                Console.ReadLine();
                await boundChannel.CloseAsync();
            
            finally
            
                await Task.WhenAll(
                    bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
                    workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)));
            
        

客户端:

static async Task RunClientAsync()
        

            var group = new MultithreadEventLoopGroup();

            X509Certificate2 cert = null;
            string targetHost = null;
            if (ClientSettings.IsSsl)
            
                cert = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password");
                targetHost = cert.GetNameInfo(X509NameType.DnsName, false);
            
            try
            
                var bootstrap = new Bootstrap();
                bootstrap
                    .Group(group)
                    .Channel<TcpSocketChannel>()
                    .Option(ChannelOption.TcpNodelay, true)
                    .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
                    
                        IChannelPipeline pipeline = channel.Pipeline;

                        if (cert != null)
                        
                            pipeline.AddLast("tls", new TlsHandler(stream => new SslStream(stream, true, (sender, certificate, chain, errors) => true), new ClientTlsSettings(targetHost)));
                        
                        pipeline.AddLast(new LoggingHandler());
                        pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
                        pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));

                        pipeline.AddLast("echo", new EchoClientHandler());
                    ));

                IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(ClientSettings.Host, ClientSettings.Port));

                Console.ReadLine();

                await clientChannel.CloseAsync();
            
            finally
            
                await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
            
        

Server端的Handler的定义:

public class EchoServerHandler : ChannelHandlerAdapter
    
        public override void ChannelRead(IChannelHandlerContext context, object message)
        
            var buffer = message as IByteBuffer;
            if (buffer != null)
            
                Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8));
            
            context.WriteAsync(message);
        

        public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();

        public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
        
            Console.WriteLine("Exception: " + exception);
            context.CloseAsync();
        
    

客户端的例子:

public class EchoClientHandler : ChannelHandlerAdapter
    
        readonly IByteBuffer initialMessage;

        public EchoClientHandler()
        
            this.initialMessage = Unpooled.Buffer(ClientSettings.Size);
            byte[] messageBytes = Encoding.UTF8.GetBytes("Hello world");
            this.initialMessage.WriteBytes(messageBytes);
        

        public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage);

        public override void ChannelRead(IChannelHandlerContext context, object message)
        
            var byteBuffer = message as IByteBuffer;
            if (byteBuffer != null)
            
                Console.WriteLine("Received from server: " + byteBuffer.ToString(Encoding.UTF8));
            
            context.WriteAsync(message);
        

        public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();

        public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
        
            Console.WriteLine("Exception: " + exception);
            context.CloseAsync();
        
    

代码来自官网。简单的描述了实现的流程。具体的理解,有时间了,在做一下记录。
很不错的Netty学习博客
https://blog.csdn.net/z69183787/article/category/6412734/2

以上是关于聊聊Dotnetty的主要内容,如果未能解决你的问题,请参考以下文章

.NET 通信传输利器Netty(Net is DotNetty)介绍

使用 DotNetty 实现 Redis 的一个控制台应用程序

Reactor模式的.net版本简单实现--DEMO

聊聊 Kafka 那点破事!

被喷了?聊聊我开源RPC框架的那些事

四种代码洁癖类型,程序员看了直呼内行