C++线程池
Posted 顾文繁
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++线程池相关的知识,希望对你有一定的参考价值。
使用libevent封装的c++内嵌线程池,在主线程负责调度,在开启的线程中负责处理接受到的任务。在主线程使用管道发送信号给开启的线程,在开启的线程中使用libevent监听主线程发送来的信号,接收成功以后,从线程中维护的任务队列取出,开始处理任务。下面是线程池的UML类图
主函数:
//
// Created by wenfan on 2021/5/4.
//
#include <iostream>
#include <event2/event.h>
#include <event2/listener.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "Task.h"
#include "TaskFactory.h"
#include "ThreadPool.h"
#define PORT 8877
char* getIp(sockaddr_in* remoteAddr){
return inet_ntoa(remoteAddr->sin_addr);
}
int getPort(sockaddr_in* remoteAddr){
return ntohs(remoteAddr->sin_port);
}
using namespace std;
void listen_cb(struct evconnlistener *listener,
evutil_socket_t accept_sockfd,
struct sockaddr * remoteAddr,
int socklen, void* arg){
cout << "listen_cb" << endl;
Task* task = TaskFactory::getInstance()->createTask();
task->accept_fd = accept_sockfd;
ThreadPool::getInstance()->Dispatch(task);
}
int main(int argc, char* argv[]){
event_base* base = event_base_new();
sockaddr_in sin = {0};
memset(&sin, 0, sizeof sin);
sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);
evconnlistener* listener = evconnlistener_new_bind(base,
listen_cb,
base,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
10,
(sockaddr*) & sin,
sizeof(sin));
ThreadPool::getInstance()->Init(10);
event_base_dispatch(base);
evconnlistener_free(listener);
event_base_free(base);
return 0;
}
ThreadPool.h
//
// Created by wenfan on 2021/5/4.
//
#ifndef LIBEVENT_FTP_SERVER_THREADPOOL_H
#define LIBEVENT_FTP_SERVER_THREADPOOL_H
#include <vector>
#include "Thread.h"
#include "Task.h"
class ThreadPool {
public:
void Init(int thread_cnt);
void Dispatch(Task* task);
static ThreadPool* getInstance();
private:
std::vector<Thread*> threads;
int lastThread = -1;
int threadCnt = 10;
ThreadPool()= default;
};
#endif //LIBEVENT_FTP_SERVER_THREADPOOL_H
ThreadPool.cpp
//
// Created by wenfan on 2021/5/4.
//
#include "ThreadPool.h"
#include <iostream>
#include <thread>
using namespace std;
ThreadPool* ThreadPool::getInstance(){
static ThreadPool pool;
return &pool;
}
void ThreadPool::Init(int thread_cnt){
if(thread_cnt <= 0){
cerr << "thread count must be greater than 0!" << endl;
return;
}
cout << this << endl;
this->threadCnt = thread_cnt;
for (int i = 0; i < thread_cnt; ++i) {
auto* t = new Thread;
threads.push_back(t);
t->id = i + 1;
t->Start();
cout << "thread : " << t->id << " has started!" << endl;
this_thread::sleep_for(chrono::milliseconds(10));
}
}
void ThreadPool::Dispatch(Task *task) {
if(!task)
return;
int nextThread = (this->lastThread + 1) % this->threadCnt;
this->lastThread = nextThread;
auto* t = this->threads[nextThread];
t->add(task);
//thread pool -> thread
t->Activate();
}
Thread.h
//
// Created by wenfan on 2021/5/5.
//
#ifndef LIBEVENT_FTP_SERVER_THREAD_H
#define LIBEVENT_FTP_SERVER_THREAD_H
#include <event2/event.h>
#include <queue>
#include "Task.h"
#include <mutex>
class Thread {
public:
bool Init();
void Main();
void Start();
static void read_cb(evutil_socket_t, short, void *);
void Notify(evutil_socket_t fd);
void add(Task*);
int id;
void Activate();
private:
int notify_fd;
event_base* base;
//任务队列
std::queue<Task*> tasks;
std::mutex tasks_mutex;
};
#endif //LIBEVENT_FTP_SERVER_THREAD_H
Thread.cpp
//
// Created by wenfan on 2021/5/5.
//
#include "Thread.h"
#include <unistd.h>
#include <iostream>
#include <thread>
using namespace std;
/**
* thread base's read_cb
* @param fd
* @param which
* @param arg
*/
void Thread::read_cb(evutil_socket_t fd, short which, void* arg) {
auto* t = (Thread*) arg;
t->Notify(fd);
}
/**
* thread init base
* @return
*/
bool Thread::Init(){
int fd[2];
if(pipe(fd)){
cerr << "create fd failed!" << endl;
return false;
}
this->notify_fd = fd[1];
// create libevent context(unlock)
event_config* config = event_config_new();
event_config_set_flag(config, EVENT_BASE_FLAG_NOLOCK);
this->base = event_base_new_with_config(config);
event_config_free(config);
if(this->base == nullptr){
cerr << "create event_base_new_with_config failed!" << endl;
return false;
}
// set read callback to thread base
event* ev = event_new(Thread::base, fd[0], EV_READ | EV_PERSIST, read_cb, this);
event_add(ev, nullptr);
return true;
}
/**
* recv pipe fd msg
* @param fd
*/
void Thread::Notify(evutil_socket_t fd) {
char buf[2] = {0};
size_t len = read(fd, buf, 1);
if(len <= 0){
cerr << "read pipe fd msg failed!" << endl;
return ;
}
cout << "recv pipe :" << buf << endl;
//fetch task from thread's tasks
Task* task = nullptr;
this->tasks_mutex.lock();
if(this->tasks.empty()){
this->tasks_mutex.unlock();
return;
}
task = this->tasks.front();
this->tasks.pop();
this->tasks_mutex.unlock();
//handle task
task->Init();
}
/**
* real thread run this loop
*/
void Thread::Main(){
if(event_base_dispatch(this->base)){
cerr << "thread base loop start failed!" << endl;
}
event_base_free(this->base);
}
/**
* init and start thread
*/
void Thread::Start(){
this->Init();
thread th(&Thread::Main, this);
th.detach();
}
/**
* add task to threads, there are thread safe prob , and lock threads.
* @param task
*/
void Thread::add(Task* task) {
task->base = this->base;
this->tasks_mutex.lock();
this->tasks.push(task);
this->tasks_mutex.unlock();
}
/**
* send pipe msg for notify thread that here are task to handle
*/
void Thread::Activate() {
size_t len = write(this->notify_fd, "c", 1);
if(len <= 0){
cerr << "send" << endl;
return;
}
cout << "notify success!" << endl;
}
Task.h接口
//
// Created by wenfan on 2021/5/5.
//
#ifndef LIBEVENT_FTP_SERVER_TASK_H
#define LIBEVENT_FTP_SERVER_TASK_H
#include <event2/event.h>
class Task {
public:
event_base* base{};
int accept_fd = 0;
int thread_id = 0;
virtual bool Init() = 0;
};
#endif //LIBEVENT_FTP_SERVER_TASK_H
FtpTask.h
//
// Created by wenfan on 2021/5/6.
//
#ifndef LIBEVENT_FTP_SERVER_FTPTASK_H
#define LIBEVENT_FTP_SERVER_FTPTASK_H
#include "Task.h"
#include <event2/bufferevent.h>
class FtpTask : public Task{
public:
virtual void read(bufferevent* bev){}
virtual void write(bufferevent* bev){}
virtual void event(bufferevent* bev, short what){}
protected:
void setCallback(bufferevent* bev);
static void read_cb(bufferevent* bev,void* args);
static void write_cb(bufferevent* bev,void* args);
static void event_cb(bufferevent* bev,short what, void* args);
};
#endif //LIBEVENT_FTP_SERVER_FTPTASK_H
FtpTask.cpp
//
// Created by wenfan on 2021/5/6.
//
#include "FtpTask.h"
void FtpTask::setCallback(bufferevent* bev){
bufferevent_setcb(bev, read_cb, write_cb, event_cb, this);
bufferevent_enable(bev, EV_WRITE | EV_READ);
}
void FtpTask::read_cb(bufferevent* bev,void* args){
auto* task = (FtpTask*)args;
task->read(bev);
}
void FtpTask::write_cb(bufferevent* bev,void* args){
auto* task = (FtpTask*)args;
task->write(bev);
}
void FtpTask::event_cb(bufferevent* bev,short what, void* args){
auto* task = (FtpTask*)args;
task->event(bev, what);
}
FtpServerCMD.h
//
// Created by wenfan on 2021/5/6.
//
#ifndef LIBEVENT_FTP_SERVER_FTPSERVERCMD_H
#define LIBEVENT_FTP_SERVER_FTPSERVERCMD_H
#include "FtpTask.h"
#include <event2/bufferevent.h>
class FtpServerCMD : public FtpTask{
public:
virtual bool Init();
virtual void read(bufferevent* bev);
virtual void write(bufferevent* bev);
virtual void event(bufferevent* bev,short what);
};
#endif //LIBEVENT_FTP_SERVER_FTPSERVERCMD_H
FtpServerCMD.cpp
//
// Created by wenfan on 2021/5/6.
//
#include "FtpServerCMD.h"
#include <iostream>
using namespace std;
bool FtpServerCMD::Init() {
cout << "FtpServerCMD::Init" << endl;
bufferevent* bev = bufferevent_socket_new(this->base,
this->accept_fd,
BEV_OPT_CLOSE_ON_FREE);
this->setCallback(bev);
//添加超时
timeval rt = {60 ,0};
bufferevent_set_timeouts(bev, &rt, nullptr);
return true;
}
void FtpServerCMD::read(bufferevent *bev) {
char data[BUFSIZ] = {0};
while(true){
int len = bufferevent_read(bev, data, sizeof(data) - 1);
if(len <=0 ){
bufferevent_free(bev);
delete this;
break;
}
data[len] = '\\0';
cout << "Recv CMD:" << data << endl;
//分发处理对象
}
bufferevent_write(bev,"ok\\n",3);
}
void FtpServerCMD::write(bufferevent *bev) {
}
void FtpServerCMD::event(bufferevent *bev, short what) {
if(what & (BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_TIMEOUT)){
cout << "BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_TIMEOUT" << endl;
bufferevent_free(bev);
//delete this;
}
}
TaskFactory.h任务工厂类(单类模式)
//
// Created by wenfan on 2021/5/6.
//
#ifndef LIBEVENT_FTP_SERVER_TASKFACTORY_H
#define LIBEVENT_FTP_SERVER_TASKFACTORY_H
#include "Task.h"
class TaskFactory {
public:
static TaskFactory* getInstance();
Task* createTask();
};
#endif //LIBEVENT_FTP_SERVER_TASKFACTORY_H
TaskFactory.cpp
//
// Created by wenfan on 2021/5/6.
//
#include "TaskFactory.h"
#include "FtpServerCMD.h"
TaskFactory* TaskFactory::getInstance(){
static TaskFactory factory;
return &factory;
}
Task* TaskFactory::createTask(){
return new FtpServerCMD;
}
最后,最重要的CMakeLists.txt
cmake_minimum_required(VERSION 3.17)
project(libevent_ftp_server)
set(CMAKE_CXX_STANDARD 11)
# add extra include directories
include_directories(/usr/local/libevent/include)
include_directories(/usr/local/zlib/include)
# add extra lib directories
link_directories(/usr/local/libevent/lib)
link_directories(/usr/local/zlib/lib)
link_libraries(event)
link_libraries(z)
aux_source_directory(./self_pool self)
add_executable(self ${self})
target_link_libraries(self event)
以上是关于C++线程池的主要内容,如果未能解决你的问题,请参考以下文章