基于共享内存的无锁消息队列设计

Posted 上交所技术服务

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于共享内存的无锁消息队列设计相关的知识,希望对你有一定的参考价值。

 

夏金波 姚振锋 康乐 郑州商品交易所 jbxia@czce.com.cn

摘要:基于共享内存的无锁消息队列,是郑商所在五期交易系统中使用的关键技术之一,已于2015年9月正式上线使用。该技术有效支持了郑商所交易系统的架构调整,系统在吞吐量和响应时延上有了很大改进。本文从其需求、关键技术、系统设计和性能对该消息队列进行了介绍。

关键字:消息队列交易系统 低时延 吞吐量

一、背景

消息队列是一种进程、系统间进行通信的常用组件。应用程序通过读写出入队列的消息来通信,实现异步通信,简化系统设计;消息队列还能作为缓存,提高系统吞吐量。

作为一个基础性的工具,Unix操作系统上很早就提供了系统级的消息队列 。郑商所四期交易系统(2005至2015年作为生产系统)使用HP-UX提供的操作系统级消息队列,进行通信进程和应用进程(前置、撮合核心)的消息传递。随着期货市场规模不断扩大,整个市场技术水平不断进步,对交易系统的吞吐量和时延提出了更高的要求。

根据HP小型机上测试结果,该系统消息队列时延为50μs。系统中消息4次经过消息队列,因此仅消息队列时延就有200μs这与目前国际先进的交易系统门到门100μs左右的时延存在巨大差距。消息队列是四期交易系统提高性能的重大瓶颈。

比较成熟的消息队列产品中,开源项目有高性能的异步消息库ZeroMQ,基于高级消息队列协议(AMQP)的RabbitMQ、ActiveMQ等;商业应用有TIBCO Enterprise Message Service,IMB WebSphereMQ Low Latency Messaging等。

它们除了基本的消息队列功能,还包括路由、订阅、连接管理、高可靠、消息持久化等功能,是功能齐全的消息中间件。除ZeroMQ外,由于功能复杂,都需要多节点部署。

由于目前系统中对消息队列的需求是在主机内部提供多进程/线程的消息通信;而通用的消息中间件产品功能过多,部署复杂,分布式架构造成的与主机内部IPC机制存在性能差距。简单一些的ZeroMQ库提供进程间进行消息收发的功能,但是其基于UNIX  Domain Socket,性能难以超越系统IPC的限制。

在对原有交易系统的升级中,通过对通信进程、消息队列、日志传输等关键路径进行优化,提高了系统吞吐量、降低了系统时延。

本文对升级过程中,针对系统架构需求,消息队列使用的关键技术、设计、性能进行介绍。

二、消息队列的应用需求

(一)  通信架构的升级

图 1和图 2是四期交易系统中,远程前置机以及撮合核心的通信架构图。

对于每一个交易员(对前置机而言,对撮合服务来讲为前置机)连接,系统创建一个MTI(Message Transfer Interface)通信进程,同时创建一个下行队列,供服务程序将处理结果回写。上行消息队列只有一个,全部MTI进程都将上行消息写入该队列。


1远程前置机通信架构

基于共享内存的无锁消息队列设计

2撮合核心通信架构

该系统架构有两个问题,一个是使用多进程的服务方式,为每个网络连接创建一个通信进行,导致系统中存在数百个服务进程,频繁的进程切换带来了大量系统开销;另一个是使用操作系统提供的消息队列,性能较低。

针对这两点问题,分别采用以下方式进行改进:使用单进程轮询复用的通信服务方式替换多进程通信服务;引用效率更高的消息队列进行服务进程和通信进程的消息传递。

升级后的系统架构如图 3所示。

系统中有两个消息队列:上行消息队列和下行消息队列。上行消息队列数据来源为通信程序,获得交易员的登录、退出,以及报单消息,供GW服务进程读取;下行消息队列数据来源为GW服务进程,数据有报单应答,成交、持仓变化等,供通信进程读取并发送给交易员。

通信进程负责所有交易员的连接管理和通信服务,内部有多个worker线程和一个main线程。

worker线程负责接收交易员的数据,发送到上行消息队列;并对下行数据缓存进行维护,及时将数据发送给交易员。main线程从下行消息队列读取数据,根据连接状态或直接发送,或者写入缓存、供worker线程发送。

基于共享内存的无锁消息队列设计

3升级后前置机通信架构

(二)  功能和性能需求

根据新的通信架构,消息队列承担消息传递的简单功能。其从功能和性能方面有以下的需求:

1、支持多写一读

根据应用的需要,一个消息队列可能有多个写者。负责上行数据处理的通信进程也可以使用多线程方式,以便充分利用CPU资源。

按照接收对象将消息分类,每个消息队列只有一个读者。这样即符合应用需求,也能很大程度的简化消息队列的设计,减少读数据时的竞争、为追求更高的性能保留空间。

2、高吞吐量

开源消息队列按照是否存在中间汇聚节点可分为两类:点对点的如ZeroMQ、nanomsg等,分布式的使用汇聚节点的如Kafka、ActiveMQ、RabbitMQ、NTAS等。

点对点的消息队列由于网络结构简单,吞吐量较高,ZeroMQ吞吐量在50万笔/秒以上,nanomsg在200万笔/秒以上[1]

使用中间汇聚节点的消息队列,由于结构复杂,同时保证消息的可靠以及持久化,吞吐量较低,其中,NATS在20万笔/秒,Kafka在10万笔/秒,ActiveMQ、RabbitMQ 在2-3万笔/秒[1][2]

目前郑商所撮合引擎的撮合能力在5万笔/秒左右,可预期的单机撮合极限在10-20万笔/秒。因此,吞吐量在20-30万笔每秒以上以后,消息队列已不在是限制系统性能的因素。

3、低时延

ZeroMQ在万兆网卡下,最低时延为30μs;在进程间消息通信工作模式下,最低时延为13μs[4]。Kafkas时延为2-3ms[5]。HP-UX上系统级消息队列最低时延为50μs

相较于ZeroMQ、Kafka等消息队列产品,在单机上实现的消息队列应该有更低的响应时延。

预期的目标,消息队列时延在10μs以下是比较理想的。

三、关键技术

(一)  基于共享内存的存储和通信

操作系统提供的进程间通信机制有文件、socket、消息队列、管道、共享内存等。其中,共享内存是最快的IPC机制[6]

共享内存映射到进程空间后,数据可以直接从共享内存进行读写,不需要执行系统调用进行数据的传输。因此,避免了其它进程间通信机制必须的用户态/内核态切换以及用户空间与内核空间的数据拷贝。

由于消息队不需支持跨主机通信,所以可以采用共享内存进行进程间的通信。

(二)  无锁互斥访问

消息队列需要支持多个写者,在多个写者同时进行写操作时,会产生并发操作。处理并发操作,通常的解决方案是使用互斥锁。使用互斥锁简单方便,但有一定的系统开销。在我们的测试环境下,单次锁/解锁操作耗时大概为20纳秒。多个线程互斥操作时,时间开销大大增加。同样的测试条件,当两个线程并发操作时,单次锁/解锁操作的平均时延为320纳秒。

如果并发操作能限制在一个机器字节内,可以使用CPU提供的CAS指令,即Compare& Swap原子操作,进行进程间的互斥操作。

相较于使用互斥锁进行多进程间的互斥操作,使用CAS指令的程序临界区更小,只有一个字节。当多个写者同时更新临界区时,只有一个写者成功,其它写者需要重复操作并检查结果。这样避免了进程锁起时休眠,以及锁操作带来的开销。最大程度的降低响应时延。

无锁程序的关键在于程序设计时,将程序的临界区设置为一个机器字节的变量,进而可以使用CAS指令进行原子操作。

1消息队列多写者的无锁操作

在消息队列中,数组和数组的头尾head、tail指针是多进程访问的临界区。只考虑写者,操作临界区为向数组写入数据、tail指针的更新,如图 4所示。

基于共享内存的无锁消息队列设计

4写操作临界区

减少临界区内容,在临界区只对tail指针进行修改:写者竞争获取到tail针后,再写入数据,如图 5所示。

基于共享内存的无锁消息队列设计

5减少临界区内容

现在,临界区只有tail一个变量,使用CAS指令代替互斥锁,如图 6所示。

基于共享内存的无锁消息队列设计

6使用CAS指令进行互斥操作

2、队列读操作

由于只有一个读者,因此不需考虑head指针并发操作的问题。但由于写者先更新tail指针内容,后写入数据,读者在读数据前需要先判断数据是否有效。

消息队列的消息格式如图 7所示。

基于共享内存的无锁消息队列设计

7消息数据格式

写者写入数据前,data_len为0,写入数据后,更新data_len内容。此时,读者才开始读数据。读者读取数据后,重置data_len为0;这样,下次写者写入该位置数据完成前,读者不会提前读取。

四、消息队列设计

消息队列不是作为独立的服务,而是设计成库的形式提供使用。应用程序调用消息队列库的接口,编译时将库文件链接到目标文件即可。

按照功能划分,共享内存消息队列划分为消息队列、消息队列创建、消息队列销毁、数据读取、数据写入、消息队列查询等模块,各模块对应的功能如下:

  • 消息队列

提供基础数据结构,缓存写入的数据,供其他模块读取或者查询。

  • 消息队列创建

创建指定键值、大小和容量的共享内存消息队列。

  • 消息队列销毁

销毁指定键值的共享内存消息队列,释放共享内存。

  • 数据读取

读取共享内存消息队列中的数据。

  • 数据写入

向共享内存消息队列中写入数据。

  • 消息队列查询

查询共享内存消息队列的使用情况。

(一)  模块组成

1、总体组成

共享内存消息队列的组成如图 8所示,其中消息队列由消息队列创建模块创建,供数据读写模块、消息队列查询模块使用,最终被消息队列销毁模块销毁释放。


基于共享内存的无锁消息队列设计

8消息队列组成

2、消息队列组成

消息队列是一个基于共享内存的环形队列,在开辟的一块连续的共享内存中,保存消息队列相关信息,缓存写入的数据。它在内存中的结构定义如图 9所示:

基于共享内存的无锁消息队列设计

9消息队列内存结构

消息队列头包含队列大小、消息记录大小,head、tail位置指针,用于读写同步的条件变量和锁等控制信息。

环形队列分别使用首指针head和尾指针tail标记队列的头和尾,读者从head指向的位置读取数据,写者向tail指向的位置写入数据。其结构如图 10所示:

基于共享内存的无锁消息队列设计

10环形队列结构示意图

在环形消息队列中,缓存着若干条消息记录,其中单条消息的结构如图 7所示。

(二)  消息队列模块设计

1、创建和销毁

消息队列应该在应用进程启动前创建,在应用进程退出后再销毁。创建和销毁由独立的程序调用创建和销毁模块进行操作。

创建消息队列过程主要包括:

        1. 根据参数确定消息长度、个数,计算队列缓存大小

        2. 申请共享内存

3. 初始化消息队列头部控制信息,初始化队列缓存

销毁消息队列过程主要包括:

1. 销毁消息队列头部控制信息

2. 释放共享内存其中控制信息主要包括读者、写者同步需要的条件变量和互斥锁。

2、数据写入

消息队列支持多个写者同时写操作,多个写者对队列的竞争写由CAS操作完成。写操作的过程如下:

       1. 检查环形队列空闲数量是否大于临界值

       2. 如果是,执行3;如果否,超时等待,继续执行1

       3. 获取当前tail值,new_tail为tail+1

       4. 通过CAS指令,使用new_tail更新tail

       5. 如果成功,根据new_tail位置写入数据,然后更新该位置消息的data_len为实际数据长度;如果不成功,跳到1继续执行

       6. 如果有读者在等待,通知读者

值得注意的是,写者先更新tail指针,后写入数据。这样才能保证多个写者通过CAS进行互斥操作。但是tail指针更新后,不保证数据已经更新完成。读者在读数据时需要根据data_len的值判断数据是否完整。

3、数据读取

消息队列只有一个读者,逻辑比较简单,读操作过程如下:

     1. 检查环形队列是否有数据

     2. 如果是,head自加1;如果否,超时等待,执行1

     3. 检查环形队列head位置的数据data_len是否为0

     4. 如果是,超时等待,若超过500毫秒data_len仍为0,认为写者异常,跳过该位置,执行1;如果否,读取数据

     5. 更新该位置data_len0

     6. 如果有写者在等待,通知写者

4、读写同步

读者和写者使用条件变量进行同步。条件变量保存在消息队列头部信息中。只有当写者因没有空闲位置进入等待状态时,读者读取数据后才会发送条件变量信号进行通知。同样,只有当读者因没有数据而进入等待状态时,写者写入数据后才发送条件变量信号进行通知。

五、消息队列性能

(一)  吞吐量

基于共享内存的无锁消息队列设计

11写者吞吐量

分别通过1个、2个、4个写者持续写入,1个读者读的方式,测试每秒写入和读取的消息数量。发现不同写者数量情况下,每秒写入的消息数基本相同。消息队列的吞吐量在140万笔/秒,如图11和图12所示。其中,消息长度为100字节。

12读者吞吐量

(二)  时延

1、测试方法

每个写者向消息队列中写入如下格式的数据,其中时间戳是写者在向消息队列中发送数据前获取的系统时间,精确到微秒。序号为每个写者按自己的需要,从1开始。类型是对每个写者的标志,主要用于区分不同的写者。

13写者消息结构

每个写者向消息队列中写入数据前,填好消息头,读者从消息队列中获取消息,解析出消息头中的时间戳,然后获取此时系统时间,形成读时间戳,最终计算出每条消息通过消息队列的时间。

测试过程中,根据写者数量以及写者写数据的速度,总共向消息队列中写入100万条消息,记录每条消息通过消息队列的时间,计算出每种场景下,消息队列的时延。例如:在41读,每个写者速度为每秒1000笔场景下,4个写者每个写者每秒向消息队列发送1000条消息,每个写者写250000条消息,最终4个写者向消息队列写入1000000条消息,最后计算出时延平均值。

2、测试结果

表格1时延测试结果

方式

写入速度

11读(μs

21读(μs

41读(μs

81读(μs

每秒50

6.82

9.65

7.53

7.43

每秒1000

20.09

45.40

78.33

69.42

每秒10000

333.25

635.52

1648.27

7100.91

六、总结

基于共享内存的无锁消息队列,针对郑商所五期系统的需求,使用共享内存作为底层消息传递机制,使用CAS指令进行互斥操作,实现了一个功能简单、性能突出的消息通信工具。

在实际项目中,该消息队列不仅用于前置机和撮合引擎上通信进程与应用进程的消息传递,还用作日志记录和异步落地的缓存,以及后续中间件系统开发的基础组件。

开源项目和商业产品往往要满足多种需求,提供强大的功能;这些功能可能超出了具体应用的需要。对性能特别敏感的系统,通过有针对性的设计,实现小而专的组件是追求性能的必经之路。


以上是关于基于共享内存的无锁消息队列设计的主要内容,如果未能解决你的问题,请参考以下文章

zeromq所谓的“无锁消息队列”实现

基于netty的消息队列设计

Linux进程间通信--进程,信号,管道,消息队列,信号量,共享内存

是否存在乐观的无锁FIFO队列实现?

消息队列(MQ)消息延迟及过滤设计方案

消息队列和缓存的区别