线程应用: 线程池的实现
Posted 李憨憨_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程应用: 线程池的实现相关的知识,希望对你有一定的参考价值。
线程池的实现
线程池: 一堆线程等着处理任务- -非常典型的生产者与消费者模型应用一堆线程(有最大数量上线) + 线程安全的任务队列;
其他线程将任务抛入线程安全的任务队列中, 线程池中的线程从任务队列中获取任务进行处理
应用场景: 针对大量的请求进行处理的场景(比如网络服务器......)
常规情况下若要处理大量请求, 使用单执行流效率较低, 因此使用多执行流提高处理效率
有一个请求, 则创建一个线程T1, 去处理请求T2, 处理完毕后线程销毁T3;
若处理一个任务的总和时间中, 创建与销毁线程的时间占据大比例, 则意味着CPU资源大部分都消耗在线程的创建与销毁而不是处理任务;
优势: 线程池中的线程创建之后不销毁, 而是循环取出任务进行处理, 避免了频繁进行线程的创建与销毁带来的时间成本; 线程池中的线程以及缓冲区有最大数量限制, 避免了峰值压力下资源耗尽系统崩溃的风险;
实现:
线程池的作用就是针对大量任务进行处理;任务类型多种多样, 但线程入口函数是固定的; 如何实现工作线程针对不同的任务进行不同的处理?
1.线程入口函数中, 分辨任务类型, 调用不同的处理接口;
2.其他线程在通过任务队列传入任务的同时, 也把这个任务的处理方法传进来, 线程池中的线程只需要使用处理方法处理任务即可, 不需要关注什么样的任务如何处理;
任务类:
typedef void(*handler_t)(int data);
class ThreadTask{//任务类
int _data;//要处理的数据
handler_t _handler;//处理数据的方法
public:
void Run(){return _handler(_data);}
};任务节点类;线程池中的线程获取到任务节点只需要调用Run
线程池:
class Threadpool{
int _thr_num;//线程的数量
BlockQueue _queue;//线程安全的任务队列
void *thr_entry(void *arg){
ThreadTask data;
_queue.pop(&data);
data.Run();
}
public:
Init(int mq, int mt);初始化以及线程创建
TaskPush(ThreadTask &task);任务入队
};
上代码
#include <iostream>
#include <queue>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#define MAX_QUEUE 10
#define MAX_THREAD 5
using namespace std;
typedef void (*handler_t)(int data);
class ThreadTask{
//任务类
private:
int _data;//要处理的数据
handler_t _handler;//处理数据的方法
public:
ThreadTask(){}
ThreadTask(int data, handler_t handler)
:_data(data)
,_handler(handler){
}
void SetTask(int data, handler_t handler){
_data = data;
_handler = handler;
}
void Run(){
return _handler(_data);
}
};
class BlockQueue{
private:
int _capacity;//容量
queue<ThreadTask> _queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond_pro;
pthread_cond_t _cond_cus;
public:
BlockQueue(int cap = MAX_QUEUE)
:_capacity(cap)
{
pthread_mutex_init(&_mutex,NULL);
pthread_cond_init(&_cond_pro,NULL);
pthread_cond_init(&_cond_cus,NULL);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_pro);
pthread_cond_destroy(&_cond_cus);
}
bool Push(ThreadTask &data)
{
pthread_mutex_lock(&_mutex);
while(_queue.size() == _capacity){
pthread_cond_wait(&_cond_pro, &_mutex);
}
_queue.push(data);
pthread_cond_signal(&_cond_cus);
pthread_mutex_unlock(&_mutex);
return true;
}
bool Pop(ThreadTask *data)
{
pthread_mutex_lock(&_mutex);
while(_queue.empty()){
pthread_cond_wait(&_cond_cus, &_mutex);
}
*data = _queue.front();
_queue.pop();
pthread_cond_signal(&_cond_pro);
pthread_mutex_unlock(&_mutex);
return true;
}
};
class ThreadPool
{
public:
ThreadPool(int tnum = MAX_THREAD, int qnum = MAX_THREAD)
:_thread_num(tnum)
,_queue(qnum){}
bool Init(){
pthread_t tid;
int ret;
int i;
for(i = 0; i < _thread_num; i++){
ret = pthread_create(&tid,NULL, thr_entry, (void*)this);
if(ret != 0){
cout << "thread creat error" << endl;
return false;
}
pthread_detach(tid);
}
return true;
}
bool Push(ThreadTask &task){
_queue.Push(task);
return true;
}
private:
static void *thr_entry(void *arg){
ThreadPool *p = (ThreadPool*)arg;
while(1){
ThreadTask task;
p-> _queue.Pop(&task);
task.Run();
}
return NULL;
}
private:
int _thread_num;
BlockQueue _queue;
};
void test(int data)
{
printf("i am thr:%p-get data:%d-while sleep:%d\\n", pthread_self(), data, data % 5);
sleep(data%5);
}
int main()
{
ThreadPool pool;
int i;
for(i = 0; i < 10; i++){
ThreadTask task(i, test);
pool.Push(task);
}
while(1) sleep(1);
return 0;
}
以上是关于线程应用: 线程池的实现的主要内容,如果未能解决你的问题,请参考以下文章
newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段