libevent & Reactor模式
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了libevent & Reactor模式相关的知识,希望对你有一定的参考价值。
参考技术A 整个libevent本身就是一个Reactor模式;Reactor是一种事件驱动机制,应用程序提供相应的接口并且注册到Reactor中,如果相应的事件发生,Reactor将主动调用应用程序的接口。Reactor框架需要包含几个组件:事件源,Reactor框架,IO多路复用机制,事件处理程序。
事件源:linux上是文件描述符,
IO多路复用:epoll,select等;程序将关心的fd以及事件(read,write)注册到IO多路复用上(epoll),事件到达后,epoll发出通知。
Reactor反应器:libevent中就是指event_base结构体。
事件处理程序:对应libevent中,就是event结构体。
libevent初始化Reactor:
struct event_base* reactor = event_base_new();
函数实现内容是 对结构体struct event_base的初始化操作:malloc内存,选择适当的IO多路复用(此处以epoll为例),调用IO多路复用的初始化操作,epoll的初始化操作(epoll_init),调用epoll_create创建一个RB树头节点。
事件处理程序:event
使用event_new 或者event_assign函数 获得一个event,二者的区别是event_new 内部对结构体event进行申请空间后再调用event_assign操作。
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
函数event_assign完成的功能是对event结构体的初始化工作
以上对base_event和event 进行了初始化相关操作,二者还没有进行关联起来,调用event_add函数是将最终是将此event添加到epoll中的红黑树中,event_add--->event_add_internal -----> evmap_io_add---->evsel->add
这个add函数调用的是之前关联的epoll中的epoll_nochangelist_add,最终是调用epoll_ctl将此event添加到之前申请的RB树中,
epoll_nochangelist_add 调用epoll_apply_one_change函数,epoll_apply_one_change函数核心完成功能如下:
至此,将一个event注册到event_base中了,接下来调用event_base_loop 函数,等待事件的到来,然后调用相关的回调函数。
int event_base_loop(struct event_base *base, int flags)
内部调用 evsel->dispatch函数,对应epoll中的epoll_dispatch函数,
可以看到loop中最终使用epoll_wait等待事件的到来了,有事件到来时进行调用回调函数。
事件返回 的集合,linux下epoll锁使用的是一个evmap_io列表,记录给定的fd上可读或者可写的所有事件,以及每个事件的数量。
res返回后,调用evmap_io_active 激活在event_base上等待给定fd的一组事件。
event_base_loop 中 res = evsel->dispatch(base, tv_p); 返回激活的数量,激活事件存放到了event_base结构体中的activequeues队列中,
最后循环队列执行回调函数。
发布一个高性能的Reactor模式的C++网络库:evpp
发布一个高性能的Reactor模式的C++网络库:evpp
简介
https://github.com/Qihoo360/evpp是一个基于libevent开发的现代化的支持C++11特性的高性能网络库,自带TCP/UDP/HTTP等协议的异步非阻塞式的服务器和客户端库。
特性
- 现代版的C++11接口
- 非阻塞异步接口都是C++11的functional/bind形式的回调仿函数(不是libevent中的C风格的函数指针)
- 非阻塞纯异步多线程TCP服务器/客户端
- 非阻塞纯异步多线程HTTP服务器/客户端
- 非阻塞纯异步多线程UDP服务器
- 支持多进程模式
- 优秀的跨平台特性和高性能(继承自libevent的优点)
除此之外,基于该库之上,还提供两个附带的应用层协议库:
evmc
:一个纯异步非阻塞式的memcached
的C++客户端库,支持membase
集群模式。该库已经用于生产环境,每天发起1000多亿次memcache查询请求。详情请见:evmc readmeevnsq
: 一个纯异步非阻塞式的NSQ
的C++客户端库,支持消费者、生产者、服务发现等特性。该库已经用于生产环境,每天生产200多亿条日志消息。详情请见:evnsq readme
将来还会推出redis
的客户端库。
项目由来
我们开发小组负责的业务需要用到TCP协议来建设长连接网关服务和一些其他的一些基于TCP的短连接服务,在调研开源项目的过程中,没有发现一个合适的库来满足我们要求。结合我们自身的业务情况,理想中的C++网络库应具备一下几个特性:
- 接口简单易用,最好是C++接口
- 多线程,也能支持多进程
- 最好是基于libevent实现(因为现有的历史遗留框架、基础库等是依赖libevent),这样能很方便嵌入libevent的事件循环,否则改动较大或者集成起来的程序可能会有很多跨线程的调用(这些会带来编程的复杂性以及跨线程锁带来的性能下降)
基于这些需求,可供选择的不多,所以我们只能自己开发一个。开发过程中,接口设计方面基本上大部分是参考muduo项目来设计和实现的,当然也做了一些取舍和增改;同时也大量借鉴了Golang的一些设计哲学和思想。下面举几个小例子来说明一下:
Duration
: 这是一个时间区间相关的类,自带时间单位信息,参考了Golang项目中的Duration
实现。我们在其他项目中见到太多的时间是不带单位的,例如timeout
,到底是秒、毫秒还是微秒?需要看文档说明或具体实现,好一点的设计会将单位带在变量名中,例如timeout_ms
,但还是没有Duration
这种独立的类好。目前C++11中也有类似的实现std::chrono::duration
,但稍显复杂,没有咱们这个借鉴Golang实现的版本来的简单明了Buffer
: 这是一个缓冲区类,融合了muduo
和Golang两个项目中相关类的设计和实现http::Server
: 这是一个http服务器类,自带线程池,它的事件循环和工作线程调度,完全是线程安全的,业务层不用太多关心跨线程调用问题。同时,还将http服务器的核心功能单独抽取出来形成http::Service
类,是一个可嵌入型的服务器类,可以嵌入到已有的libevent事件循环中- 网络地址的表达就仅仅使用
"ip:port"
这种形式字符串表示,就是参考Golang的设计 httpc::ConnPool
是一个http的客户端连接池库,设计上尽量考虑高性能和复用。以后基于此还可以增加负载均衡和故障转移等特性
另外,我们实现过程中极其重视线程安全问题,一个事件相关的资源必须在其所属的EventLoop
(每个EventLoop
绑定一个线程)中初始化和析构释放,这样我们能最大限度的减少出错的可能。为了达到这个目标我们重载event_add
和event_del
等函数,每一次调用event_add
,就在对应的线程私有数据中记录该对象,在调用event_del
时,检查之前该线程私有数据中是否拥有该对象,然后在整个程序退出前,再完整的检查所有线程的私有数据,看看是否仍然有对象没有析构释放。具体实现稍有区别,详细代码实现可以参考 https://github.com/Qihoo360/evpp/blob/master/evpp/inner_pre.cc#L46~L87。我们如此苛刻的追求线程安全,只是为了让一个程序能安静的平稳的退出或Reload,因为我们深刻的理解“编写永远运行的系统,和编写运行一段时间后平静关闭的系统是两码事”,后者要困难的多得多。
吞吐量Benchmark测试报告
本文用 ping pong 测试来对比evpp与libevent、boost.asio、muduo] 等网络的吞吐量,测试结果表明evpp吞吐量与boost.asio、muduo等相当,比libevent高17%~130%左右。
evpp本身是基于libevent实现的,不过evpp只是用了libevent的事件循环,并没有用libevent的evbuffer
,而是自己参考muduo和Golang实现了自己的网络IO读写类Buffer。
性能测试相关的代码都在这里:https://github.com/Qihoo360/evpp/tree/master/benchmark/.
测试对象
- evpp-0.2.0 based on libevent-2.0.21
- boost.asio-1.53
- libevent-2.0.21
系统环境
- 操作系统:Linux CentOS 6.2, 2.6.32-220.7.1.el6.x86_64
- 硬件CPU:Intel(R) Xeon(R) CPU E5-2630 v2 @ 2.60GHz
几个简单的示例代码
TCP Echo服务器
#include <evpp/tcp_server.h>
#include <evpp/buffer.h>
#include <evpp/tcp_conn.h>
int main(int argc, char* argv[])
std::string addr = "0.0.0.0:9099";
int thread_num = 4;
evpp::EventLoop loop;
evpp::TCPServer server(&loop, addr, "TCPEchoServer", thread_num);
server.SetMessageCallback([](const evpp::TCPConnPtr& conn,
evpp::Buffer* msg,
evpp::Timestamp ts)
conn->Send(msg);
);
server.SetConnectionCallback([](const evpp::TCPConnPtr& conn)
if (conn->IsConnected())
LOG_INFO << "A new connection from " << conn->remote_addr();
else
LOG_INFO << "Lost the connection from " << conn->remote_addr();
);
server.Init();
server.Start();
loop.Run();
return 0;
HTTP Echo服务器
#include <evpp/exp.h>
#include <evpp/http/http_server.h>
int main(int argc, char* argv[])
std::vector<int> ports = 9009, 23456, 23457 ;
int thread_num = 2;
evpp::http::Server server(thread_num);
server.RegisterHandler("/echo",
[](evpp::EventLoop* loop,
const evpp::http::ContextPtr& ctx,
const evpp::http::HTTPSendResponseCallback& cb)
cb(ctx->body().ToString());
);
server.Init(ports);
server.Start();
while (!server.IsStopped())
usleep(1);
return 0;
UDP Echo服务器
#include <evpp/exp.h>
#include <evpp/udp/udp_server.h>
#include <evpp/udp/udp_message.h>
int main(int argc, char* argv[])
std::vector<int> ports = 1053, 5353 ;
evpp::udp::Server server;
server.SetMessageHandler([](evpp::EventLoop* loop, evpp::udp::MessagePtr& msg)
evpp::udp::SendMessage(msg);
);
server.Init(ports);
server.Start();
while (!server.IsStopped())
usleep(1);
return 0;
致谢
以上是关于libevent & Reactor模式的主要内容,如果未能解决你的问题,请参考以下文章