libevent 线程池的设计
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了libevent 线程池的设计相关的知识,希望对你有一定的参考价值。
参考技术A 在我所做过的一个基于 libevent 的项目中,我所使用的线程模型是 one event_base per thread + thread pool 模型。每个线程最多有一个 event_base,每一个 TCP 连接必须由某一个 event_base 管理,该连接的所有 IO 都会转移到这个 event_base 所在的线程上来处理。换句话来说,一个 file descriptor 只能由一个线程来读写。这样,我们就很方便地把不同的 TCP 连接放到不同的线程里面去。一个节点支持多线程,它有两种模式
在其他的线程中,只要拿到一个 libevent_thread_t 的对象,往里面的 base 添加事件就好了。
创建新线程调用的函数(一直循环):
虽然其他的线程可以往这个线程添加事件,但是当一个 libevent_thread_t 对象释放的时候,其他的线程不能 break 这个线程的 event_base,只能由这个线程自己 break 自己的 event_base,所以在实现上我们首先要打开一个管道
为这个管道一端的 fd 在event_base上面添加监听事件,有数据可读的时候就 break event_base 就可以退出了
所以当我们想要退出时,向 fd 发送数据
至于线程池,就是一个 libevent_thread_t 的数组,通过next来指定函数返回的线程,其中 n 为线程池容量大小, next 自增到 n 时变成 0
在多线程环境中,libevent 的 event_base 的 loopbreak 必须由它自己所在的线程来执行,所以其他线程只能通过管道来通知。
在一个线程往另一个线程的 event_base 添加事件的时候,也就是在多个线程对同一个 event_base 操作的时候,event_base 需要对它自己的数据结构加锁,所以在使用线程池或者编写多线程程序的时候,需要在创建 event_base 之前先调用 evthread_use_pthreads()(Windows 下为 evthread_use_windows_threads()),详见 libevent 文档
C++线程池
使用libevent封装的c++内嵌线程池,在主线程负责调度,在开启的线程中负责处理接受到的任务。在主线程使用管道发送信号给开启的线程,在开启的线程中使用libevent监听主线程发送来的信号,接收成功以后,从线程中维护的任务队列取出,开始处理任务。下面是线程池的UML类图
主函数:
//
// Created by wenfan on 2021/5/4.
//
#include <iostream>
#include <event2/event.h>
#include <event2/listener.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "Task.h"
#include "TaskFactory.h"
#include "ThreadPool.h"
#define PORT 8877
/**
 * Format the IPv4 address held in a socket address as dotted-decimal text.
 * @param remoteAddr socket address whose sin_addr is formatted
 * @return the character buffer handed back by inet_ntoa()
 */
char* getIp(sockaddr_in* remoteAddr){
    const in_addr& peerAddress = remoteAddr->sin_addr;
    char* dottedQuad = inet_ntoa(peerAddress);
    return dottedQuad;
}
/**
 * Read the port stored in a socket address, converted from network to host byte order.
 * @param remoteAddr socket address whose sin_port is converted
 * @return the port number in host byte order
 */
int getPort(sockaddr_in* remoteAddr){
    const int hostOrderPort = ntohs(remoteAddr->sin_port);
    return hostOrderPort;
}
using namespace std;
void listen_cb(struct evconnlistener *listener,
evutil_socket_t accept_sockfd,
struct sockaddr * remoteAddr,
int socklen, void* arg){
cout << "listen_cb" << endl;
Task* task = TaskFactory::getInstance()->createTask();
task->accept_fd = accept_sockfd;
ThreadPool::getInstance()->Dispatch(task);
}
/**
 * Program entry point: creates the accept-side event_base, binds a listener
 * on PORT, starts the worker pool and runs the accept loop.
 * @return 0 after the event loop exits, 1 if the event_base or the listener
 *         could not be created
 */
int main(int argc, char* argv[]){
    event_base* base = event_base_new();
    if(base == nullptr){
        std::cerr << "event_base_new failed!" << std::endl;
        return 1;
    }

    // Value-initialization zeroes the whole structure, so no memset is needed.
    sockaddr_in sin{};
    sin.sin_family = AF_INET;
    sin.sin_port = htons(PORT);

    evconnlistener* listener = evconnlistener_new_bind(base,
                                                       listen_cb,
                                                       base,
                                                       LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
                                                       10,
                                                       (sockaddr*) & sin,
                                                       sizeof(sin));
    if(listener == nullptr){
        // Typically the port is already in use; freeing a null listener would crash.
        std::cerr << "evconnlistener_new_bind failed on port " << PORT << "!" << std::endl;
        event_base_free(base);
        return 1;
    }

    // Workers are started before the loop runs, so listen_cb always finds a populated pool.
    ThreadPool::getInstance()->Init(10);

    event_base_dispatch(base);

    evconnlistener_free(listener);
    event_base_free(base);
    return 0;
}
ThreadPool.h
//
// Created by wenfan on 2021/5/4.
//
#ifndef LIBEVENT_FTP_SERVER_THREADPOOL_H
#define LIBEVENT_FTP_SERVER_THREADPOOL_H
#include <vector>
#include "Thread.h"
#include "Task.h"
/**
 * Singleton pool of worker Thread objects.
 * Tasks handed to Dispatch() are distributed over the workers in
 * round-robin order.
 */
class ThreadPool {
public:
    /// Create and start thread_cnt workers; values <= 0 are rejected.
    void Init(int thread_cnt);
    /// Hand a task to the next worker in round-robin order.
    void Dispatch(Task* task);
    /// Access the single pool instance.
    static ThreadPool* getInstance();

    // The pool is only reachable through getInstance(); copying it would
    // duplicate the raw worker pointers, so copy operations are disabled.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
private:
    std::vector<Thread*> threads;   // workers created by Init()
    int lastThread = -1;            // index of the worker used by the previous Dispatch()
    int threadCnt = 10;             // worker count requested in Init()
    ThreadPool()= default;
};
#endif //LIBEVENT_FTP_SERVER_THREADPOOL_H
ThreadPool.cpp
//
// Created by wenfan on 2021/5/4.
//
#include "ThreadPool.h"
#include <iostream>
#include <thread>
using namespace std;
/**
 * Return the address of the function-local static pool instance.
 */
ThreadPool* ThreadPool::getInstance(){
    static ThreadPool instance;
    ThreadPool* const handle = &instance;
    return handle;
}
void ThreadPool::Init(int thread_cnt){
if(thread_cnt <= 0){
cerr << "thread count must be greater than 0!" << endl;
return;
}
cout << this << endl;
this->threadCnt = thread_cnt;
for (int i = 0; i < thread_cnt; ++i) {
auto* t = new Thread;
threads.push_back(t);
t->id = i + 1;
t->Start();
cout << "thread : " << t->id << " has started!" << endl;
this_thread::sleep_for(chrono::milliseconds(10));
}
}
/**
 * Hand a task to the next worker in round-robin order and wake that worker.
 * @param task task to queue; a null pointer is ignored
 */
void ThreadPool::Dispatch(Task *task) {
    if(!task)
        return;
    // threadCnt defaults to 10 even when no worker exists (Init() never called
    // or rejected its argument), so the index is derived from the real worker
    // count instead to avoid reading past the end of the vector.
    if(this->threads.empty()){
        std::cerr << "thread pool is not initialized, task dropped!" << std::endl;
        return;
    }
    const int workerCount = static_cast<int>(this->threads.size());
    const int nextThread = (this->lastThread + 1) % workerCount;
    this->lastThread = nextThread;
    Thread* t = this->threads[nextThread];
    t->add(task);
    // thread pool -> thread: notify the worker that a task is queued
    t->Activate();
}
Thread.h
//
// Created by wenfan on 2021/5/5.
//
#ifndef LIBEVENT_FTP_SERVER_THREAD_H
#define LIBEVENT_FTP_SERVER_THREAD_H
#include <event2/event.h>
#include <queue>
#include "Task.h"
#include <mutex>
/**
 * One worker of the pool: owns an event_base, a notification pipe and a
 * mutex-protected queue of pending tasks.
 */
class Thread {
public:
    /// Create the notification pipe and the event_base; false on failure.
    bool Init();
    /// Thread entry point: runs the event loop of this worker.
    void Main();
    /// Initialize the worker and launch its thread.
    void Start();
    /// libevent callback for the read end of the pipe; arg is the Thread.
    static void read_cb(evutil_socket_t, short, void *);
    /// Consume one notification from fd and run one queued task.
    void Notify(evutil_socket_t fd);
    /// Queue a task for this worker.
    void add(Task*);
    // Members are given defaults so a Thread created with `new Thread`
    // never carries indeterminate values before Init() runs.
    int id = 0;
    /// Write one byte to the pipe to wake the worker.
    void Activate();
private:
    int notify_fd = -1;             // write end of the notification pipe
    event_base* base = nullptr;     // event loop owned by this worker
    // task queue
    std::queue<Task*> tasks;
    std::mutex tasks_mutex;         // guards tasks
};
#endif //LIBEVENT_FTP_SERVER_THREAD_H
Thread.cpp
//
// Created by wenfan on 2021/5/5.
//
#include "Thread.h"
#include <unistd.h>
#include <iostream>
#include <thread>
using namespace std;
/**
 * Event callback for the read end of the notification pipe.
 * Forwards the descriptor to the Thread object passed as user data.
 * @param fd    descriptor that became readable
 * @param which event flags (unused)
 * @param arg   the Thread registered together with the event
 */
void Thread::read_cb(evutil_socket_t fd, short which, void* arg) {
    Thread* const owner = static_cast<Thread*>(arg);
    owner->Notify(fd);
}
/**
* thread init base
* @return
*/
bool Thread::Init(){
int fd[2];
if(pipe(fd)){
cerr << "create fd failed!" << endl;
return false;
}
this->notify_fd = fd[1];
// create libevent context(unlock)
event_config* config = event_config_new();
event_config_set_flag(config, EVENT_BASE_FLAG_NOLOCK);
this->base = event_base_new_with_config(config);
event_config_free(config);
if(this->base == nullptr){
cerr << "create event_base_new_with_config failed!" << endl;
return false;
}
// set read callback to thread base
event* ev = event_new(Thread::base, fd[0], EV_READ | EV_PERSIST, read_cb, this);
event_add(ev, nullptr);
return true;
}
/**
 * Consume one notification byte from the pipe, take one task from the queue
 * and initialize it.
 * @param fd read end of the notification pipe
 */
void Thread::Notify(evutil_socket_t fd) {
    char buf[2] = {0};
    // read() returns ssize_t; storing it in an unsigned type would turn the
    // -1 error value into a huge positive number and hide the failure.
    const ssize_t len = read(fd, buf, 1);
    if(len <= 0){
        cerr << "read pipe fd msg failed!" << endl;
        return ;
    }
    cout << "recv pipe :" << buf << endl;

    // fetch task from thread's tasks
    Task* task = nullptr;
    {
        // Scoped lock: released on every exit path, including exceptions.
        std::lock_guard<std::mutex> guard(this->tasks_mutex);
        if(this->tasks.empty()){
            return;
        }
        task = this->tasks.front();
        this->tasks.pop();
    }
    if(task == nullptr){
        return;
    }

    // handle task
    if(!task->Init()){
        cerr << "task init failed!" << endl;
    }
}
/**
 * Body of the worker thread: runs the event loop on this worker's base,
 * reports a non-zero result and frees the base once the loop returns.
 */
void Thread::Main(){
    const int loopResult = event_base_dispatch(this->base);
    if(loopResult != 0){
        std::cerr << "thread base loop start failed!" << std::endl;
    }
    event_base_free(this->base);
}
/**
 * Initialize the worker and, if that succeeded, launch a detached thread
 * running Main().
 */
void Thread::Start(){
    // Without a valid event_base the loop in Main() cannot run, so no thread
    // is spawned when Init() fails.
    if(!this->Init()){
        std::cerr << "thread init failed, worker not started!" << std::endl;
        return;
    }
    std::thread th(&Thread::Main, this);
    th.detach();
}
/**
 * Bind a task to this worker's event_base and append it to the task queue.
 * The queue is accessed under tasks_mutex.
 * @param task task to queue; a null pointer is ignored
 */
void Thread::add(Task* task) {
    if(task == nullptr){
        return;
    }
    task->base = this->base;
    // lock_guard keeps the mutex from staying locked if push() throws.
    std::lock_guard<std::mutex> guard(this->tasks_mutex);
    this->tasks.push(task);
}
/**
 * Write one byte to the notification pipe so the worker's read event fires
 * and a queued task is picked up.
 */
void Thread::Activate() {
    // write() returns ssize_t; an unsigned result could never be negative,
    // which made the error branch unreachable for the -1 failure value.
    const ssize_t len = write(this->notify_fd, "c", 1);
    if(len <= 0){
        cerr << "send" << endl;
        return;
    }
    cout << "notify success!" << endl;
}
Task.h接口
//
// Created by wenfan on 2021/5/5.
//
#ifndef LIBEVENT_FTP_SERVER_TASK_H
#define LIBEVENT_FTP_SERVER_TASK_H
#include <event2/event.h>
/**
 * Abstract unit of work handed to a worker thread.
 */
class Task {
public:
    // Tasks are created and held through Task*; a virtual destructor makes
    // destruction through the base pointer well-defined.
    virtual ~Task() = default;

    event_base* base{};     // event loop of the worker the task was queued on
    int accept_fd = 0;      // descriptor of the accepted connection
    int thread_id = 0;
    /// Start the task; invoked by the worker that dequeued it.
    virtual bool Init() = 0;
};
#endif //LIBEVENT_FTP_SERVER_TASK_H
FtpTask.h
//
// Created by wenfan on 2021/5/6.
//
#ifndef LIBEVENT_FTP_SERVER_FTPTASK_H
#define LIBEVENT_FTP_SERVER_FTPTASK_H
#include "Task.h"
#include <event2/bufferevent.h>
/**
 * Task base that routes bufferevent callbacks to virtual member hooks.
 * The static *_cb functions receive the task as user data and call the
 * matching hook; the default hooks do nothing.
 */
class FtpTask : public Task{
public:
    /// Hook called from read_cb.
    virtual void read(bufferevent* bev)
    {
    }

    /// Hook called from write_cb.
    virtual void write(bufferevent* bev)
    {
    }

    /// Hook called from event_cb; what carries the event flags.
    virtual void event(bufferevent* bev, short what)
    {
    }

protected:
    /// Install the three static callbacks on bev with this task as user data.
    void setCallback(bufferevent* bev);

    static void read_cb(bufferevent* bev,void* args);
    static void write_cb(bufferevent* bev,void* args);
    static void event_cb(bufferevent* bev,short what, void* args);
};
#endif //LIBEVENT_FTP_SERVER_FTPTASK_H
FtpTask.cpp
//
// Created by wenfan on 2021/5/6.
//
#include "FtpTask.h"
/**
 * Register this task's static callbacks on the bufferevent and enable
 * both reading and writing on it.
 * @param bev bufferevent to configure
 */
void FtpTask::setCallback(bufferevent* bev){
    void* const self = this;
    bufferevent_setcb(bev, FtpTask::read_cb, FtpTask::write_cb, FtpTask::event_cb, self);

    const short wantedEvents = EV_READ | EV_WRITE;
    bufferevent_enable(bev, wantedEvents);
}
/**
 * Read callback trampoline: args is the FtpTask registered in setCallback().
 */
void FtpTask::read_cb(bufferevent* bev,void* args){
    FtpTask* const self = static_cast<FtpTask*>(args);
    self->read(bev);
}
/**
 * Write callback trampoline: args is the FtpTask registered in setCallback().
 */
void FtpTask::write_cb(bufferevent* bev,void* args){
    FtpTask* const self = static_cast<FtpTask*>(args);
    self->write(bev);
}
/**
 * Event callback trampoline: args is the FtpTask registered in setCallback();
 * the event flags are forwarded unchanged.
 */
void FtpTask::event_cb(bufferevent* bev,short what, void* args){
    FtpTask* const self = static_cast<FtpTask*>(args);
    self->event(bev, what);
}
FtpServerCMD.h
//
// Created by wenfan on 2021/5/6.
//
#ifndef LIBEVENT_FTP_SERVER_FTPSERVERCMD_H
#define LIBEVENT_FTP_SERVER_FTPSERVERCMD_H
#include "FtpTask.h"
#include <event2/bufferevent.h>
class FtpServerCMD : public FtpTask{
public:
virtual bool Init();
virtual void read(bufferevent* bev);
virtual void write(bufferevent* bev);
virtual void event(bufferevent* bev,short what);
};
#endif //LIBEVENT_FTP_SERVER_FTPSERVERCMD_H
FtpServerCMD.cpp
//
// Created by wenfan on 2021/5/6.
//
#include "FtpServerCMD.h"
#include <iostream>
using namespace std;
bool FtpServerCMD::Init() {
cout << "FtpServerCMD::Init" << endl;
bufferevent* bev = bufferevent_socket_new(this->base,
this->accept_fd,
BEV_OPT_CLOSE_ON_FREE);
this->setCallback(bev);
//添加超时
timeval rt = {60 ,0};
bufferevent_set_timeouts(bev, &rt, nullptr);
return true;
}
/**
 * Drain all data currently buffered on the connection, print each chunk and
 * acknowledge with "ok\n".
 * @param bev bufferevent of the connection
 */
void FtpServerCMD::read(bufferevent *bev) {
    char data[BUFSIZ] = {0};
    while(true){
        // bufferevent_read returns the number of bytes copied; 0 means the
        // input buffer is drained, not that the peer closed the connection.
        // The connection therefore stays open here; teardown belongs to the
        // event callback. Freeing bev at this point would also make the
        // bufferevent_write below operate on freed memory.
        const size_t len = bufferevent_read(bev, data, sizeof(data) - 1);
        if(len == 0){
            break;
        }
        data[len] = '\0';
        cout << "Recv CMD:" << data << endl;
        // dispatch to the matching handler object
    }
    bufferevent_write(bev, "ok\n", 3);
}
/**
 * Write hook of the command connection; intentionally does nothing.
 * @param bev bufferevent of the connection (unused)
 */
void FtpServerCMD::write(bufferevent *bev) {
    (void)bev;
}
/**
 * Tear the connection down when an error, EOF or timeout is reported.
 * @param bev  bufferevent of the connection
 * @param what event flags reported for the connection
 */
void FtpServerCMD::event(bufferevent *bev, short what) {
    const short closingEvents = BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_TIMEOUT;
    if(what & closingEvents){
        cout << "BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_TIMEOUT" << endl;
        bufferevent_free(bev);
        // The task was allocated with new by the factory and nothing else
        // releases it once its connection is gone, so it is released here.
        // No member may be touched after this line.
        delete this;
    }
}
TaskFactory.h任务工厂类(单例模式)
//
// Created by wenfan on 2021/5/6.
//
#ifndef LIBEVENT_FTP_SERVER_TASKFACTORY_H
#define LIBEVENT_FTP_SERVER_TASKFACTORY_H
#include "Task.h"
/**
 * Factory that produces the Task objects dispatched to the thread pool.
 * Accessed through getInstance().
 */
class TaskFactory {
public:
    /// Allocate a new task; the returned object is created with new.
    Task* createTask();

    /// Access the shared factory instance.
    static TaskFactory* getInstance();
};
#endif //LIBEVENT_FTP_SERVER_TASKFACTORY_H
TaskFactory.cpp
//
// Created by wenfan on 2021/5/6.
//
#include "TaskFactory.h"
#include "FtpServerCMD.h"
/**
 * Return the address of the function-local static factory instance.
 */
TaskFactory* TaskFactory::getInstance(){
    static TaskFactory instance;
    TaskFactory* const handle = &instance;
    return handle;
}
/**
 * Allocate a new FtpServerCMD and return it through the Task interface.
 */
Task* TaskFactory::createTask(){
    FtpServerCMD* const commandTask = new FtpServerCMD;
    return commandTask;
}
最后,最重要的CMakeLists.txt
cmake_minimum_required(VERSION 3.17)
project(libevent_ftp_server)
set(CMAKE_CXX_STANDARD 11)
# fail the configure step instead of silently falling back to an older standard
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# the sources use std::thread, which needs the platform thread library
find_package(Threads REQUIRED)
# add extra include directories
include_directories(/usr/local/libevent/include)
include_directories(/usr/local/zlib/include)
# add extra lib directories
link_directories(/usr/local/libevent/lib)
link_directories(/usr/local/zlib/lib)
link_libraries(event)
link_libraries(z)
aux_source_directory(./self_pool self)
add_executable(self ${self})
target_link_libraries(self event Threads::Threads)
以上是关于libevent 线程池的设计的主要内容,如果未能解决你的问题,请参考以下文章