C 语言编程 — 线程池设计与实现
Posted 范桂飓
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C 语言编程 — 线程池设计与实现相关的知识,希望对你有一定的参考价值。
目录
文章目录
线程池(Thread Pool)
线程池(Thread Pool)是一种用于管理多线程环境的技术。Thread Pool 会在 User Process 中预先创建一组 User Threads,并在需要时重复使用它们,而不是频繁的创建新线程。
线程池可以有效提高程序性能和可靠性:
- 避免频繁创建、销毁线程,降低了系统开销;
- 限制线程数量,防止过度占用系统资源;
- 提高了程序的响应速度和吞吐量;
- 管理线程的生命周期,避免了线程泄漏的风险;
- 可实现任务优先级。
线程池的设计
一个线程池架构通常需要包含以下组件:
-
Task(任务):由 Task Producer(任务生产者)发出的 Task Request。
-
Task Queue(任务队列):用于缓冲 Tasks 的 FIFO 队列。
-
Thread Pool Manager(线程池管理器):用于管理 Thread Pool 资源:例如:多线程、互斥锁、条件变量等。
-
Worker(工作者):分配来真正处理 Task 的 Worker Thread(工作线程)。
当有 Task 需要处理时,Manager 就从 Pool 中获取一个可用的 Thread 来处理这个 Task。当 Task 完成后,Manager 回收 Thread,而不是销毁它。
C-Thread-Pool
- Github:https://github.com/Pithikos/C-Thread-Pool
C-Thread-Pool 是一个开源的轻量级线程池,实现了以下功能。
- 符合 ANCI C 和 POSIX 风格;
- 支持 Thread 的 Pause(暂停)、Resume(恢复)、Wait(等待);
- 提供简洁的 APIs。
运行示例
$ gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example
$ ./example
Making threadpool with 4 threads
THPOOL_DEBUG: Created thread 0 in pool
THPOOL_DEBUG: Created thread 1 in pool
THPOOL_DEBUG: Created thread 2 in pool
THPOOL_DEBUG: Created thread 3 in pool
Adding 40 tasks to threadpool
Thread #245600256 working on 0
Thread #246136832 working on 2
Thread #246673408 working on 3
Thread #246673408 working on 6
Thread #246673408 working on 7
...
Killing threadpool
数据结构
// 描述一个信号量
typedef struct bsem
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t cond; // 条件变量
int v; // 0、1 二进制信号量
bsem;
// 描述一个任务
typedef struct job
struct job* prev; // 指向下一个(previous)job
void (*function)(void* arg); // 线程入口函数
void* arg; // 线程入口函数的参数
job;
// 描述一个任务队列
typedef struct jobqueue
pthread_mutex_t rwmutex; // 读写 queue 的互斥锁
job *front; // 指向队头
job *rear; // 指向队尾
bsem *has_jobs; // 指向一个信号量
int len; // 队列长度
jobqueue;
// 描述一个线程
typedef struct thread
int id; // 线程池分配的 ID,不是 TID。
pthread_t pthread; // 指向一个线程
struct thpool_* thpool_p; // 指向线程池
thread;
// 描述一个线程池
typedef struct thpool_
thread** threads; // 指向线程指针数组
volatile int num_threads_alive; // 当前活跃的线程数量
volatile int num_threads_working; // 当前正在工作中的线程数量
pthread_mutex_t thcount_lock; // used for thread count etc
pthread_cond_t threads_all_idle; // 空闲条件:等待全部任务完成
jobqueue jobqueue; // 指向任务队列
thpool_;
Public APIs
typedef struct thpool_* threadpool;
// 创建一个包含有指定数量线程的线程池
threadpool thpool_init(int num_threads);
// 添加 task 到 task queue 中。
int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);
// 等待所有 tasks 执行完。
void thpool_wait(threadpool);
// 暂停所有 tasks,使它们进入 sleep 状态。通过信号机制来实现。
void thpool_pause(threadpool);
// 恢复所有 tasks 继续执行。
void thpool_resume(threadpool);
// 销毁所有 tasks,如果有 task 正在执行,则先等待 task 执行完。
void thpool_destroy(threadpool);
// 获取当前正在工作中的线程数量。
int thpool_num_threads_working(threadpool);
Private Functions
// 构造 struct thread,并调用 pthread_create() 创建线程
static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id)
// 当线程被暂停时会在这里休眠
static void thread_hold(int sig_id)
// 线程在此函数中执行任务
static void* thread_do(struct thread* thread_p)
// 销毁 struct thread
static void thread_destroy (thread* thread_p)
// 任务队列相关的操作集合
static int jobqueue_init(jobqueue* jobqueue_p)
static void jobqueue_clear(jobqueue* jobqueue_p)
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob)
static struct job* jobqueue_pull(jobqueue* jobqueue_p)
static void jobqueue_destroy(jobqueue* jobqueue_p)
// 信号量相关的操作集合
static void bsem_init(bsem *bsem_p, int value)
static void bsem_reset(bsem *bsem_p)
static void bsem_post(bsem *bsem_p)
static void bsem_post_all(bsem *bsem_p)
static void bsem_wait(bsem* bsem_p)
测试
- memleaks.sh:测试是否发生内存泄露;
- threadpool.sh:测试线程池是否能正确地执行任务;
- pause_resume.sh:测试 pause 和 resume 是否正常;
- wait.sh:测试 wait 功能是否正常;
- heap_stack_garbage:测试堆栈内有垃圾数据时的情况;
以上是关于C 语言编程 — 线程池设计与实现的主要内容,如果未能解决你的问题,请参考以下文章