谈谈SocketServer
Posted Hi橘子皮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了谈谈SocketServer相关的知识,希望对你有一定的参考价值。
在介绍kafka SocketServer之前,咱们先从整体对其架构了解一下(架构:简单理解就是软件系统的顶层结构)如下图:
我们知道 kafka客户端会与多个broker通信,所以服务端一般会收到高并发的访问请求,面对高并发、低延迟的需求,kafka服务端使用Reactor模型。
1 Reactor编程模型
网络层采用的是Reactor模式,是一种基于事件驱动的模式。熟悉Java编程的读者应该了解 Java NIO提供了实现Reactor模式的API。常见的单线程Java NIO的编程模式,具体如下:
工作原理:
创建ServerSocketChannel 并在Selector上注册Accept事件,这样 channel监听自己端口的链接;
当客户端发起链接请求时,selector监听到OP_ACCEPT事件,通知Acceptor处理此事件;
Acceptor接收到此事件,此时创建SocketChannel并设置成非阻塞(记得设置tcp诡异延迟40ms),此时可以在Selector上注册并监听感兴趣的I/O事件,比如ON_READ 或者ON_WRITE事件;
当客户端发送请求时,服务端监听到Read事件开始执行ReaderHandler逻辑,同样监听到Write事件时开始执行WriterHandler逻辑。
注: 此上的所有操作都在同一个线程里面完成,会导致某个操作比如Reader逻辑卡顿的话后面的请求会进不来,造成大量的超时请求。
2 改进Reactor模型
根据上文的表述,kafka应该是高并发、低延迟的,所以reader和writer逻辑应该分离拆成不同模块池处理,也就有处理原模型,如下图:
工作原理:
Acceptor工作再独立的线程,监听Connect事件,监听到后,选择人物最少的Reader线程分配SocketChannel;
ReaderThreadPool注册多个OP_READ线程,一个线程对应个多SocketChannel连接,读取成功后,放入MessageQueue;
Handler线程循环取出Queue中的请求进行逻辑处理,逻辑处理完成后,响应回馈给客户端也就是要求监听OP_WRITE事件。
注:其实这里的要点就是MessageQueue的长度问题,当时固定的大小就会出现拒绝请求响应,如果不限制,则会出现任务堆积的情况。
通过解析上述的设计,其实不难发现,还是存在I/O的瓶颈,也就是单个Selector可能在分发事件的时候造成阻塞,应该讲单个扩展成多个,如下图:
注:一般采取单个Acceptor存在Selector监听Accept连接,检测到后创建SocketChannel,采取一定得策略分发给不太忙碌的Selector集中的某位,开始由它进行读写。
3 SocketServer
网络层采用多线程多个selector实现。核心类是SocketServer,如下:
它包含:
Acceptor : 一般服务器有多个网卡,每个网卡对应一个Acceptor对象;
Processor : 每个Acceptor包含N个Processor,主要发送和接收数据;
Handler : 处理Request请求,根据不同消息头handler不用的逻辑;
RequestChannel :Processor线程与Handler线程间交换数据的队列;
ConnectionQuotas : 保存每个Acceptor的socketChannel连接数,有上限.
3.1 RequestChannel
在RequestChannel中包含了一个requestQueue和多个ResponseQueue队列,每个Processor线程对应一个ResponseQueue,Processor线程讲读取到的请求存到RequestQueue中,Handler线程从RequestQueue中取出请求进行处理,Handler产生的结果放入到Processor对应的ResponseQueue中,processor取出对应的ResponseQueue中的响应发送给客户端。
3.2 Handler
相对于Handler而言就是从Queue拉取请求处理逻辑,然后封装响应返回ResponseQueue,如下图:
Api.handle(req) 确认消息头处理逻辑,如下:
3.3 Processor
首先看下它的处理逻辑:
正如流程图所示
关键点:
Processor.accept() 获取新的SocketChannel,在configureNewConnections()中将新的SocketChannel注册到Selector;
processComplete…系列方法处理对应接收到的请求,取消或者开启监听3.4 Acceptor
它的主要功能在与接收客户端请求,创建Socket连接并分配Processor处理。
在它这个类中存在自己的NIO Selector和用于接收客户端请求的ServerSocketChannel,初始化时创建如下图:
正如前文说到的accept方法,创建连接并调用Processor.accept()
文末
大概说了下流程,功力不足很多知识点没有讲到,后期把它摘出来当做组件放出来》》》
以上是关于谈谈SocketServer的主要内容,如果未能解决你的问题,请参考以下文章