Unix/Linux 编程:网络编程之 线程池

Posted sesiria

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Unix/Linux 编程:网络编程之 线程池相关的知识,希望对你有一定的参考价值。

一,线程池的基本组成

1. 工作线程队列

2. 任务队列

3. 线程管理器(封装了线程池的一些基本方法,创建,销毁,加入新任务等)

 

比如一个常用的网络服务,通常主循环用于处理接收与基本的recv/send操作。可是如果某些业务操作需要长时间的处理数据。

比如等待数据库查询结果,对数据进行编解码等业务逻辑事。为了不让主线程进行长时间的阻塞,从而引入了线程池。让线程池的工作队列来处理具体的业务。

主线程则继续处理网络连接。

应用:线程池是生产者与消费者模型的一个最典型的应用。最常见的nginx服务中就使用了线程池。

 

 

二、线程池的基本工作流程

比如主循环是一个服务器,主循环首先处理各种网络IO,当需要实际处理业务数据的情况。将业务数据封装成“task(有些场合也叫消息)",并加入线程池的工作队列中就, 这个过程也叫做”生产“。

线程池中的工作线程会循环取出工作队列中的任务,并进行实际的业务处理,这个过程也叫”消费“。

1. task的定义

比如对于一个任务可以采用以下的方式来定义:

// define the task node
typedef void* (*callback)(void* arg);
struct node_task {
    callback func;   // call back which will be executed by the worker thread.
    void* user_data;

    struct node_task* prev; // link to the prv task
    struct node_task* next; // link to the next task
};

其中回调函数则是会被工作线程具体调用的函数。

user_data则是传递给回调函数处理的业务数据。

2. 线程队列的定义

// define the work node which will process the task.
struct node_worker {
    pthread_t tid;  // thread id.
    int terminate;  // flag to indicate whether the worker would be terminate.

    struct _threadPool* pool;
    struct node_worker* prev;
    struct node_worker* next;
};

线程队列就是一个双向链表数据结构,tid队应了相应的工作线程 

3. 线程池管理对象

// manager component
struct _threadPool {
    pthread_mutex_t mtx;
    pthread_cond_t  cond;

    struct node_task* tasks;
    struct node_worker* workers;
};

线程池主要包含任务队列和工作队列,由于对任务队列中进行读取是一个并发的过程,即任务队列是”临界资源",因此需要引入mutex互斥锁,以及条件变量。

4. 线程池的基本操作

线程池支持3种基本操作:

// initialize the thread pool object.
int nThreadPoolCreate(nThreadPool* pool, int numWorkers);

// destory the thread pool.
int nThreadPoolDestory(nThreadPool* pool);

// insert task into the pool task queue.
int nThreadPoolPushTask(nThreadPool* pool, struct node_task* task);

分别是创建

销毁

以及插入任务

 

5. 线程池的工作原理

1)工作线程在任务队列为空的时候默认是处于休眠状态的。若退出标志位被设置为1,则表示工作线程要退出。

2)主线程往线程池添加任务以后会调用pthread_cond_signal ,这会唤醒一个工作线程,该线程同时会取得互斥锁,从任务队列中"取出"任务, 并释放锁。然后进行后续的任务处理。

3)销毁工作线程的时候,会释放各种资源,将工作线程退出标记设置为1。并通过pthread_cond_broadcast唤醒所有工作线程.此时工作线程将判断并退出。

4)拓展应用:根据任务队列的数量可以动态的创建或者销毁线程。

 

三、线程池的实现

 

/* File name: threadpool.c
 * Author: sesiria   2021-05-05
 * A Implementation of thread pool. including task queue, worker queue, management component
 **/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <pthread.h>

 // insert item into the list.
#define LL_ADD(item, list)  do {        \\
    item->prev = NULL;                  \\
    item->next = list;                  \\
    if(list != NULL) list->prev = item; \\
    list = item;                        \\
} while(0)

// remove item from list.
#define LL_REMOVE(item, list) do {                            \\
    if(item->prev != NULL) item->prev->next = item->next;   \\
    if(item->next != NULL) item->next->prev = item->prev;   \\
    if(list == item)  list = item->next;                    \\
    item->prev = item->next = NULL;                         \\
} while(0)

//===============start of declaration==============

// define the task node
typedef void* (*callback)(void* arg);
struct node_task {
    callback func;   // call back which will be executed by the worker thread.
    void* user_data;

    struct node_task* prev; // link to the prv task
    struct node_task* next; // link to the next task
};

// declaration at first.
struct _threadPool;

// define the work node which will process the task.
struct node_worker {
    pthread_t tid;  // thread id.
    int terminate;  // flag to indicate whether the worker would be terminate.

    struct _threadPool* pool;
    struct node_worker* prev;
    struct node_worker* next;
};

// manager component
struct _threadPool {
    pthread_mutex_t mtx;
    pthread_cond_t  cond;

    struct node_task* tasks;
    struct node_worker* workers;
};

typedef struct _threadPool nThreadPool;
//===============end of declaration==============

//=============function definition===============
// normal callback function for worker thread
void* thread_callback(void* arg) {
    if (arg == NULL) {
        perror("Illegal parameters, and exit worker thread\\n");
        pthread_exit(NULL);
    }
    struct node_worker* worker = (struct node_worker*)arg;
    nThreadPool* pool = worker->pool;
    if (pool == NULL) {
        perror("Illegal poll, and exit worker thread\\n");
        pthread_exit(NULL);
    }

    while (1) {
        // the task queue is a critical resource.
        pthread_mutex_lock(&pool->mtx);
        while (pool->tasks == NULL && !worker->terminate) {
            pthread_cond_wait(&pool->cond, &pool->mtx);
        }

        // the current worker has been tagged to be terminated.
        if (worker->terminate) {
            pthread_mutex_unlock(&pool->mtx);
            // wake up other threads.
            break;
        }
        // pick one task.
        struct node_task* task = pool->tasks;
        if (task != NULL) {
            LL_REMOVE(task, pool->tasks); // remove task from queue.
        }

        pthread_mutex_unlock(&pool->mtx);
        // execute the call back
        task->func(task);
        // should we free the current task?
        free(task);
    }
    free(worker);
    printf("thread ID: %lu exit!\\n", pthread_self());
    pthread_exit(NULL);
}

// initialize the thread pool object.
int nThreadPoolCreate(nThreadPool* pool, int numWorkers) {
    // check the paramters
    if (pool == NULL || numWorkers < 0)
        return -1; // illegal parameters.
    memset(pool, 0, sizeof(nThreadPool));

    // init mutex
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->mtx, &blank_mutex, sizeof(pthread_mutex_t));

    // init cond
    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));

    int i = 0;
    // create workers
    for (i = 0; i < numWorkers; i++) {
        struct node_worker* worker = (struct node_worker*)malloc(sizeof(struct node_worker));
        if (worker == NULL) {
            perror("malloc");
            return -1;
        }
        memset(worker, 0, sizeof(struct node_worker));
        worker->pool = pool;

        int ret = pthread_create(&worker->tid, NULL, thread_callback, worker);
        if (ret) {
            perror("pthread_create");
            free(worker);
            return -2;
        }

        // add the current worker into workers
        LL_ADD(worker, pool->workers);
    }
}

// destory the thread.
int nThreadPoolDestory(nThreadPool* pool) {
    // shut down all of the workers.
    struct node_worker* worker = NULL;
    for (worker = pool->workers; worker != NULL; worker = worker->next) {
        worker->terminate = 1; // set terminate.
    }

    pthread_mutex_lock(&pool->mtx);

    // clear the workers queue
    // because each struct worker will be released by each thread call back function.
    pool->workers = NULL;
    // clear the task queue
    struct node_task* task;
    for (task = pool->tasks; task != NULL; task = task->next) {
        free(task);
    }
    pool->tasks = NULL;

    // wake up all of the threads.
    pthread_cond_broadcast(&pool->cond);

    pthread_mutex_unlock(&pool->mtx);
}

// insert task into the pool task queue.
int nThreadPoolPushTask(nThreadPool* pool, struct node_task* task) {
    pthread_mutex_lock(&pool->mtx);

    LL_ADD(task, pool->tasks);
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mtx);
}

//==========end function definition============
#if 1       // debug

#define MAX_THREADS      10
#define COUNTER_SIZE    1000


void counter(struct node_task* task) {
    int index = *(int*)task->user_data;
    printf("index : %d, selfid : %lu\\n", index, pthread_self());
    free(task->user_data);
    // the task will be released by the worker thread.
}

int main(int argc, char* argv[]) {
    nThreadPool pool;

    nThreadPoolCreate(&pool, MAX_THREADS);

    int i = 0;
    for (i = 0; i < COUNTER_SIZE; i++) {
        struct node_task* task = (struct node_task*)malloc(sizeof(struct node_task));
        if (task == NULL) {
            perror("malloc");
            exit(EXIT_FAILURE);
        }

        task->func = (callback)counter;
        task->user_data = malloc(sizeof(int));
        *(int*)task->user_data = i;
        nThreadPoolPushTask(&pool, task);
    }

    printf("Press any key to destory the thread pool\\n");
    nThreadPoolDestory(&pool);
    getchar();
    exit(EXIT_SUCCESS);
}

#endif 


测试代码运行结果:

所有工作线程都能正确退出。

以上是关于Unix/Linux 编程:网络编程之 线程池的主要内容,如果未能解决你的问题,请参考以下文章

Unix/Linux 编程:网络编程之 IO模型

并发编程补充知识之标准线程池

转:Java并发编程之十九:并发新特性—Executor框架与线程池(含代码)

Liunx C 编程之多线程与Socket

Python并发编程之线程池/进程池--concurrent.futures模块

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(关闭线程池: shutdownshutdownNow)