性能基本概念转载
Posted kwanchan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了性能基本概念转载相关的知识,希望对你有一定的参考价值。
socket API
如果熟悉linux socket编程的同学阅读完了第一章, 一定有一种说不上来的别扭感觉.因为通常情况下, 当我们讨论socket的时候, 我们一般指的是操作系统提供的网络编程接口里的那个socket概念. 而在ZMQ中, 只是借用了这个概念的名字, 在ZMQ中, 我们讨论到socket的时候, 一般指代的是调用zmq_socket()
接口返回的那个socket, 具体一点: zmq socket.
zmq socket比起linux socket来说, 逻辑理解起来比较类似, 虽然两者内部完全就不是同一种东西.
- socket需要被创建, 以及关闭.
zmq_socket()
,zmq_close()
- socket有配置项.
zmq_setsockopt()
,zmq_getsockopt()
- socket有绑定和连接两种操作.
zmq_bind()
,zmq_connect()
- 收发socket上的数据.
zmq_msg_send()
,zmq_msg_recv()
,zmq_send()
,zmq_recv()
但与linux socket不同的是, zmq socket没有listen这个逻辑概念.
需要注意的是, zmq socket是void指针, 而消息则是结构实例. 这就意味着, 在C语言的API中, 需要zmq socket的地方, 传递的一定是值, 而需要传递消息的时候, 比如使用zmq_msg_send()
和zmq_msg_recv()
这样的接口, 消息参数则传递其地址. 其设计哲学是: 在zmq中, socket不归程序员掌控, 所以你可能拿到一个句柄(地址), 但不能看到它长什么样(不能看到socket实例), 但消息是程序员创建的, 是受程序员掌控的.
将socket接入网络拓扑中
在两个结点上用ZMQ实现通讯, 你需要分别为两个结点创建socket, 并在其中一个结点上调用zmq_bind()
, 在另一个结点上创建对应的zmq_connect()
. 在ZMQ中, 请不要再以死板的"客户端", "服务端"来区分网络结点. 而要这样理解: zmq_bind()
调用应该发生在网络拓扑中那些不易变的结点上, 而zmq_connect()
应该发生在网络拓扑中那些易变的结点上.
ZMQ建立起的数据连接和常见的TCP连接有一些不同, 但也有一些共通之处, 如下:
- TCP是TCP/IP协议栈的四层协议, 当建立一个TCP连接的时候, 双方都必须使用TCP/IP协议栈. 从这个角度看, ZMQ是四层之上的4.5层, ZMQ下面统一了很多连接协议, 对于TCP/IP协议栈来说, ZMQ下面有TCP, 除了TCP/IP, ZMQ还能通过共享内存在线程间建立连接, 在进程间建立连接(具体连接手段对上层是透明的), 使用TCP/IP协议栈的PGM协议(建立在IP层上的一种多播协议)建立连接.
- 在linux socket中, 一个连接就是一个socket, 但在ZMQ中, 一个socket上可以承载多个数据连接. 这里socket和connection不再是同个层次上的等价词汇, 要把socket理解为程序员访问数据连接的一个入口, 一个大门, 门推开, 可能有多个连接, 而不止一个. 有多个数据流等待吞吐.
- 上面说了, 用ZMQ在结点间建立连接, 程序员操作ZMQ相关API的时候, 实际上位于的是类似于TCP/IP里的第4.5层, 反过来看, 即具体的连接是如何建立, 如何保持, 如何维护的, 这ZMQ库的工作, 不应该由使用ZMQ库的人去关心. 也就是说, 自从使用了ZMQ库, 你再也不需要关心TCP是如何握手了. 并且, 对于合适的协议, 对端结点上线下线时, ZMQ库将负责优雅的处理接连中断, 重试连接等脏活.
- 再次重申, 如何连接, 是ZMQ库的工作, 你不应该插手. 你只需要关心数据, 套路, 拓扑.
在请求-回应套路中, 我们把比较不易变的逻辑结点称为服务端, 把易变, 也就是会经常性的退出, 或重新加入网络拓扑的结点称为客户端. 服务端向外提供服务, 必须提供一个"地址"供客户端去上门, 换句话说, 在这个套路拓扑中, 那些经常来来去去的客户端应该知道去哪找服务端. 但反过来, 服务端完全不关心去哪找客户端, 你爱来不来, 不来就滚, 不要打扰我飞升. 对于不易变的结点, 应该使用zmq_bind()
函数, 对于易变的结点, 应该采用zmq_connect
在传统的linux socket编程中, 如果服务端还没有上线工作, 这个时候去启动客户端程序, 客户端程序的connect()
调用会返回错误. 但在ZMQ中, 它妥善处理了这种情况. 客户端调用zmq_connect()
, 不会报错, 仅会导致消息被阻塞而发不出去.
不要小看这一点设计, 它反映出ZMQ的设计思想: 在请求-应答套路中, 它不光允许客户端可以随时退出, 再回来. 甚至允许服务端去上个厕所.
另外, 一个服务端可以多次调用zmq_bind()
以将自己关联到多个endpoint上.(所谓的endpoint, 就是通讯协议+通讯地址的组合, 它一般情况下指代了在这种通讯协议中的一个网络结点, 但这个结点可以是逻辑性的, 不一定只是一台机器).这就意味着, zmq socket可以同时接受来自多个不同通讯协议的多簇请求消息.
zmq_bind(socket, "tcp://*:5555");
zmq_bind(socket, "tcp://*:999");
zmq_bind(socket, "inproc://suprise_motherfucker");
但是, 对于同一种通讯协议里的同一个endpoint, 你只能对其执行一次zmq_bind()
操作. 这里有个例外, 就是ipc进程间通信. 逻辑上允许另外一个进程去使用之前一个进程已经使用过的ipc endpoint, 但不要滥用这特性: 这只是ZMQ提供给程序崩溃后恢复现场的一种手段, 在正常的代码逻辑中, 不要做这样的事情.
所以看到这里你大概能理解zmq对bind和connect这两个概念的态度: ZMQ努力的将这两个概念之间的差异抹平, 但很遗憾, zmq并没有将这两个操作抽象成一个类似于touch的操作. 但还是请谨记, 在你的网络拓扑中, 让不易变结点去使用zmq_bind()
, 让易变结点去使用zmq_connect
zmq socket是分类型的, 不同类型的socket提供了差异化的服务, socket的类型与结点在拓扑中的角色有关, 也影响着消息的出入, 以及缓存策略. 不同类型的socket之间, 有些可以互相连接, 但有些并不能, 这些规则, 以及如何在套路中为各个结点安排合适类型的socket, 都是后续我们将要讲到的内容.
如果从网络通讯的角度来讲, zmq是一个将传统传输层封装起来的网络库. 但从数据传输, 消息传输, 以及消息缓存这个角度来讲, zmq似乎又称得上是一个消息队列库. 总之, zmq是一个优秀的库, 优秀不是指它的实现, 它的性能, 而是它能解决的问题, 它的设计思路.
收发消息
在第一章里, 我们接触到了两个有关消息收发的函数, zmq_send()
和zmq_recv()
, 现在, 我们需要把术语规范一下.
zmq_send()
与zmq_recv()
是用来传输"数据"的接口. 而"消息"这个术语, 在zmq中有指定含义, 传递消息的接口是zmq_msg_send()
与zmq_msg_recv()
当我们说起"数据"的时候, 我们指的是二进制串. 当我们说"消息"的时候, 指提是zmq中的一种特定结构体.
需要额外注意的是, 无论是调用zmq_send()
还是zmq_msg_send()
, 当调用返回时, 消息并没有真正被发送出去, 更没有被对方收到. 调用返回只代表zmq将你要发送的"消息"或"数据"放进了一个叫"发送缓冲区"的地方. 这是zmq实现收发异步且带缓冲队列的一个设计.
单播传输
ZMQ底层封装了三种单播通讯协议, 分别是: 共享内存实现的线程间通讯(inproc), 进程间通信(ipc), 以及TCP/IP协议栈里的TCP协议(tcp). 另外ZMQ底层还封装了两种广播协议: PGM, EPGM. 多播我们在非常后面的章节才会介绍到, 在你了解它之前, 请不要使用多播协议, 即便你是在做一些类似于发布-订阅套路的东西.
对于多数场景来说, 底层协议选用tcp都是没什么问题的. 需要注意的是, zmq中的tcp, 被称为 "无连接的tcp协议", 而之所以起这么一个精神分裂的名字, 是因为zmq允许在对端不存在的情况下, 结点去zmq_connect()
. 你大致可以想象zmq做了多少额外工作, 但这些对于你来说, 对于上层应用程序来说, 是透明了, 你不必去关心具体实现.
IPC通讯类似于tcp, 也是"无连接"的, 目前, 这种方式不能在windows上使用, 很遗憾. 并且, 按照惯例, 在使用ipc作为通讯方式时, 我们一般给endpoint加上一个.ipc
的后缀. 另外, 在Unix操作系统上, 使用ipc连接还请格外的注意不同进程的权限问题, 特别是从属于两个不同用户的进程.
最后来说一下inproc, 也就是线程间通信, 它只能用于同一进程内的不同线程通讯. 比起tcp和ipc, 这种通讯方式快的飞起. 它与tcp和ipc最大的区别是: 在有客户端调用connect之前, 必须确保已经有一个服务端在对应的endpoint上调用了bind, 这个缺陷可能会在未来的某个版本被修正, 但就目前来讲, 请务必小心注意.
ZMQ对底层封装的通讯协议是有侵入性的
很遗憾的是, ZMQ对于其底层封装的网络协议是有侵入性的, 换句话说, 你没法使用ZMQ去实现一个HTTP服务器. HTTP作为一个五层协议, 使用TCP作为传输层协议, 对TCP里的报文格式是有规约限制的, 而ZMQ作为一个封装了TCP的4.5层协议, 其在数据交互时, 已经侵入了TCP的报文格式. 你无法让TCP里的报文既满足HTTP的格式要求, 还满足ZMQ的格式要求.
关心ZMQ到底是如何侵入它封装的通讯协议的, 这个在第三章, 当我们接触到ZMQ_ROUTER_RAW
这种socket配置项的时候才会深入讨论, 目前你只需要明白, ZMQ对其底层封装的通讯协议有侵入.
这意味着, 你无法无损的将ZMQ引入到一些现成的项目中. 这很遗憾.
I/O线程
我们先前提到过, ZMQ在后台使用独立的线程来实现异步I/O处理. 一般情况下吧, 一个I/O线程就应该足以处理当前进程的所有socket的I/O作业, 但是这个凡事总有个极限情况, 所以总会存在一些很荀的场景, 你需要多开几个I/O线程.
当你创建一个context的时候, ZMQ就在背后创建了一个I/O处理线程. 如果这么一个I/O线程不能满足你的需求, 那么就需要在创建context的时候加一些料, 让ZMQ多创建几个I/O处理线程. 一般有一个简单估算I/O线程数量的方法: 每秒你的程序有几个G字节的吞吐量, 你就开几个I/O线程.
下面是自定义I/O线程数量的方法:
int io_threads = 4;
void * context = zmq_ctx_new();
zmq_ctx_set(context, ZMQ_IO_THREADS, io_threads);
assert(zmq_ctx_get(context, ZMQ_IO_THREADS) == io_threads);
回想一下你用linux socket + epoll编写服务端应用程序的套路, 一般都是一个tcp连接专门开一个线程. ZMQ不一样, ZMQ允许你在一个进程里持有上千个连接(不一定是TCP哦), 但处理这上千个连接的I/O作业, 可能只有一个, 或者几个线程而已, 并且事实也证明这样做是可行的. 可能你的进程里只有十几个线程, 但就是能处理超过上千个连接.
当你的程序只使用inproc作为通讯手段的时候, 其实是不需要线程来处理异步I/O的, 因为inproc是通过共享内存实现通讯的. 这个时候你可以手动设置I/O线程的数量为0. 这是一个小小的优化手段, 嗯, 对性能的提升基本为0.
套路, 套路, 套路
ZMQ的设计是亲套路的, ZMQ的核心其实在于路由与缓存, 这也是为什么作为一个网络库, 它更多的被人从消息队列这个角度了解到的原因. 要用ZMQ实现套路, 关键在于使用正确的socket类型, 然后把拓扑中的socket组装配对起来. 所以, 要懂套路, 就需要懂zmq里的socket类型.
zmq提供了你构建如下套路的手段:
- 请求-应答套路. 多对多的客户端-服务端模型. 用于远程调用以及任务分发场景.
- 发布-订阅套路. 多对多的喇叭-村民模型. 用于数据分发场景.
- 流水线套路. 用于并行作业处理场景.
- 一夫一妻套路. 一对一的连接模型. 这一般用于在进程中两个线程进行通讯时使用.
我们在第一章中已经大致接触了套路, 除了一夫一妻没有接触到, 这章稍后些部分我们也将接触这种套路.要了解具体socket的各个类型都是干嘛用的, 可以去阅读zmq_socket()
的manpage, 我建议你去阅读, 并且仔细阅读, 反复阅读.下面列出的是可以互相组合的socket类型. 双方可以替换bind
与connect
操作.
- PUB SUB. 经典的发布-订阅套路
- REQ REP. 经典的请求-应答套路
- REQ ROUTER (注意, REQ发出的数据中, 以一个空帧来区分消息头与消息体)
- DEALER REP(注意, REP假定收到的数据中, 有一个空帧用以区分消息头与消息体)
- DEALER ROUTER
- DEALER DEALER
- ROUTER ROUTER
- PUSH PULL. 经典的流水线套路.
- PAIR PAIR. 一夫一妻套路
后续你还会看到有XPUB与XSUB两种类型的socket. 就目前来说, 只有上面的socket配对连接是有效的, 其它没列出的组合的行为是未定义的, 但就目前的版本来说, 错误的组合socket类型并不会导致连接时出错, 甚至可能会碰巧按你的预期运行, 但强烈不建议你这个瞎jb搞. 在未来的版本中, 组合非法的socket类型可能会导致API调用出错.
消息, 消息, 消息
libzmq有两套收发消息的API接口, 这个之前我们已经讲过. 并且在第一章里建议你多使用zmq_send()
与zmq_recv()
, 建议你规避zmq_msg_send()
与zmq_msg_recv()
. 但zmq_recv
有一个缺陷, 就是当你提供给zmq_recv()
接口的接收buffer不够长时, zmq_recv()
会把数据截断. 如果你无法预测你要收到的二进制数据的长度, 那么你只能使用zmq_msg_xxx()
接口.
从接口名上的msg
三个字母就能看出, 这个系列的接口是操纵结构体, 也就是"消息"(其实是帧, 后面会讲到), 而不是"数据", 而非缓冲区的接口, 实际上它们操纵的是zmq_msg_t
类型的结构. 这个系列的接口功能更为丰富, 但使用起来也请务必万分小心.
- 初始化消息相关的接口:
zmq_msg_init()
,zmq_msg_init_size()
,zmq_msg_init_data()
- 消息收发接口:
zmq_msg_send()
,zmq_msg_recv()
- 消息释放接口:
zmq_close()
- 访问消息内容的接口:
zmq_msg_data()
,zmq_msg_size()
,zmq_msg_more()
- 访问消息配置项的接口:
zmq_msg_get()
,zmq_msg_set()
- 复制拷贝操作接口:
zmq_msg_copy()
,zmq_msg_move()
消息结构中封装的数据是二进制的, 依然由程序员自己解释. 关于zmq_msg_t
结构类型, 下面是你需要知道的基础知识:
- 去阅读上面的消息相关接口API的manpage, 你会发现传递参数都是以
zmq_msg_t *
. 也就是说这是一个内部实现不对外开放的类型, 创建, 传递, 都应当以指针类型进行操作. - 要从socket中接收一个消息, 你需要先通过
zmq_msg_init()
创建一个消息对象, 然后将这个消息对象传递给zmq_msg_recv()
接口 - 要向socket中写入消息, 你需要先通过
zmq_msg_init_size()
创建一个数据容量指定的消息对象, 然后把你要写入的二进制数据通过内存拷贝函数, 比如memcpy()
写入消息中, 最后调用zmq_msg_send()
, 看到这里你应该明白,zmq_msg_init_size()
接口内部进行了内存分配. - 消息的"释放"和"销毁"是两个不同的概念.
zmq_msg_t
其实是引用计数方式实现的共享对象类型, "释放"是指当前上下文放弃了对该消息的引用, 内部导致了实例的引用计数-1, 而"销毁"则是彻底把实例本身给free掉了. 当你"释放"一个消息的时候, 应当调用zmq_msg_close()
接口. 如果消息实例在释放后引用计数归0, 那么这个消息实例会被ZMQ自动销毁掉. - 要访问消息里包装的数据, 调用
zmq_msg_data()
接口, 要获取消息中数据的长度, 调用zmq_msg_size()
- 在你熟读并理解相关manpage中的内容之前, 不要去调用
zmq_msg_move()
,zmq_msg_copy()
,zmq_msg_init_data()
这三个接口 - 当你通过
zmq_msg_send()
调用将消息发送给socket后, 这个消息内部包装的数据会被清零, 也就是zmq_msg_size() == 0
, 所以, 你不应该连续两次使用同一个zmq_msg_t *
值调用zmq_msg_send()
. 但需要注意的是, 这里的"清零", 并不代表消息被"释放", 也不代表消息被"销毁". 消息还是消息, 只是其中的数据被扔掉了.
如果你想把同一段二进制数据发送多次, 正确的做法是下面这样:
- 调用
zmq_msg_init_size()
, 创建第一个消息, 再通过memcpy
或类似函数将二进制数据写入消息中 - 调用
zmq_msg_init()
创建第二个消息, 再调用zmq_msg_copy()
从第一个消息将数据"复制"过来 - 重复上述步骤
- 依次调用
zmq_msg_send()
发送上面的多个消息
ZMQ还支持所谓的"多帧消息", 这种消息允许你把多段二进制数据一次性发送给对端. 这个特性在第三章我们再讲. (P.S.: 这是一个很重要的特性, 路由代理等高级套路就严重依赖这种多帧消息.). ZMQ中的消息有三层逻辑概念: 消息, 帧, 二进制数据. 用户自定义的二进制数据被包装成帧, 然后一个或多个帧组成一个消息. 消息是ZMQ拓扑网络中两个结点收发的单位, 但在ZMQ底层的传输协议中, 最小单位是帧.
换一个角度来讲, ZMQ使用其底层的传输协议, 比如tcp, 比如inproc, 比如ipc来传输数据, 当ZMQ调用这些传输协议传递数据的时候, 最小单元是帧. 帧的完整性由传输协议来保证, 即是ZMQ本身不关心这个帧会不会破损, 帧的完整传输应当由这些传输协议去保证. 而在使用ZMQ构建应用程序的程序员眼中, 最小的传输单位是消息, 一个消息里可能会有多个帧, 程序员不去关心消息从一端到另一端是否会出现丢帧, 消息的完整性与原子性应当由ZMQ库去保证.
前面我们讲过, ZMQ对其底层的传输协议是有侵入性的. 如果要了解ZMQ到底是如何在传输协议的基础上规定帧传输格式的, 可以去阅读这个规范.
在我们到达第三章之前, 我们所讨论的消息中都仅包含一个帧. 这就是为什么在这一小节的描述中, 我们几乎有引导性的让你觉得, zmq_msg_t
类型, 就是"消息", 其实不是, 其实zmq_msg_t
消息只是"帧".
- 一个消息可以由多个帧组成
- 每个帧都是一个
zmq_msg_t
对象 - 使用
zmq_msg_send()
,zmq_msg_recv()
, 你可以一帧一帧的发送数据. 可以用多次调用这些接口的方式来发送一个完整的消息, 或者接收一个完整的消息: 在发送时传入ZMQ_SNDMORE
参数, 或在接收时, 通过zmq_getsockopt()
来获取ZMQ_RCVMORE
选项的值. 更多关于如何使用低级API收发多帧消息的信息, 请参见相关接口的manpage - ZMQ也提供了便于收发多帧消息的高级API
关于消息或帧, 还有下面的一些特性:
- ZMQ允许你发送数据长度为0的帧. 比如在有些场合, 这只是一个信号, 而没有任何语义上的数据需要被携带
- ZMQ在发送多帧消息时, 保证消息的原子性与完整性. 如果丢失, 所有帧都不会到达对端, 如果成功, 那么必须所有帧都被正确送达, 帧在传输过程中不会出现破损.
- 在调用发送数据的癌后, 消息并不会被立即发出, 而是被放在发送缓冲区中. 这和
zmq_send()
是一致的. - 你必须在完成消息接收后, 调用
zmq_msg_close()
接口来释放这个zmq_msg_t
对象
最后再强调一下, 在你不理解zmq_msg_t
的原理之前, 不要使用zmq_msg_init_data()
接口, 这是一个0拷贝接口, 如果不熟悉zmq_msg_t
结构的原理, 瞎jb用, 是会core dump的
ZMQ中的多路I/O复用
在先前的所有例子程序中, 大多程序里干的都是这样的事情
- 等待socket上有数据
- 接收数据, 处理
- 重复上面的过程
如果你接触过linux中的select, pselect, epoll等多路IO复用接口, 你一定会好奇, 在使用zmq的时候, 如何实现类似的效果呢? 毕竟ZMQ不光把linux socket的细节给你封装了, 连文件描述符都给你屏蔽封装掉了, 显然你没法直接调用类似于select, pselect, epoll这种接口了.
答案是, ZMQ自己搞了一个类似的玩意, zmq_poll()
了解一下.
我们先看一下, 如果没有多路IO接口, 如果我们要从两个socket上接收数据, 我们会怎样做. 下面是一个没什么卵用的示例程序, 它试图从两个socket上读取数据, 使用了异步I/O. (如果你有印象的话, 应该记得对应的两个endpoint实际上是我们在第一章写的两个示例程序的数据生产方: 天气预报程序与村口的大喇叭)
#include <zmq.h>
#include <stdio.h>
int main(void)
{
void * context = zmq_ctx_new();
void * receiver = zmq_socket(context, ZMQ_PULL);
zmq_connect(receiver, "tcp://localhost:5557");
void * subscriber = zmq_socket(context, ZMQ_SUB);
zmq_connect(subscriber, "tcp://localhost:5556");
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
while(1)
{
char msg[256];
while(1)
{
int size = zmq_recv(receiver, msg, 255, ZMQ_DONTWAIT);
if(size != -1)
{
// 接收数据成功
}
else
{
break;
}
}
while(1)
{
int size = zmq_recv(subscriber, msg, 255, ZMQ_DONTWAIT);
if(size == -1)
{
// 接收数据成功
}
else
{
break;
}
}
sleep(1); // 休息一下, 避免疯狂循环
}
zmq_close(receiver);
zmq_close(subscriber);
zmq_ctx_destroy(context);
return 0;
}
在没有多路IO手段之前, 这基本上就是你能做到的最好情形了. 大循环里的sleep()
让人浑身难受. 不加sleep()
吧, 在没有数据的时候, 这个无限空循环能把一个核心的cpu占满. 加上sleep()
吧, 收包又会有最坏情况下1秒的延时.
但有了zmq_poll()
接口就不一样了, 代码就会变成这样:
#include <zmq.h>
#include <stdio.h>
int main(void)
{
void * context = zmq_ctx_new();
void * receiver = zmq_socket(context, ZMQ_PULL);
zmq_connect(receiver, "tcp://localhost:5557");
void * subscriber = zmq_socket(context, ZMQ_SUB);
zmq_connect(subscriber, "tcp://localhost:5556");
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
while(1)
{
char msg[256];
zmq_pollitem_t items[] = {
{receiver, 0, ZMQ_POLLIN, 0},
{subscriber,0, ZMQ_POLLIN, 0},
};
zmq_poll(items, 2, -1);
if(items[0].revents & ZMQ_POLLIN)
{
int size = zmq_recv(receiver, msg, 255, 0);
if(size != -1)
{
// 接收消息成功
}
}
if(items[1].revents & ZMQ_POLLIN)
{
int size = zmq_recv(subscriber, msg, 255, 0);
if(size != -1)
{
// 接收消息成功
}
}
}
zmq_close(receiver);
zmq_close(subscriber);
zmq_ctx_destroy(context);
return 0;
}
zmq_pollitem_t
类型定义如下, 这个定义可以从zmq_poll()
的manpage里查到
typedef struct{
void * socket; // ZMQ的socket
int fd; // 是的, zmq_poll()还可以用来读写linux file descriptor
short events; // 要被监听的事件, 基础事件有 ZMQ_POLLIN 和 ZMQ_POLLOUT, 分别是可读可写
short revents; // 从zmq_poll()调用返回后, 这里存储着触发返回的事件
} zmq_pollitem_t;
多帧消息的收发
我们之前提到过, 用户数据被包装成zmq_msg_t
对象, 也就是帧, 而在帧上, 还有一个逻辑概念叫"消息". 那么在具体编码中, 如何发送多帧消息呢? 而又如何接收多帧消息呢? 简单的讲, 两点:
- 在发送时, 向
zmq_msg_send()
传入ZMQ_SNDMORE
选项, 告诉发送接口, "我后面还有其它帧" - 在接收消息时, 每调用一次
zmq_msg_recv()
接收一个帧, 就调用一次zmq_msg_more()
或者zmq_getsockopt() + ZMQ_RCVMORE
来判断是否这是消息的最后一个帧
发送示例:
zmq_msg_send(&msg, socket, ZMQ_SNDMORE);
zmq_msg_send(&msg, socket, ZMQ_SNDMORE);
zmq_msg_send(&msg, socket, 0); // 消息的最后一个帧
接收示例:
while(1)
{
zmq_msg_t msg;
zmq_msg_init(&msg);
zmq_msg_recv(&msg, socket, 0);
// 做处理
zmq_msg_close(&msg);
if(!zmq_msg_more(&msg)) // 注意, zmq_msg_more可以在zmq_msg_close后被安全的调用
{
break;
}
}
这里有一个需要注意的有趣小细节: 要判断一个收来的帧是不是消息的最后一个帧, 有两种途径, 一种是zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size)
, 另外一种是zmq_msg_more(&msg)
. 前一种途径的入参是socket, 后一种途径的入参是msg. 这真是很因缺思汀. 目前来说, 两种方法都可以, 不过我建议你使用zmq_getsockopt()
, 至于原因嘛, 因为在zmq_msg_recv()
的manpage中, 是这样建议的.
关于多帧消息, 你需要注意以下几点:
- 多帧消息的传输是原子性的, 这是由ZMQ保证的
- 原子性也意味着, 当你使用
zmq_poll()
时, 当socket可读, 并且用zmq_msg_recv()
读出一个帧时, 代表着不用等待下一次循环, 你直接继续读取, 一定能读取能整个消息中剩余的其它所有帧 - 当一个多帧消息开始被接收时, 无论你是否通过
zmq_msg_more()
或zmq_getsockopt() + ZMQ_RCVMORE
检查消息是否接收完整, 你一帧帧的收, 也会把整个消息里的所有帧收集齐. 所以从这个角度看,zmq_msg_more()
可以在把所有可读的帧从socket里统一接收到手之后, 再慢慢判断这些帧应该怎么拼装. 所以这样看, 它和zmq_getsockopt()
的功能也不算是完全重复. - 当一个多帧消息正在发送时, 除了把socket关掉(暴力的), 否则你不能取消本次发送, 本次发送将持续至所有帧都被发出.
中介与代理
ZMQ的目标是建立去中心化的消息通信网络拓扑. 但不要误解"去中心"这三个字, 这并不意味着你的网络拓扑在中心圈内空无一物. 实际上, 用ZMQ搭建的网络拓扑中常常充满了各种非业务处理的网络结点, 我们把这些感知消息, 传递消息, 分发消息, 但不实际处理消息的结点称为"中介", 在ZMQ构建的网络中, 它们按应用场景有多个细化的名字, 比如"代理", "中继", "装置", "掮客"等.
这套逻辑在现实世界里也很常见, 中间人, 中介公司, 它们不实际生产社会价值, 表面上看它们的存在是在吸两头的血, 这些皮条客在社会中的存在意义在于: 它们减少了沟通的复杂度, 对通信双方进行了封装, 提高了社会运行效率.
在发布-订阅套路中加入中介: XPUB与XSUB
当构建一个稍有规模的颁式系统的时候, 一个避不开的问题就是, 网络中的结点是如何感知其它结点的存在的? 结点会当机, 会扩容, 在这些变化发生的时候, 网络中的其它正在工作的结点如何感知这些变化, 并保持系统整体正常运行呢? 这就是经典的"动态探索问题".
动态探索问题有一系列很经典的解决方案, 最简单的解决方案就是把问题本身解决掉: 把网络拓扑设计死, 代码都写死, 别让它瞎jb来回变, 问题消灭了, done!. 这种解决方案的缺点就是如果网络拓扑要有变更, 比如业务规模扩展了, 或者有个结点当机了, 网络配置管理员会骂娘.
拓扑规模小的时候, 消灭问题的思路没什么坏处, 但拓扑稍微复杂一点, 显然这就是一个很可笑的解决方案.比如说, 网络中有一个发布者, 有100多个订阅者, 发布者bind到endpoint上, 订阅者connect到endpoint上. 如果代码是写死的, 如果发布者本身出了点什么问题, 或者发布者一台机器搞不住了, 需要横向扩容, 你就得改代码, 然后手动部署到100多台订阅者上. 这样的运维成本太大了.
这种场景, 你就需要一个"中介", 对发布者而言, 它从此无需关心订阅者是谁, 在哪, 有多少人, 只需要把消息给中介就行了. 对于订阅者而言, 它从此无需关注发布者有几个, 是否使用了多个endpoint, 在哪, 有多少人. 只需要向中介索取消息就行了. 虽然这时发布者身上的问题转嫁到的中介身上: 即中介是网络中最易碎的结点, 如果中介挂了整个拓扑就挂了, 但由于中介不处理业务逻辑, 只是一个类似于交换机的存在, 所以同样的机器性能, 中介在单位时间能转发的消息数量, 比发布者和订阅者能处理的消息高一个甚至几个数量级. 是的, 使用中介引入了新的问题, 但解决了老的问题.
中介并没有解决所有问题, 当你引入中介的时候, 中介又变成了网络中最易碎的点, 所以在实际应用中, 要控制中介的权重, 避免整个网络拓扑严重依赖于一个中介这种情况出现: ZMQ提倡去中心化, 不要把中介变成一个垄断市场的掮客.
对于发布者而言, 中介就是订阅者, 而对于订阅者而言, 中介就是发布者. 中介使用两种额外的socket类型: XPUB与XSUB. XSUB与真实的发布者连接, XPUB与真实的订阅者连接.
在请求-回应套路中加入掮客: DELEAR与ROUTER
在我们之前写的请求-回应套路程序中, 我们有一个客户端, 一个服务端. 这是一个十分简化的例子, 实际应用场景中的请求-回应套路中, 一般会有多个客户端与多个服务端.
请求-应答模式有一个隐含的条件: 服务端是无状态的. 否则就不能称之为"请求-应答"套路, 而应该称之为"唠嗑套路".
要连接多个客户端与多个服务端, 有两种思路.
第一种暴力思路就是: 让N个客户端与M个服务端建立起N*M的全连接. 这确实是一个办法, 虽然不是很优雅. 在ZMQ中, 实现起来还轻松不少: 因为ZMQ的socket可以向多个endpoint发起连接, 这对于客户端来说, 编码难度降低了. 客户端应用程序中可以创建一个zmq_socket, 然后connect到多个服务端的endpoint上就行了. 这种思路做的话, 客户端数量扩张很容易, 直接部署就可以, 代码不用改. 但是缺陷有两个:
- 服务端扩容时, 所有客户端的代码都得跟着改
- 客户端代码里必须知道所有服务端的endpoint
总的来说, 这是一种很暴力的解决办法, 不适合用于健壮的生产环境. 但是这确实是一个办法.
为了解决上面两个缺陷, 自然而然的我们就会想到: 为什么不能把服务端抽象出来呢? 让一个掮客来做那个唯一的endpoint, 以供所有客户端connect, 然后掮客在背后再把请求体分发给各个服务端, 服务端做出回应后掮客再代替服务端把回应返回给客户端, 这样就解决了上面的问题:
- 对于客户端来说, 服务端抽象成了一个endpoint, 服务端扩容时, 客户端是没有感知的.
- 客户端不需要知道服务端的所有endpoint, 只需要知道掮客的endpoint就可以了.
并且, 掮客还可以做到以下
- 如果N个客户端发送请求的速度时快时慢, 快的时候, M个服务端处理不过来. 掮客可以做一个缓冲地带.
- 掮客可以记录会话状态, 可以保证某一个特定的客户端始终与一个固定的服务端进行数据交互. 某种程度上, 掮客与客户端分别记录部分会话信息, 服务端可以在无状态的情况下实现"唠嗑套路"
所以, 在请求回应套路中加入掮客, 是一个很明智的选择, 这就是第二种思路, 这种思路不是没有缺陷, 有, 而且很明显: 掮客是整个系统中最脆弱的部分.
但这个缺陷可以在一定程度上克服掉:
- 如果单机掮客转发能力不够, 那么可以搞多个掮客. 比如N个客户端,M个服务端, 3个掮客. 客户端与3个掮客建立全连接, 3个掮客与M个服务端建立全连接. 总是要好过N个客户端与M个服务端建立全连接的.
- 如果单机掮客缓冲能力不够, 甚至可以加多层掮客. 这种使用方法就把掮客的缓冲特性放在了首位.
ZMQ中, 有两个特殊的socket类型特别适合掮客使用:
- ROUTER 用于掮客与多个客户端连接. 掮客bind, 客户端connect.
- DEALER 用于掮客和多个服务端连接. 掮客bind, 服务端connect.
关于这两种特殊的socket的特性, 后续我们会仔细深入, 目前来说, 你只需要了解
- 它们实现了消息的缓冲
- 它们通过一种特殊的机制记录了会话
多说无益, 来看代码. 下面是在客户端与服务端中插入掮客的代码实例:
客户端
#include <zmq.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_REQ);
zmq_connect(socket, "tcp://localhost:5559");
for(int i = 0; i < 10; ++i)
{
s_send(socket, "Hello");
char * strRsp = s_recv(socket);
printf("Received reply %d [%s]\n", i, strRsp);
free(strRsp);
}
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}
服务端
#include <zmq.h>
#include <unistd.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_REP);
zmq_connect(socket, "tcp://localhost:5560");
while(1)
{
char * strReq = s_recv(socket);
printf("Received request: [%s]\n", strReq);
free(strReq);
sleep(1);
s_send(socket, "World");
}
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}
掮客
#include <zmq.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
void * socket_for_client = zmq_socket(context, ZMQ_ROUTER);
void * socket_for_server = zmq_socket(context, ZMQ_DEALER);
zmq_bind(socket_for_client, "tcp://*:5559");
zmq_bind(socket_for_server, "tcp://*:5560");
zmq_pollitem_t items[] = {
{ socket_for_client, 0, ZMQ_POLLIN, 0 },
{ socket_for_server, 0, ZMQ_POLLIN, 0 },
};
while(1)
{
zmq_msg_t message;
zmq_poll(items, 2, -1);
if(items[0].revents & ZMQ_POLLIN)
{
while(1)
{
zmq_msg_init(&message);
zmq_msg_recv(&message, socket_for_client, 0);
int more = zmq_msg_more(&message);
zmq_msg_send(&message, socket_for_server, more ? ZMQ_SNDMORE : 0);
zmq_msg_close(&message);
if(!more)
{
break;
}
}
}
if(items[1].revents & ZMQ_POLLIN)
{
while(1)
{
zmq_msg_init(&message);
zmq_msg_recv(&message, socket_for_server, 0);
int more = zmq_msg_more(&message);
zmq_msg_send(&message, socket_for_client, more ? ZMQ_SNDMORE : 0);
zmq_msg_close(&message);
if(!more)
{
break;
}
}
}
}
zmq_close(socket_for_client);
zmq_close(socket_for_server);
zmq_ctx_destroy(context);
return 0;
}
客户端和服务端由于掮客的存在, 代码都简单了不少, 对于掮客的代码, 有以下几点需要思考:
- 为什么客户端和服务端双方在代码中以
s_send
与s_recv
互相传递字符串, 但在掮客那里就需要用zmq_msg_t
进行转发呢? - 为什么掮客在转发消息的时候, 还需要判断是否是多帧消息呢?
- 更进一步的, 如果有多个客户端与多个服务端, 客户端A向掮客发送请求, 掮客将其转发到了服务端B, 然后B回包, 发向掮客, 当回包消息到达掮客时, 掮客是如何将回包消息正确投递给A, 而不是其它客户端的呢?
上面三点其实是同一个问题: 掮客是如何实现带会话追踪的转发消息的?
另外, 如果你先启动掮客, 再启动客户端, 再启动服务端. 你会看到在服务端正确启动后, 客户端显示它收到了回包.那么:
- 在服务端未启动时, 显然在客户端的角度来讲, 客户端已经将第一个请求投递给了掮客. 如果此时有1000个客户端与掮客相连, 1000个首请求消息是如何存储的? 10000个呢? 什么时候掮客会丢弃请求?
这就是有关掮客的第二个问题: 如何配置缓冲区.
本章目前暂时不会对这三个问题做出解答, 大家先思考一下. 我们将在下一章深入掮客的细节进行进一步探索.
ZMQ内置的掮客函数
在上面的掮客代码示例中, 核心代码就是zmq_poll
对两个socket的监听, 以及while(1)
循环. ZMQ将这两坨操作统一封装到了一个函数中, 省得大家每次都要写boring code.
int zmq_proxy (const void *frontend, const void *backend, const void *capture);
参数frontend
与backend
分别是与客户端相连的socket
和与服务端相连的socket
. 在使用zmq_proxy
函数之前, 这两个socket必须被正确配置好, 该调用connect就调用connect, 该调用bind就调用bind. 简单来讲, zmq_proxy
负责把frontend
与backend
之间的数据互相递送给对方. 而如果仅仅是单纯的递送的话, 第三个参数capture
就应当被置为NULL
, 而如果还想监听一下数据, 那么就再创建一个socket, 并将其值传递给capture
, 这样, frontend
与backend
之间的数据都会有一份拷贝被送到capture
上的socket.
当我们用zmq_proxy
重写上面的掮客代码的话, 代码会非常简洁, 会变成这样:
#include <zmq.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
void * socket_for_client = zmq_socket(context, ZMQ_ROUTER);
void * socket_for_server = zmq_socket(context, ZMQ_DEALER);
zmq_bind(socket_for_client, "tcp://*:5559");
zmq_bind(socket_for_server, "tcp://*:5560");
zmq_proxy(socket_for_client, socket_for_server, NULL);
zmq_close(socket_for_client);
zmq_close(socket_for_server);
zmq_ctx_destroy(context);
return 0;
}
桥接技巧
桥接是服务器后端的一种常用技巧. 所谓的桥接有点类似于掮客, 但是解决问题的侧重点不一样. 掮客主要解决了三个问题:
- 降低网络连接数量. 从N*M降低到 (N+M)*X
- 向客户端与服务端屏蔽彼此的具体实现, 隐藏彼此的具体细节.
- 缓冲
而桥接解决的问题的侧重点主要在:
- 向通信的一方, 屏蔽另一方的具体实现.
这种设计思路常用于后台服务的接口层. 接口层一方面连接着后端内部局域网, 另外一方面对公提供服务. 这种服务可以是请求-回应式的服务, 也可以是发布-订阅式的服务(显然发布方在后端内部的局域网里). 这个时候接口层其实就完成了桥接的工作.
其实这种应用场景里, 把这种技巧称为桥接
并不是很合适. 因为桥接
是一个计算机网络中硬件层的术语, 最初是用于线缆过长信号衰减时, 在线缆末端再加一个信号放大器之类的设备, 为通信续命用的.
原版ZMQ文档在这里提出bridging
这个术语, 也只是为了说明一下, zmq_proxy
的适用场景不仅局限于做掮客, 而是应该在理解上更宽泛一点, zmq_proxy
函数就是互相传递两个socket之间数据函数, 仅此而已, 而具体这个函数能应用在什么样的场景下, 掮客与桥接场景均可以使用, 但绝不局限于此. 写代码思维要活.
妥善的处理错误
ZMQ库对待错误, 或者叫异常, 的设计哲学是: 见光死. 前文中写的多数示例代码, 都没有认真的检查ZMQ库函数调用的返回值, 也没有关心它们执行失败后会发生什么. 一般情况下, 这些函数都能正常工作, 但凡事总有个万一, 万一创建socket失败了, 万一bind或connect调用失败了, 会发生什么?
按照见光死的字面意思: 按我们上面写代码的风格, 一旦出错, 程序就挂掉退出了.
所以正确使用ZMQ库的姿势是: 生产环境运行的代码, 务必为每一个ZMQ库函数的调用检查返回值, 考虑调用失败的情况. ZMQ库函数的设计也继续了POSIX接口风格里的一些设计, 这些设计包括:
- 创建对象的接口, 在失败时一般返回NULL
- 处理数据的接口, 正常情况下将返回处理的数据的字节数. 失败情况下将返回-1
- 其它一般性的函数, 成功时返回0, 失败时返回-1
- 当调用失败发生时, 具体的错误码存放在
errno
中, 或zmq_errno()
中 - 有关错误的详情描述信息, 通过
zmq_strerror()
可能获得
真正健壮的代码, 应该像下面这样写, 是的, 它很啰嗦, 但它很健壮:
// ...
void * context = zmq_ctx_new();
assert(context);
void * socket = zmq_socket(context, ZMQ_REP);
assert(socket);
int rc = zmq_bind(socket, "tcp://*:5555");
if(rc == -1)
{
printf("E: bind failed: %s\n", strerror(errno));
return -1;
}
// ...
有两个比较例外的情况需要你注意一下:
- 处理
ZMQ_DONTWAIT
的函数返回-1时, 一般情况下不是一个致命错误, 不应当导致程序退出. 比如在收包函数里带上这个标志, 那么语义只是说"没数据可收", 是的, 收包函数会返回-1, 并且会置error
值为EAGAIN
, 但这并不代表程序发生了不可逆转的错误. - 当一个线程调用
zmq_ctx_destroy()
时, 如果此时有其它线程在忙, 比如在写数据或者收数据什么的, 那么这会直接导致这些在干活的线程, 调用的这些阻塞式接口函数返回-1, 并且errno
被置为ETERM
. 这种情况在实际编码过程中不应当出现.
下面我们写一个健壮的分治套路, 和我们在第一章中写过的类似, 不同的是, 这次, 在监理收到"所有工作均完成"的消息之后, 会发消息给各个工程队, 让工程队停止运行. 这个例子主要有两个目的:
- 向大家展示, 在使用ZMQ库的同时, 把代码写健壮
- 向大家展示如何优雅的干掉一个进程
原先的分治套路代码, 使用PUSH/PULL这两种socket类型, 将任务分发给多个工程队. 但在工作做完之后, 工程队的程序还在运行, 工程队的程序无法得知任务什么进修终止. 这里我们再掺入发布-订阅套路, 在工作做完之后, 监理向广大工程队, 通过PUB类型的socket发送"活干活了"的消息, 而工程队用SUB类型的socket一旦收到监理的消息, 就停止运行.
包工头ventilator的代码和上一章的一毛一样, 只是对所有的ZMQ库函数调用增加了错误处理. 照顾大家, 这里再帖一遍
#include <zmq.h>
#include <stdio.h>
#include <time.h>
#include <assert.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
assert(context);
void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
assert(socket_to_sink);
void * socket_to_worker = zmq_socket(context, ZMQ_PUSH);
assert(socket_to_worker);
if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1)
{
printf("E: connect failed: %s\n", strerror(errno));
return -1;
}
if(zmq_bind(socket_to_worker, "tcp://*:5557") == -1)
{
printf("E: bind failed: %s\n", strerror(errno));
return -1;
}
printf("Press Enter when all workers get ready:");
getchar();
printf("Sending tasks to workers...\n");
if(s_send(socket_to_sink, "Get ur ass up") == -1)
{
printf("E: s_send failed: %s\n", strerror(errno));
return -1;
}
srandom((unsigned)time(NULL));
int total_ms = 0;
for(int i = 0; i < 100; ++i)
{
int workload = randof(100) + 1;
total_ms += workload;
char string[10];
snprintf(string, sizeof(string), "%d", workload);
if(s_send(socket_to_worker, string) == -1)
{
printf("E: s_send failed: %s\n", strerror(errno));
return -1;
}
}
printf("Total expected cost: %d ms\n", total_ms);
zmq_close(socket_to_sink);
zmq_close(socket_to_worker);
zmq_ctx_destroy(context);
return 0;
}
接下来是工程队worker的代码, 这一版新增了一个socket_to_sink_of_control
来接收来自监理的停止消息:
#include <zmq.h>
#include <assert.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
assert(context);
void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL);
assert(socket_to_ventilator);
if(zmq_connect(socket_to_ventilator, "tcp://localhost:5557") == -1)
{
printf("E: connect failed: %s\n", strerror(errno));
return -1;
}
void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
assert(socket_to_sink);
if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1)
{
printf("E: connect failed: %s\n", strerror(errno));
return -1;
}
void * socket_to_sink_of_control = zmq_socket(context, ZMQ_SUB);
assert(socket_to_sink_of_control);
if(zmq_connect(socket_to_sink_of_control, "tcp://localhost:5559") == -1)
{
printf("E: connect failed: %s\n", strerror(errno));
return -1;
}
if(zmq_setsockopt(socket_to_sink_of_control, ZMQ_SUBSCRIBE, "", 0) == -1)
{
printf("E: setsockopt failed: %s\n", strerror(errno));
}
zmq_pollitem_t items [] = {
{ socket_to_ventilator, 0, ZMQ_POLLIN, 0 },
{ socket_to_sink_of_control, 0, ZMQ_POLLIN, 0 },
};
while(1)
{
if(zmq_poll(items, 2, -1) == -1)
{
printf("E: poll failed: %s\n", strerror(errno));
return -1;
}
if(items[0].revents & ZMQ_POLLIN)
{
char * strWork = s_recv(socket_to_ventilator);
assert(strWork);
printf("%s.", strWork);
fflush(stdout);
s_sleep(atoi(strWork));
free(strWork);
if(s_send(socket_to_sink, "") == -1)
{
printf("E: s_send failed %s\n", strerror(errno));
return -1;
}
}
if(items[1].revents & ZMQ_POLLIN)
{
break;
}
}
zmq_close(socket_to_ventilator);
zmq_close(socket_to_sink);
zmq_close(socket_to_sink_of_control);
zmq_ctx_destroy(context);
return 0;
}
接下来是监理的代码, 这一版新增了socket_to_worker_of_control
来在任务结束之后给工程队发布停止消息:
#include <zmq.h>
#include <assert.h>
#include <stdint.h>
#include "zmq_helper.h"
int main(void)
{
void * context = zmq_ctx_new();
assert(context);
void * socket_to_worker = zmq_socket(context, ZMQ_PULL);
if(zmq_bind(socket_to_worker, "tcp://*:5558") == -1)
{
printf("E: bind failed: %s\n", strerror(errno));
return -1;
}
void * socket_to_worker_of_control = zmq_socket(context, ZMQ_PUB);
if(zmq_bind(socket_to_worker_of_control, "tcp://*:5559") == -1)
{
printf("E: bind failed: %s\n", strerror(errno));
return -1;
}
char * strBeginMsg = s_recv(socket_to_worker);
assert(strBeginMsg);
free(strBeginMsg);
int64_t i64StartTime = s_clock();
for(int i = 0; i < 100; ++i)
{
char * strRes = s_recv(socket_to_worker);
assert(strRes);
free(strRes);
if(i % 10 == 0)
{
printf(":");
}
else
{
printf(".");
}
fflush(stdout);
}
printf("Total elapsed time: %d msec\n", (int)(s_clock() - i64StartTime));
if(s_send(socket_to_worker_of_control, "STOP") == -1)
{
printf("E: s_send failed: %s\n", strerror(errno));
return -1;
}
zmq_close(socket_to_worker);
zmq_close(socket_to_worker_of_control);
zmq_ctx_destroy(context);
return 0;
}
这个例子也展示了如何将多种套路揉合在一个场景中. 所以说写代码, 思维要灵活.
处理POSIX Signal
一般情况下, Linux上的程序在接收到诸如SIGINT
和SIGTERM
这样的信号时, 其默认动作是让进程退出. 这种退出信号的默认行为, 只是简单的把进程干掉, 不会管什么缓冲区有没有正确刷新, 也不会管文件以及其它资源句柄是不是正确被释放了.
这对于实际应用场景中的程序来说是不可接受的, 所以在编写后台应用的时候一定要注意这一点: 要妥善的处理POSIX Signal. 限于篇幅, 这里不会对Signal进行进一步讨论, 如果对这部分内容不是很熟悉的话, 请参阅<Unix环境高级编程>(<Advanced Programming in the UNIX Environment>)第十章(chapter 10. Signals).
下面是妥善处理Signal的一个例子
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <assert.h>
#include <string.h>
#include <zmq.h>
#define S_NOTIFY_MSG " "
#define S_ERROR_MSG "Error while writing to self-pipe.\n"
static int s_fd;
static void s_signal_handler(int signal_value)
{
int rc = write(s_fd, S_NOTIFY_MSG, sizeof(S_NOTIFY_MSG));
if(rc != sizeof(S_NOTIFY_MSG))
{
write(STDOUT_FILENO, S_ERROR_MSG, sizeof(S_ERROR_MSG) - 1);
exit(1);
}
}
static void s_catch_signals(int fd)
{
s_fd = fd;
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(void)
{
int rc;
void * context = zmq_ctx_new();
assert(context);
void * socket = zmq_socket(context, ZMQ_REP);
assert(socket);
if(zmq_bind(socket, "tcp://*:5555") == -1)
{
printf("E: bind failed: %s\n", strerror(errno));
return -__LINE__;
}
int pipefds[2];
rc = pipe(pipefds);
if(rc != 0)
{
printf("E: creating self-pipe failed: %s\n", strerror(errno));
return -__LINE__;
}
for(int i = 0; i < 2; ++i)
{
int flags = fcntl(pipefds[0], F_GETFL, 0);
if(flags < 0)
{
printf("E: fcntl(F_GETFL) failed: %s\n", strerror(errno));
return -__LINE__;
}
rc = fcntl(pipefds[0], F_SETFL, flags | O_NONBLOCK);
if(rc != 0)
{
printf("E: fcntl(F_SETFL) failed: %s\n", strerror(errno));
return -__LINE__;
}
}
s_catch_signals(pipefds[1]);
zmq_pollitem_t items[] = {
{ 0, pipefds[0], ZMQ_POLLIN, 0 },
{ socket, 0, ZMQ_POLLIN, 0 },
};
while(1)
{
rc = zmq_poll(items, 2, -1);
if(rc == 0)
{
continue;
}
else if(rc < 0)
{
if(errno == EINTR)
{
continue;
}
else
{
printf("E: zmq_poll failed: %s\n", strerror(errno));
return -__LINE__;
}
}
// Signal pipe FD
if(items[0].revents & ZMQ_POLLIN)
{
char buffer[2];
read(pipefds[0], buffer, 2); // clear notifying bytes
printf("W: interrupt received, killing server...\n");
break;
}
// Read socket
if(items[1].revents & ZMQ_POLLIN)
{
char buffer[255];
rc = zmq_recv(socket, buffer, 255, ZMQ_NOBLOCK);
if(rc <