libevent & Reactor模式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了libevent & Reactor模式相关的知识,希望对你有一定的参考价值。

参考技术A 整个libevent本身就是一个Reactor模式;Reactor是一种事件驱动机制,应用程序提供相应的接口并且注册到Reactor中,如果相应的事件发生,Reactor将主动调用应用程序的接口。

Reactor框架需要包含几个组件:事件源,Reactor框架,IO多路复用机制,事件处理程序。

事件源:linux上是文件描述符,

IO多路复用:epoll,select等;程序将关心的fd以及事件(read,write)注册到IO多路复用上(epoll),事件到达后,epoll发出通知。

Reactor反应器:libevent中就是指event_base结构体。

事件处理程序:对应libevent中,就是event结构体。

libevent初始化Reactor: 

struct event_base* reactor = event_base_new();

函数实现内容是 对结构体struct event_base的初始化操作:malloc内存,选择适当的IO多路复用(此处以epoll为例),调用IO多路复用的初始化操作,epoll的初始化操作(epoll_init),调用epoll_create创建一个RB树头节点。

事件处理程序:event

使用event_new 或者event_assign函数 获得一个event,二者的区别是event_new 内部对结构体event进行申请空间后再调用event_assign操作。

event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)

函数event_assign完成的功能是对event结构体的初始化工作

以上对base_event和event 进行了初始化相关操作,二者还没有进行关联起来,调用event_add函数是将最终是将此event添加到epoll中的红黑树中,event_add--->event_add_internal -----> evmap_io_add---->evsel->add

这个add函数调用的是之前关联的epoll中的epoll_nochangelist_add,最终是调用epoll_ctl将此event添加到之前申请的RB树中,

epoll_nochangelist_add 调用epoll_apply_one_change函数,epoll_apply_one_change函数核心完成功能如下:

至此,将一个event注册到event_base中了,接下来调用event_base_loop 函数,等待事件的到来,然后调用相关的回调函数。

int event_base_loop(struct event_base *base, int flags)

内部调用 evsel->dispatch函数,对应epoll中的epoll_dispatch函数,

可以看到loop中最终使用epoll_wait等待事件的到来了,有事件到来时进行调用回调函数。

事件返回 的集合,linux下epoll锁使用的是一个evmap_io列表,记录给定的fd上可读或者可写的所有事件,以及每个事件的数量。

res返回后,调用evmap_io_active  激活在event_base上等待给定fd的一组事件。

event_base_loop 中 res = evsel->dispatch(base, tv_p); 返回激活的数量,激活事件存放到了event_base结构体中的activequeues队列中,

最后循环队列执行回调函数。

发布一个高性能的Reactor模式的C++网络库:evpp

发布一个高性能的Reactor模式的C++网络库:evpp

简介

https://github.com/Qihoo360/evpp是一个基于libevent开发的现代化的支持C++11特性的高性能网络库,自带TCP/UDP/HTTP等协议的异步非阻塞式的服务器和客户端库。

特性

  1. 现代版的C++11接口
  2. 非阻塞异步接口都是C++11的functional/bind形式的回调仿函数(不是libevent中的C风格的函数指针)
  3. 非阻塞纯异步多线程TCP服务器/客户端
  4. 非阻塞纯异步多线程HTTP服务器/客户端
  5. 非阻塞纯异步多线程UDP服务器
  6. 支持多进程模式
  7. 优秀的跨平台特性和高性能(继承自libevent的优点)

除此之外,基于该库之上,还提供两个附带的应用层协议库:

  1. evmc :一个纯异步非阻塞式的memcached的C++客户端库,支持membase集群模式。该库已经用于生产环境,每天发起1000多亿次memcache查询请求。详情请见:evmc readme
  2. evnsq : 一个纯异步非阻塞式的NSQ的C++客户端库,支持消费者、生产者、服务发现等特性。该库已经用于生产环境,每天生产200多亿条日志消息。详情请见:evnsq readme

将来还会推出redis的客户端库。

项目由来

我们开发小组负责的业务需要用到TCP协议来建设长连接网关服务和一些其他的一些基于TCP的短连接服务,在调研开源项目的过程中,没有发现一个合适的库来满足我们要求。结合我们自身的业务情况,理想中的C++网络库应具备一下几个特性:

  1. 接口简单易用,最好是C++接口
  2. 多线程,也能支持多进程
  3. 最好是基于libevent实现(因为现有的历史遗留框架、基础库等是依赖libevent),这样能很方便嵌入libevent的事件循环,否则改动较大或者集成起来的程序可能会有很多跨线程的调用(这些会带来编程的复杂性以及跨线程锁带来的性能下降)

基于这些需求,可供选择的不多,所以我们只能自己开发一个。开发过程中,接口设计方面基本上大部分是参考muduo项目来设计和实现的,当然也做了一些取舍和增改;同时也大量借鉴了Golang的一些设计哲学和思想。下面举几个小例子来说明一下:

  1. Duration : 这是一个时间区间相关的类,自带时间单位信息,参考了Golang项目中的Duration实现。我们在其他项目中见到太多的时间是不带单位的,例如timeout,到底是秒、毫秒还是微秒?需要看文档说明或具体实现,好一点的设计会将单位带在变量名中,例如timeout_ms,但还是没有Duration这种独立的类好。目前C++11中也有类似的实现std::chrono::duration,但稍显复杂,没有咱们这个借鉴Golang实现的版本来的简单明了
  2. Buffer : 这是一个缓冲区类,融合了muduoGolang两个项目中相关类的设计和实现
  3. http::Server : 这是一个http服务器类,自带线程池,它的事件循环和工作线程调度,完全是线程安全的,业务层不用太多关心跨线程调用问题。同时,还将http服务器的核心功能单独抽取出来形成http::Service类,是一个可嵌入型的服务器类,可以嵌入到已有的libevent事件循环中
  4. 网络地址的表达就仅仅使用"ip:port"这种形式字符串表示,就是参考Golang的设计
  5. httpc::ConnPool是一个http的客户端连接池库,设计上尽量考虑高性能和复用。以后基于此还可以增加负载均衡和故障转移等特性

另外,我们实现过程中极其重视线程安全问题,一个事件相关的资源必须在其所属的EventLoop(每个EventLoop绑定一个线程)中初始化和析构释放,这样我们能最大限度的减少出错的可能。为了达到这个目标我们重载event_addevent_del等函数,每一次调用event_add,就在对应的线程私有数据中记录该对象,在调用event_del时,检查之前该线程私有数据中是否拥有该对象,然后在整个程序退出前,再完整的检查所有线程的私有数据,看看是否仍然有对象没有析构释放。具体实现稍有区别,详细代码实现可以参考 https://github.com/Qihoo360/evpp/blob/master/evpp/inner_pre.cc#L46~L87。我们如此苛刻的追求线程安全,只是为了让一个程序能安静的平稳的退出或Reload,因为我们深刻的理解“编写永远运行的系统,和编写运行一段时间后平静关闭的系统是两码事”,后者要困难的多得多。

吞吐量Benchmark测试报告

本文用 ping pong 测试来对比evpplibeventboost.asio、muduo] 等网络的吞吐量,测试结果表明evpp吞吐量与boost.asiomuduo等相当,比libevent17%~130%左右。

evpp本身是基于libevent实现的,不过evpp只是用了libevent的事件循环,并没有用libeventevbuffer,而是自己参考muduoGolang实现了自己的网络IO读写类Buffer

性能测试相关的代码都在这里:https://github.com/Qihoo360/evpp/tree/master/benchmark/.

测试对象
  1. evpp-0.2.0 based on libevent-2.0.21
  2. boost.asio-1.53
  3. libevent-2.0.21
系统环境
  • 操作系统:Linux CentOS 6.2, 2.6.32-220.7.1.el6.x86_64
  • 硬件CPU:Intel(R) Xeon(R) CPU E5-2630 v2 @ 2.60GHz


几个简单的示例代码

TCP Echo服务器

#include <evpp/tcp_server.h>
#include <evpp/buffer.h>
#include <evpp/tcp_conn.h>

int main(int argc, char* argv[]) 
    std::string addr = "0.0.0.0:9099";
    int thread_num = 4;
    evpp::EventLoop loop;
    evpp::TCPServer server(&loop, addr, "TCPEchoServer", thread_num);
    server.SetMessageCallback([](const evpp::TCPConnPtr& conn,
                                 evpp::Buffer* msg,
                                 evpp::Timestamp ts) 
        conn->Send(msg);
    );
    server.SetConnectionCallback([](const evpp::TCPConnPtr& conn) 
        if (conn->IsConnected()) 
            LOG_INFO << "A new connection from " << conn->remote_addr();
         else 
            LOG_INFO << "Lost the connection from " << conn->remote_addr();
        
    );
    server.Init();
    server.Start();
    loop.Run();
    return 0;

HTTP Echo服务器

#include <evpp/exp.h>
#include <evpp/http/http_server.h>

int main(int argc, char* argv[]) 
    std::vector<int> ports =  9009, 23456, 23457 ;
    int thread_num = 2;
    evpp::http::Server server(thread_num);
    server.RegisterHandler("/echo",
                           [](evpp::EventLoop* loop,
                              const evpp::http::ContextPtr& ctx,
                              const evpp::http::HTTPSendResponseCallback& cb) 
        cb(ctx->body().ToString()); 
    );
    server.Init(ports);
    server.Start();
    while (!server.IsStopped()) 
        usleep(1);
    
    return 0;

UDP Echo服务器

#include <evpp/exp.h>
#include <evpp/udp/udp_server.h>
#include <evpp/udp/udp_message.h>

int main(int argc, char* argv[]) 
    std::vector<int> ports =  1053, 5353 ;
    evpp::udp::Server server;
    server.SetMessageHandler([](evpp::EventLoop* loop, evpp::udp::MessagePtr& msg) 
        evpp::udp::SendMessage(msg);
    );
    server.Init(ports);
    server.Start();

    while (!server.IsStopped()) 
        usleep(1);
    
    return 0;

致谢

  1. 感谢奇虎360公司对该项目的支持
  2. 感谢libevent, glog, gtest, Golang等项目
  3. evpp深度参考了muduo项目的实现和设计,非常感谢Chen Shuo

以上是关于libevent & Reactor模式的主要内容,如果未能解决你的问题,请参考以下文章

libevent学习笔记(参考libevent深度剖析)

浅析libevent

反应器(Reactor)和主动器(Proactor)

Reactor-反应器模式

高级I/O框架库libevent

libevent