Unix/Linux 编程:网络编程之 线程池
Posted sesiria
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Unix/Linux 编程:网络编程之 线程池相关的知识,希望对你有一定的参考价值。
一,线程池的基本组成
1. 工作线程队列
2. 任务队列
3. 线程管理器(封装了线程池的一些基本方法,创建,销毁,加入新任务等)
比如一个常用的网络服务,通常主循环用于处理接收与基本的recv/send操作。可是如果某些业务操作需要长时间的处理数据。
比如等待数据库查询结果,对数据进行编解码等业务逻辑事。为了不让主线程进行长时间的阻塞,从而引入了线程池。让线程池的工作队列来处理具体的业务。
主线程则继续处理网络连接。
应用:线程池是生产者与消费者模型的一个最典型的应用。最常见的nginx服务中就使用了线程池。
二、线程池的基本工作流程
比如主循环是一个服务器,主循环首先处理各种网络IO,当需要实际处理业务数据的情况。将业务数据封装成“task(有些场合也叫消息)",并加入线程池的工作队列中就, 这个过程也叫做”生产“。
线程池中的工作线程会循环取出工作队列中的任务,并进行实际的业务处理,这个过程也叫”消费“。
1. task的定义
比如对于一个任务可以采用以下的方式来定义:
// define the task node
typedef void* (*callback)(void* arg);
struct node_task {
callback func; // call back which will be executed by the worker thread.
void* user_data;
struct node_task* prev; // link to the prv task
struct node_task* next; // link to the next task
};
其中回调函数则是会被工作线程具体调用的函数。
user_data则是传递给回调函数处理的业务数据。
2. 线程队列的定义
// define the work node which will process the task.
struct node_worker {
pthread_t tid; // thread id.
int terminate; // flag to indicate whether the worker would be terminate.
struct _threadPool* pool;
struct node_worker* prev;
struct node_worker* next;
};
线程队列就是一个双向链表数据结构,tid队应了相应的工作线程
3. 线程池管理对象
// manager component
struct _threadPool {
pthread_mutex_t mtx;
pthread_cond_t cond;
struct node_task* tasks;
struct node_worker* workers;
};
线程池主要包含任务队列和工作队列,由于对任务队列中进行读取是一个并发的过程,即任务队列是”临界资源",因此需要引入mutex互斥锁,以及条件变量。
4. 线程池的基本操作
线程池支持3种基本操作:
// initialize the thread pool object.
int nThreadPoolCreate(nThreadPool* pool, int numWorkers);
// destory the thread pool.
int nThreadPoolDestory(nThreadPool* pool);
// insert task into the pool task queue.
int nThreadPoolPushTask(nThreadPool* pool, struct node_task* task);
分别是创建
销毁
以及插入任务
5. 线程池的工作原理
1)工作线程在任务队列为空的时候默认是处于休眠状态的。若退出标志位被设置为1,则表示工作线程要退出。
2)主线程往线程池添加任务以后会调用pthread_cond_signal ,这会唤醒一个工作线程,该线程同时会取得互斥锁,从任务队列中"取出"任务, 并释放锁。然后进行后续的任务处理。
3)销毁工作线程的时候,会释放各种资源,将工作线程退出标记设置为1。并通过pthread_cond_broadcast唤醒所有工作线程.此时工作线程将判断并退出。
4)拓展应用:根据任务队列的数量可以动态的创建或者销毁线程。
三、线程池的实现
/* File name: threadpool.c
* Author: sesiria 2021-05-05
* A Implementation of thread pool. including task queue, worker queue, management component
**/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <pthread.h>
// insert item into the list.
#define LL_ADD(item, list) do { \\
item->prev = NULL; \\
item->next = list; \\
if(list != NULL) list->prev = item; \\
list = item; \\
} while(0)
// remove item from list.
#define LL_REMOVE(item, list) do { \\
if(item->prev != NULL) item->prev->next = item->next; \\
if(item->next != NULL) item->next->prev = item->prev; \\
if(list == item) list = item->next; \\
item->prev = item->next = NULL; \\
} while(0)
//===============start of declaration==============
// define the task node
typedef void* (*callback)(void* arg);
struct node_task {
callback func; // call back which will be executed by the worker thread.
void* user_data;
struct node_task* prev; // link to the prv task
struct node_task* next; // link to the next task
};
// declaration at first.
struct _threadPool;
// define the work node which will process the task.
struct node_worker {
pthread_t tid; // thread id.
int terminate; // flag to indicate whether the worker would be terminate.
struct _threadPool* pool;
struct node_worker* prev;
struct node_worker* next;
};
// manager component
struct _threadPool {
pthread_mutex_t mtx;
pthread_cond_t cond;
struct node_task* tasks;
struct node_worker* workers;
};
typedef struct _threadPool nThreadPool;
//===============end of declaration==============
//=============function definition===============
// normal callback function for worker thread
void* thread_callback(void* arg) {
if (arg == NULL) {
perror("Illegal parameters, and exit worker thread\\n");
pthread_exit(NULL);
}
struct node_worker* worker = (struct node_worker*)arg;
nThreadPool* pool = worker->pool;
if (pool == NULL) {
perror("Illegal poll, and exit worker thread\\n");
pthread_exit(NULL);
}
while (1) {
// the task queue is a critical resource.
pthread_mutex_lock(&pool->mtx);
while (pool->tasks == NULL && !worker->terminate) {
pthread_cond_wait(&pool->cond, &pool->mtx);
}
// the current worker has been tagged to be terminated.
if (worker->terminate) {
pthread_mutex_unlock(&pool->mtx);
// wake up other threads.
break;
}
// pick one task.
struct node_task* task = pool->tasks;
if (task != NULL) {
LL_REMOVE(task, pool->tasks); // remove task from queue.
}
pthread_mutex_unlock(&pool->mtx);
// execute the call back
task->func(task);
// should we free the current task?
free(task);
}
free(worker);
printf("thread ID: %lu exit!\\n", pthread_self());
pthread_exit(NULL);
}
// initialize the thread pool object.
int nThreadPoolCreate(nThreadPool* pool, int numWorkers) {
// check the paramters
if (pool == NULL || numWorkers < 0)
return -1; // illegal parameters.
memset(pool, 0, sizeof(nThreadPool));
// init mutex
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&pool->mtx, &blank_mutex, sizeof(pthread_mutex_t));
// init cond
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
int i = 0;
// create workers
for (i = 0; i < numWorkers; i++) {
struct node_worker* worker = (struct node_worker*)malloc(sizeof(struct node_worker));
if (worker == NULL) {
perror("malloc");
return -1;
}
memset(worker, 0, sizeof(struct node_worker));
worker->pool = pool;
int ret = pthread_create(&worker->tid, NULL, thread_callback, worker);
if (ret) {
perror("pthread_create");
free(worker);
return -2;
}
// add the current worker into workers
LL_ADD(worker, pool->workers);
}
}
// destory the thread.
int nThreadPoolDestory(nThreadPool* pool) {
// shut down all of the workers.
struct node_worker* worker = NULL;
for (worker = pool->workers; worker != NULL; worker = worker->next) {
worker->terminate = 1; // set terminate.
}
pthread_mutex_lock(&pool->mtx);
// clear the workers queue
// because each struct worker will be released by each thread call back function.
pool->workers = NULL;
// clear the task queue
struct node_task* task;
for (task = pool->tasks; task != NULL; task = task->next) {
free(task);
}
pool->tasks = NULL;
// wake up all of the threads.
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->mtx);
}
// insert task into the pool task queue.
int nThreadPoolPushTask(nThreadPool* pool, struct node_task* task) {
pthread_mutex_lock(&pool->mtx);
LL_ADD(task, pool->tasks);
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mtx);
}
//==========end function definition============
#if 1 // debug
#define MAX_THREADS 10
#define COUNTER_SIZE 1000
void counter(struct node_task* task) {
int index = *(int*)task->user_data;
printf("index : %d, selfid : %lu\\n", index, pthread_self());
free(task->user_data);
// the task will be released by the worker thread.
}
int main(int argc, char* argv[]) {
nThreadPool pool;
nThreadPoolCreate(&pool, MAX_THREADS);
int i = 0;
for (i = 0; i < COUNTER_SIZE; i++) {
struct node_task* task = (struct node_task*)malloc(sizeof(struct node_task));
if (task == NULL) {
perror("malloc");
exit(EXIT_FAILURE);
}
task->func = (callback)counter;
task->user_data = malloc(sizeof(int));
*(int*)task->user_data = i;
nThreadPoolPushTask(&pool, task);
}
printf("Press any key to destory the thread pool\\n");
nThreadPoolDestory(&pool);
getchar();
exit(EXIT_SUCCESS);
}
#endif
测试代码运行结果:
所有工作线程都能正确退出。
以上是关于Unix/Linux 编程:网络编程之 线程池的主要内容,如果未能解决你的问题,请参考以下文章
转:Java并发编程之十九:并发新特性—Executor框架与线程池(含代码)
Python并发编程之线程池/进程池--concurrent.futures模块
JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(关闭线程池: shutdownshutdownNow)