libevent 线程池的设计

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了libevent 线程池的设计相关的知识,希望对你有一定的参考价值。

参考技术A 在我所做过的一个基于libevent项目中, 我所使用的线程模型是 one event_base per thread + thread pool 模型。每个线程最多有一个event_base, 每一个 TCP 连接必须由某一个event_base 管理,所有这个线程的 IO 都会转移到这个 event_base 上面来处理。换句话来说,一个 file descriptor 只能由一个线程来读写。这样,我们就很方便地把不同的 TCP 连接放到不同的线程里面去。

一个节点支持多线程,它有两种模式

在其他的线程中,只要拿到一个 libevent_thread_t 的对象,往里面的 base 添加事件就好了。

创建新线程调用的函数(一直循环):

虽然其他的线程可以往这个线程添加事件,但是当一个 libevent_thread_t 对象释放的时候, 其他的线程不能 break 这个线程的 event_base,只能是这个线程自己 break 自己的 event_base,所以在实现上我们可以首先要打开一个管道

为这个管道一端的 fd 在event_base上面添加监听事件,有数据可读的时候就 break event_base 就可以退出了

所以当我们想要退出时,向 fd 发送数据

至于线程池,就是一个 libevent_thread_t 的数组,通过next来指定函数返回的线程,其中 n 为线程池容量大小, next 自增到 n 时变成 0

在多线程环境中,libevent 的 event_base 的 loopbreak必须由他自己的线程来实现,所以其他线程只能是通过管道来通知。
在一个线程往另一个线程的 event_base 添加事件的时候,也就是在多个线程对于一个 event_base 操作的时候,event_base 需要对它自己的数据结构加锁,所以在使用线程池或者写多线程的程序的时候,开始时需要调用, 文档

C++线程池

使用libevent封装的c++内嵌线程池,在主线程负责调度,在开启的线程中负责处理接受到的任务。在主线程使用管道发送信号给开启的线程,在开启的线程中使用libevent监听主线程发送来的信号,接收成功以后,从线程中维护的任务队列取出,开始处理任务。下面是线程池的UML类图

 主函数:

//
// Created by wenfan on 2021/5/4.
//

#include <iostream>
#include <event2/event.h>
#include <event2/listener.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "Task.h"
#include "TaskFactory.h"
#include "ThreadPool.h"
#define PORT 8877

char* getIp(sockaddr_in* remoteAddr){
    return inet_ntoa(remoteAddr->sin_addr);
}

int getPort(sockaddr_in* remoteAddr){
    return ntohs(remoteAddr->sin_port);
}

using namespace std;

void listen_cb(struct evconnlistener *listener,
        evutil_socket_t accept_sockfd,
        struct sockaddr * remoteAddr,
        int socklen, void* arg){
    cout << "listen_cb" << endl;
    Task* task = TaskFactory::getInstance()->createTask();
    task->accept_fd = accept_sockfd;
    ThreadPool::getInstance()->Dispatch(task);

}


int main(int argc, char* argv[]){
    event_base* base = event_base_new();

    sockaddr_in sin = {0};
    memset(&sin, 0, sizeof sin);
    sin.sin_family = AF_INET;
    sin.sin_port = htons(PORT);

    evconnlistener* listener = evconnlistener_new_bind(base,
                                                       listen_cb,
                                                       base,
                                                       LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
                                                       10,
                                                       (sockaddr*) & sin,
                                                       sizeof(sin));
    ThreadPool::getInstance()->Init(10);
    event_base_dispatch(base);
    evconnlistener_free(listener);
    event_base_free(base);


    return 0;
}

ThreadPool.h

//
// Created by wenfan on 2021/5/4.
//

#ifndef LIBEVENT_FTP_SERVER_THREADPOOL_H
#define LIBEVENT_FTP_SERVER_THREADPOOL_H
#include <vector>
#include "Thread.h"
#include "Task.h"

class ThreadPool {
public:
    void Init(int thread_cnt);
    void Dispatch(Task* task);
    static ThreadPool* getInstance();

private:
    std::vector<Thread*> threads;
    int lastThread = -1;
    int threadCnt = 10;
    ThreadPool()= default;
};


#endif //LIBEVENT_FTP_SERVER_THREADPOOL_H

ThreadPool.cpp

//
// Created by wenfan on 2021/5/4.
//

#include "ThreadPool.h"
#include <iostream>
#include <thread>

using namespace std;

ThreadPool* ThreadPool::getInstance(){
    static ThreadPool pool;
    return &pool;
}

void ThreadPool::Init(int thread_cnt){
    if(thread_cnt <= 0){
        cerr << "thread count must be greater than 0!" << endl;
        return;
    }
    cout << this << endl;
    this->threadCnt = thread_cnt;
    for (int i = 0; i < thread_cnt; ++i) {
        auto* t = new Thread;
        threads.push_back(t);
        t->id = i + 1;
        t->Start();
        cout << "thread : " << t->id << " has started!" << endl;
        this_thread::sleep_for(chrono::milliseconds(10));
    }
}


void ThreadPool::Dispatch(Task *task) {
    if(!task)
        return;
    int nextThread = (this->lastThread + 1) % this->threadCnt;
    this->lastThread = nextThread;
    auto* t = this->threads[nextThread];
    t->add(task);
    //thread pool -> thread
    t->Activate();
}

Thread.h

//
// Created by wenfan on 2021/5/5.
//

#ifndef LIBEVENT_FTP_SERVER_THREAD_H
#define LIBEVENT_FTP_SERVER_THREAD_H
#include <event2/event.h>
#include <queue>
#include "Task.h"
#include <mutex>


class Thread {
public:
    bool Init();
    void Main();
    void Start();
    static void read_cb(evutil_socket_t, short, void *);
    void Notify(evutil_socket_t fd);
    void add(Task*);
    int id;
    void Activate();

private:
    int notify_fd;
    event_base* base;
    //任务队列
    std::queue<Task*> tasks;
    std::mutex tasks_mutex;

};


#endif //LIBEVENT_FTP_SERVER_THREAD_H

Thread.cpp

//
// Created by wenfan on 2021/5/5.
//

#include "Thread.h"
#include <unistd.h>
#include <iostream>
#include <thread>

using namespace std;

/**
 *  thread base's read_cb
 * @param fd
 * @param which
 * @param arg
 */
void Thread::read_cb(evutil_socket_t fd, short which, void* arg) {
    auto* t = (Thread*) arg;
    t->Notify(fd);
}

/**
 * thread init base
 * @return
 */
bool Thread::Init(){
    int fd[2];
    if(pipe(fd)){
        cerr << "create fd failed!" << endl;
        return false;
    }
    this->notify_fd = fd[1];

    // create libevent context(unlock)
    event_config* config = event_config_new();
    event_config_set_flag(config, EVENT_BASE_FLAG_NOLOCK);
    this->base = event_base_new_with_config(config);
    event_config_free(config);

    if(this->base == nullptr){
        cerr << "create event_base_new_with_config failed!" << endl;
        return false;
    }

    // set read callback to thread base
    event* ev = event_new(Thread::base, fd[0], EV_READ | EV_PERSIST, read_cb, this);
    event_add(ev, nullptr);
    return true;
}


/**
 *  recv pipe fd msg
 * @param fd
 */
void Thread::Notify(evutil_socket_t fd) {
    char buf[2] = {0};
    size_t len = read(fd, buf, 1);
    if(len <= 0){
        cerr << "read pipe fd msg failed!" << endl;
        return ;
    }
    cout << "recv pipe :" << buf << endl;
    //fetch task from thread's tasks
    Task* task = nullptr;
    this->tasks_mutex.lock();
    if(this->tasks.empty()){
        this->tasks_mutex.unlock();
        return;
    }
    task = this->tasks.front();
    this->tasks.pop();
    this->tasks_mutex.unlock();

    //handle task
    task->Init();


}

/**
 * real thread run this loop
 */
void Thread::Main(){
    if(event_base_dispatch(this->base)){
        cerr << "thread base loop start failed!" << endl;
    }
    event_base_free(this->base);
}

/**
 * init and start thread
 */
void Thread::Start(){
    this->Init();
    thread th(&Thread::Main, this);
    th.detach();
}

/**
 * add task to threads, there are thread safe prob , and lock threads.
 * @param task
 */
void Thread::add(Task* task) {
    task->base = this->base;
    this->tasks_mutex.lock();
    this->tasks.push(task);
    this->tasks_mutex.unlock();
}

/**
 * send pipe msg for notify thread that here are task to handle
 */
void Thread::Activate() {
    size_t len = write(this->notify_fd, "c", 1);
    if(len <= 0){
        cerr << "send" << endl;
        return;
    }
    cout << "notify success!" << endl;
}

Task.h接口

//
// Created by wenfan on 2021/5/5.
//

#ifndef LIBEVENT_FTP_SERVER_TASK_H
#define LIBEVENT_FTP_SERVER_TASK_H
#include <event2/event.h>


class Task {
public:
    event_base* base{};
    int accept_fd = 0;
    int thread_id = 0;
    virtual bool Init() = 0;
};


#endif //LIBEVENT_FTP_SERVER_TASK_H

FtpTask.h

//
// Created by wenfan on 2021/5/6.
//

#ifndef LIBEVENT_FTP_SERVER_FTPTASK_H
#define LIBEVENT_FTP_SERVER_FTPTASK_H
#include "Task.h"
#include <event2/bufferevent.h>

class FtpTask : public Task{
public:
    virtual void read(bufferevent* bev){}
    virtual void write(bufferevent* bev){}
    virtual void event(bufferevent* bev, short what){}
protected:
    void setCallback(bufferevent* bev);
    static void read_cb(bufferevent* bev,void* args);
    static void write_cb(bufferevent* bev,void* args);
    static void event_cb(bufferevent* bev,short what, void* args);

};


#endif //LIBEVENT_FTP_SERVER_FTPTASK_H

FtpTask.cpp

//
// Created by wenfan on 2021/5/6.
//

#include "FtpTask.h"

void FtpTask::setCallback(bufferevent* bev){
    bufferevent_setcb(bev, read_cb, write_cb, event_cb, this);
    bufferevent_enable(bev, EV_WRITE | EV_READ);
}

void FtpTask::read_cb(bufferevent* bev,void* args){
    auto* task = (FtpTask*)args;
    task->read(bev);
}
void FtpTask::write_cb(bufferevent* bev,void* args){
    auto* task = (FtpTask*)args;
    task->write(bev);
}
void FtpTask::event_cb(bufferevent* bev,short what, void* args){
    auto* task = (FtpTask*)args;
    task->event(bev, what);
}

FtpServerCMD.h

//
// Created by wenfan on 2021/5/6.
//

#ifndef LIBEVENT_FTP_SERVER_FTPSERVERCMD_H
#define LIBEVENT_FTP_SERVER_FTPSERVERCMD_H
#include "FtpTask.h"
#include <event2/bufferevent.h>
class FtpServerCMD : public FtpTask{
public:
    virtual bool Init();
    virtual void read(bufferevent* bev);
    virtual void write(bufferevent* bev);
    virtual void event(bufferevent* bev,short what);
};


#endif //LIBEVENT_FTP_SERVER_FTPSERVERCMD_H

FtpServerCMD.cpp

//
// Created by wenfan on 2021/5/6.
//

#include "FtpServerCMD.h"
#include <iostream>

using namespace std;

bool FtpServerCMD::Init() {
    cout << "FtpServerCMD::Init" << endl;
    bufferevent* bev = bufferevent_socket_new(this->base,
                                              this->accept_fd,
                                              BEV_OPT_CLOSE_ON_FREE);
    this->setCallback(bev);

    //添加超时
    timeval rt = {60 ,0};
    bufferevent_set_timeouts(bev, &rt, nullptr);
    return true;
}

void FtpServerCMD::read(bufferevent *bev) {
    char data[BUFSIZ] = {0};
    while(true){
        int len = bufferevent_read(bev, data, sizeof(data) - 1);
        if(len <=0 ){
            bufferevent_free(bev);
            delete this;
            break;
        }
        data[len] = '\\0';
        cout << "Recv CMD:" << data << endl;
        //分发处理对象
    }
    bufferevent_write(bev,"ok\\n",3);
}


void FtpServerCMD::write(bufferevent *bev) {

}

void FtpServerCMD::event(bufferevent *bev, short what) {
    if(what & (BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_TIMEOUT)){
        cout << "BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_TIMEOUT" << endl;
        bufferevent_free(bev);
        //delete this;
    }
}

TaskFactory.h任务工厂类(单类模式)

//
// Created by wenfan on 2021/5/6.
//

#ifndef LIBEVENT_FTP_SERVER_TASKFACTORY_H
#define LIBEVENT_FTP_SERVER_TASKFACTORY_H
#include "Task.h"

class TaskFactory {
public:
    static TaskFactory* getInstance();
    Task* createTask();
};


#endif //LIBEVENT_FTP_SERVER_TASKFACTORY_H

 TaskFactory.cpp

//
// Created by wenfan on 2021/5/6.
//

#include "TaskFactory.h"
#include "FtpServerCMD.h"

TaskFactory* TaskFactory::getInstance(){
    static TaskFactory factory;
    return &factory;
}

Task* TaskFactory::createTask(){
    return new FtpServerCMD;
}
 最后,最重要的CMakeLists.txt
cmake_minimum_required(VERSION 3.17)
project(libevent_ftp_server)

set(CMAKE_CXX_STANDARD 11)

# add extra include directories
include_directories(/usr/local/libevent/include)
include_directories(/usr/local/zlib/include)
# add extra lib directories
link_directories(/usr/local/libevent/lib)
link_directories(/usr/local/zlib/lib)
link_libraries(event)
link_libraries(z)


aux_source_directory(./self_pool self)
add_executable(self ${self})

target_link_libraries(self event)

 

以上是关于libevent 线程池的设计的主要内容,如果未能解决你的问题,请参考以下文章

与多个线程池相比,单个线程池的设计是不是更好

线程池的实现设计

线程池的设计思想

线程池监控设计

线程池的设计原理是什么?

Linux/设计模式线程池的实现