谈谈SocketServer

Posted Hi橘子皮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了谈谈SocketServer相关的知识,希望对你有一定的参考价值。

在介绍kafka SocketServer之前,咱们先从整体对其架构了解一下(架构:简单理解就是软件系统的顶层结构)如下图:

我们知道 kafka客户端会与多个broker通信,所以服务端一般会收到高并发的访问请求,面对高并发、低延迟的需求,kafka服务端使用Reactor模型。

1 Reactor编程模型

网络层采用的是Reactor模式,是一种基于事件驱动的模式。熟悉Java编程的读者应该了解 Java NIO提供了实现Reactor模式的API。常见的单线程Java NIO的编程模式,具体如下:

谈谈SocketServer

工作原理:

  1. 创建ServerSocketChannel 并在Selector上注册Accept事件,这样 channel监听自己端口的链接;

  2. 当客户端发起链接请求时,selector监听到OP_ACCEPT事件,通知Acceptor处理此事件;

  3. Acceptor接收到此事件,此时创建SocketChannel并设置成非阻塞(记得设置tcp诡异延迟40ms),此时可以在Selector上注册并监听感兴趣的I/O事件,比如ON_READ 或者ON_WRITE事件;

  4. 当客户端发送请求时,服务端监听到Read事件开始执行ReaderHandler逻辑,同样监听到Write事件时开始执行WriterHandler逻辑。

注: 此上的所有操作都在同一个线程里面完成,会导致某个操作比如Reader逻辑卡顿的话后面的请求会进不来,造成大量的超时请求。

2 改进Reactor模型

   根据上文的表述,kafka应该是高并发、低延迟的,所以reader和writer逻辑应该分离拆成不同模块池处理,也就有处理原模型,如下图:

谈谈SocketServer

工作原理:

  1. Acceptor工作再独立的线程,监听Connect事件,监听到后,选择人物最少的Reader线程分配SocketChannel;

  2. ReaderThreadPool注册多个OP_READ线程,一个线程对应个多SocketChannel连接,读取成功后,放入MessageQueue;

  3. Handler线程循环取出Queue中的请求进行逻辑处理,逻辑处理完成后,响应回馈给客户端也就是要求监听OP_WRITE事件。

注:其实这里的要点就是MessageQueue的长度问题,当时固定的大小就会出现拒绝请求响应,如果不限制,则会出现任务堆积的情况。

通过解析上述的设计,其实不难发现,还是存在I/O的瓶颈,也就是单个Selector可能在分发事件的时候造成阻塞,应该讲单个扩展成多个,如下图:

谈谈SocketServer

注:一般采取单个Acceptor存在Selector监听Accept连接,检测到后创建SocketChannel,采取一定得策略分发给不太忙碌的Selector集中的某位,开始由它进行读写。

3  SocketServer

网络层采用多线程多个selector实现。核心类是SocketServer,如下:

谈谈SocketServer

它包含:

         Acceptor : 一般服务器有多个网卡,每个网卡对应一个Acceptor对象;

         Processor : 每个Acceptor包含NProcessor,主要发送和接收数据;

         Handler : 处理Request请求,根据不同消息头handler不用的逻辑;

         RequestChannel Processor线程与Handler线程间交换数据的队列;

         ConnectionQuotas : 保存每个AcceptorsocketChannel连接数,有上限.

3.1  RequestChannel

谈谈SocketServer

在RequestChannel中包含了一个requestQueue和多个ResponseQueue队列,每个Processor线程对应一个ResponseQueue,Processor线程讲读取到的请求存到RequestQueue中,Handler线程从RequestQueue中取出请求进行处理,Handler产生的结果放入到Processor对应的ResponseQueue中,processor取出对应的ResponseQueue中的响应发送给客户端。

谈谈SocketServer       3.2  Handler

相对于Handler而言就是从Queue拉取请求处理逻辑,然后封装响应返回ResponseQueue,如下图:

谈谈SocketServer

Api.handle(req) 确认消息头处理逻辑,如下:

谈谈SocketServer      3.3  Processor

首先看下它的处理逻辑:

谈谈SocketServer

正如流程图所示

关键点:

  1. Processor.accept() 获取新的SocketChannel,在configureNewConnections()中将新的SocketChannel注册到Selector;

  2. processComplete…系列方法处理对应接收到的请求,取消或者开启监听3.4 Acceptor

它的主要功能在与接收客户端请求,创建Socket连接并分配Processor处理。

在它这个类中存在自己的NIO Selector和用于接收客户端请求的ServerSocketChannel,初始化时创建如下图:

谈谈SocketServer

正如前文说到的accept方法,创建连接并调用Processor.accept()

文末

   大概说了下流程,功力不足很多知识点没有讲到,后期把它摘出来当做组件放出来》》》



以上是关于谈谈SocketServer的主要内容,如果未能解决你的问题,请参考以下文章

2020-09-16:谈谈TCP的控制位?

谈谈你对组件式GIS认识

简单谈谈编程语言

谈谈工厂的作用

谈谈MediaStream

谈谈Java的Collection接口