线程应用: 线程池的实现

Posted 李憨憨_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程应用: 线程池的实现相关的知识,希望对你有一定的参考价值。

线程池的实现

线程池: 一堆线程等着处理任务- -非常典型的生产者与消费者模型应用
 一堆线程(有最大数量上线) + 线程安全的任务队列;
  其他线程将任务抛入线程安全的任务队列中, 线程池中的线程从任务队列中获取任务进行处理
应用场景: 针对大量的请求进行处理的场景(比如网络服务器......)
 常规情况下若要处理大量请求, 使用单执行流效率较低, 因此使用多执行流提高处理效率
 有一个请求, 则创建一个线程T1, 去处理请求T2, 处理完毕后线程销毁T3;
 若处理一个任务的总和时间中, 创建与销毁线程的时间占据大比例, 则意味着CPU资源大部分都消耗在线程的创建与销毁而不是处理任务;
优势: 线程池中的线程创建之后不销毁, 而是循环取出任务进行处理, 避免了频繁进行线程的创建与销毁带来的时间成本;  线程池中的线程以及缓冲区有最大数量限制, 避免了峰值压力下资源耗尽系统崩溃的风险;

实现:

 线程池的作用就是针对大量任务进行处理;
 任务类型多种多样, 但线程入口函数是固定的; 如何实现工作线程针对不同的任务进行不同的处理?
  1.线程入口函数中, 分辨任务类型, 调用不同的处理接口;
  2.其他线程在通过任务队列传入任务的同时, 也把这个任务的处理方法传进来, 线程池中的线程只需要使用处理方法处理任务即可, 不需要关注什么样的任务如何处理;

任务类:

typedef void(*handler_t)(int data);
class ThreadTask{//任务类
	int _data;//要处理的数据
	handler_t _handler;//处理数据的方法
	public:
		void Run(){return _handler(_data);}
};任务节点类;线程池中的线程获取到任务节点只需要调用Run

线程池:

class Threadpool{
	int _thr_num;//线程的数量
	BlockQueue _queue;//线程安全的任务队列
	void *thr_entry(void *arg){
		ThreadTask data;
		_queue.pop(&data);
		data.Run();
	}
	public:
		Init(int mq, int mt);初始化以及线程创建
		TaskPush(ThreadTask &task);任务入队
};

在这里插入图片描述

上代码

#include <iostream>
#include <queue>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#define MAX_QUEUE  10
#define MAX_THREAD 5
using namespace std;

typedef void (*handler_t)(int data);
class ThreadTask{
    //任务类
    private:
        int _data;//要处理的数据
        handler_t _handler;//处理数据的方法
    public:
        ThreadTask(){}
        ThreadTask(int data, handler_t handler)
            :_data(data)
            ,_handler(handler){

            }
        void SetTask(int data, handler_t handler){
            _data = data;
            _handler = handler;
        }
        void Run(){
            return _handler(_data);
        }
};
class BlockQueue{
    private:
        int _capacity;//容量
        queue<ThreadTask> _queue;
        pthread_mutex_t _mutex;
        pthread_cond_t _cond_pro;
        pthread_cond_t _cond_cus;
    public:
        BlockQueue(int cap = MAX_QUEUE)
            :_capacity(cap)
        {
            pthread_mutex_init(&_mutex,NULL);
            pthread_cond_init(&_cond_pro,NULL);
            pthread_cond_init(&_cond_cus,NULL);
        }
        ~BlockQueue()
        {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_cond_pro);
            pthread_cond_destroy(&_cond_cus);
            
        }
        bool Push(ThreadTask &data)
        {
            pthread_mutex_lock(&_mutex);
            while(_queue.size() == _capacity){
                pthread_cond_wait(&_cond_pro, &_mutex);
            }
            _queue.push(data);
            pthread_cond_signal(&_cond_cus);
            pthread_mutex_unlock(&_mutex);
            return true;
        }
        bool Pop(ThreadTask *data)
        {
            pthread_mutex_lock(&_mutex);
            while(_queue.empty()){
                pthread_cond_wait(&_cond_cus, &_mutex);
            }
            *data = _queue.front();
            _queue.pop();
            pthread_cond_signal(&_cond_pro);
            pthread_mutex_unlock(&_mutex);
            return true;
        }
};

class ThreadPool
{
    public:
        ThreadPool(int tnum = MAX_THREAD, int qnum = MAX_THREAD)
        :_thread_num(tnum)
        ,_queue(qnum){}
        bool Init(){
            pthread_t tid;
            int ret;
            int i;
            for(i = 0; i < _thread_num; i++){
                ret = pthread_create(&tid,NULL, thr_entry, (void*)this);
                if(ret != 0){
                    cout << "thread creat error" << endl;
                    return false;
                }
                pthread_detach(tid);
            }
            return true;
        }
        bool Push(ThreadTask &task){
            _queue.Push(task);
            return true;
        }
    private:
        static void *thr_entry(void *arg){
            ThreadPool *p = (ThreadPool*)arg;
            while(1){
                ThreadTask task;
               p-> _queue.Pop(&task);
                task.Run();
            }
            return NULL;
        }
    private:
        int _thread_num;
        BlockQueue _queue;
};

void test(int data)
{
    printf("i am thr:%p-get data:%d-while sleep:%d\\n", pthread_self(), data, data % 5);
    sleep(data%5);
}
int main()
{
    ThreadPool pool;
    int i;
    for(i = 0; i < 10; i++){
        ThreadTask task(i, test);
        pool.Push(task);
    }
    while(1) sleep(1);
    return 0;
}

以上是关于线程应用: 线程池的实现的主要内容,如果未能解决你的问题,请参考以下文章

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

IDEA对新建java线程池的建议

线程池的使用场景和代码实现!

阶段1 语言基础+高级_1-3-Java语言高级_05-异常与多线程_第5节 线程池_2_线程池的代码实现

Java Web应用中调优线程池的重要性

Java Web应用中调优线程池的重要性