Linux线程池在server上简单应用

Posted jhcelue

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux线程池在server上简单应用相关的知识,希望对你有一定的参考价值。

一、问题描述

现以C/S架构为例:client向server端发送要查找的数字,server端启动线程池中的线程进行相应的查询,并将查询结果显示出来。

二、实现方案

1. 整个工程按client、server、lib三个目录组织,如下图所示:

技术分享

2. 进入lib。

技术分享

socket.h、socket.c

/**
  @file		socket.h
  @brief	Socket API header file

  TCP socket utility functions, it provides simple functions that helps
  to build TCP client/server.

  @author wangzhicheng
 */
#ifndef SOCKET_H
#define SOCKET_H

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <resolv.h>

/*
 * Public TCP helper API.
 *
 * NOTE(review): confirm that every prototype below has a matching definition
 * in socket.c (TCPServerSelect in particular has none in the socket.c shown
 * alongside this header), and whether TCPConnectionClose, which socket.c
 * defines, is meant to stay undeclared here.
 */

/* Not referenced by the socket.c shown here; presumably the intended
 * listen() backlog -- confirm. */
#define MAX_CONNECTION				20

/* Create a TCP socket and bind it to `port`; the fd is stored in *serverfd. */
int	TCPServerInit(int port, int *serverfd);
/* listen() + accept() on serverfd; stores the accepted fd in *clientfd and
 * the peer's dotted-quad IPv4 address in clientaddr. */
int	TCPServerWaitConnection(int serverfd, int *clientfd, char *clientaddr);
/* NOTE(review): no definition visible; behavior cannot be documented from here. */
int TCPServerSelect(int* serverfdlist, int num, int *clientfd, char *clientaddr);
/* Create a TCP socket for a client; the fd is stored in *clientfd. */
int	TCPClientInit(int *clientfd);
/* Connect clientfd to the IPv4 address `addr` (dotted-quad text) and `port`. */
int	TCPClientConnect(const int clientfd, const char *addr, int port);
/* Put the socket in non-blocking mode, then recv() up to `size` bytes. */
int	TCPNonBlockRead(int clientfd, char* buf, int size);
/* Put the socket in blocking mode, then recv() up to `size` bytes. */
int TCPBlockRead(int clientfd, char* buf, int size);
/* Put the socket in non-blocking mode, then send() `size` bytes from buf. */
int	TCPWrite(int clientfd, char* buf, int size);
/* NOTE(review): declared here; verify a definition exists in socket.c. */
void TCPClientClose(int sockfd);
/* NOTE(review): declared here; verify a definition exists in socket.c. */
void TCPServerClose(int sockfd);

#endif


socket.c

#include "socket.h"
/*
 * @brief	initialize TCP server: create a TCP socket and bind it to a port
 * @port		port number to bind on all local interfaces (INADDR_ANY)
 * @serverfd	out: receives the server socket fd, or -1 when setup fails
 * return server socket fd for success; -1 when serverfd is NULL or when
 *        socket()/bind() fails (errno is the one set by the failing call)
 * */
int	TCPServerInit(int port, int *serverfd) {
	struct sockaddr_in dest;

	if(serverfd == NULL) return -1;
	// create socket , same as client
	*serverfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
	if(*serverfd < 0) return -1;
	/// initialize structure dest
	memset((void*)&dest, '\0', sizeof(dest));
	dest.sin_family = AF_INET;
	dest.sin_port = htons(port);
	dest.sin_addr.s_addr = htonl(INADDR_ANY);
	// Assign a port number to socket; a failed bind (e.g. port already in
	// use) must not be reported as success, so release the fd and fail.
	if(bind(*serverfd, (struct sockaddr*)&dest, sizeof(dest)) < 0) {
		int saved_errno = errno;	/* close() may overwrite errno */
		close(*serverfd);
		*serverfd = -1;
		errno = saved_errno;
		return -1;
	}

	return *serverfd;
}
/*
 * @brief	wait client connect
 * @serverfd	server socket fd (already bound)
 * @clientfd	out: receives the accepted client socket fd, or -1 on failure
 * @clientaddr	out: receives the client's dotted-quad IPv4 address; the buffer
 *              must hold at least 16 bytes. May be NULL if the address is not
 *              wanted. Set to the empty string on failure.
 * return client fd, -1 when clientfd is NULL or listen()/accept() fails
 * */
int	TCPServerWaitConnection(int serverfd, int *clientfd, char *clientaddr) {
	struct sockaddr_in client_addr;
	socklen_t addrlen = sizeof(client_addr);

	if(clientfd == NULL) return -1;
	*clientfd = -1;
	if(clientaddr != NULL) clientaddr[0] = '\0';
	// make it listen to socket
	if(listen( serverfd, 20) < 0) return -1;
	// Wait and Accept connection
	memset(&client_addr, '\0', sizeof(client_addr));
	*clientfd = accept(serverfd, (struct sockaddr*)&client_addr, &addrlen);
	// client_addr is only meaningful when accept() succeeded
	if(*clientfd < 0) return -1;
	if(clientaddr != NULL) strcpy( clientaddr, inet_ntoa( client_addr.sin_addr));

	return *clientfd;
}
/*
 * @brief	create the TCP socket used by a client
 * @clientfd	out: receives whatever socket() returned
 * return the value stored in *clientfd (negative when socket() failed)
 */
int	TCPClientInit(int *clientfd) {
	int fd = socket(PF_INET, SOCK_STREAM, 0);

	*clientfd = fd;
	return fd;
}
/*
 * @brief	connect to TCP server
 * @clientfd	client socket fd
 * @addr		server address as IPv4 text (e.g. "127.0.0.1")
 * @port		server port number
 * return 0 for success, on error -1 is returned; a NULL or unparsable addr
 *        fails with errno = EINVAL instead of connecting to 0.0.0.0
 */
int	TCPClientConnect(const int clientfd, const char *addr, int port) {
	struct sockaddr_in dest;

	if(addr == NULL) {
		errno = EINVAL;
		return -1;
	}
	// initialize value in dest
	memset(&dest, '\0', sizeof(dest));
	dest.sin_family = AF_INET;
	dest.sin_port = htons(port);
	// inet_aton() returns 0 when addr is not a valid IPv4 address
	if(inet_aton(addr, &dest.sin_addr) == 0) {
		errno = EINVAL;
		return -1;
	}

	// Connecting to server
	return connect(clientfd, (struct sockaddr*)&dest, sizeof(dest));
}
/*
 * @brief	read from a TCP socket without blocking
 * @clientfd	socket fd; O_NONBLOCK is set on it and left set
 * @buf	     	destination buffer
 * @size		capacity of buf in bytes
 * return	    result of recv(): number of bytes read, or -1 on error
 */
int	TCPNonBlockRead(int clientfd, char* buf, int size) {
	int flags = fcntl(clientfd, F_GETFL);

	fcntl(clientfd, F_SETFL, flags | O_NONBLOCK);
	return recv(clientfd, buf, size, 0);
}
/*
 * @brief	read from a TCP socket, blocking until data is available
 * @clientfd	socket fd; O_NONBLOCK is cleared on it and left cleared
 * @buf	  	    destination buffer
 * @size		capacity of buf in bytes
 * return	    result of recv(): number of bytes read, or -1 on error
 */
int	TCPBlockRead(int clientfd, char* buf, int size) {
	int flags = fcntl(clientfd, F_GETFL);

	fcntl(clientfd, F_SETFL, flags & ~O_NONBLOCK);
	return recv(clientfd, buf, size, 0);
}
/*
 * @brief	send data on a TCP socket
 * @clientfd	socket fd; O_NONBLOCK is set on it and left set
 * @buf		    data to send
 * @size		number of bytes to send from buf
 * return	    result of send(): bytes actually written, -1 on error
 *
 * A failure to switch the socket to non-blocking mode is only reported on
 * stdout; the send is attempted regardless. MSG_NOSIGNAL is passed to send().
 */
int	TCPWrite(int clientfd, char* buf, int size) {
	/* request non-blocking mode before sending */
	int flags = fcntl(clientfd, F_GETFL) | O_NONBLOCK;

	if (fcntl(clientfd, F_SETFL, flags) < 0 ) {
		printf("set socket to nonblock fail [%d] !\n", errno);
	}

	return send(clientfd, buf, size, MSG_NOSIGNAL);
}
/*
 * @brief	close the tcp connection
 * @sockfd	socket fd
 * return	none
 */
void TCPConnectionClose(int sockfd) {
	close(sockfd);
}
/*
 * @brief	close a client socket; defines the TCPClientClose() that
 *          socket.h declares
 * @sockfd	socket fd
 * return	none
 */
void TCPClientClose(int sockfd) {
	TCPConnectionClose(sockfd);
}
/*
 * @brief	close a server socket; defines the TCPServerClose() that
 *          socket.h declares
 * @sockfd	socket fd
 * return	none
 */
void TCPServerClose(int sockfd) {
	TCPConnectionClose(sockfd);
}


threadpool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
// One queued unit of work: a node of the pool's singly linked job queue.
struct job
{
    void* (*callback_function)(void *arg);    // callback executed by a worker thread
    void *arg;                                // argument handed to the callback
    struct job *next;                         // next job in the queue (NULL for the last one)
};

// State of a thread pool: the worker threads plus a bounded FIFO queue of jobs.
// The counters, flags and queue pointers are accessed with `mutex` held.
struct threadpool
{
    int thread_num;                   // number of worker threads started by the pool
    int queue_max_num;                // maximum number of jobs allowed in the queue
    struct job *head;                 // first job in the queue (next to be executed)
    struct job *tail;                 // last job in the queue (where new jobs are appended)
    pthread_t *pthreads;              // pthread_t of every worker thread in the pool
    pthread_mutex_t mutex;            // mutex protecting the pool state
    pthread_cond_t queue_empty;       // signalled by a worker when the queue becomes empty
    pthread_cond_t queue_not_empty;   // broadcast when a job is added to an empty queue
    pthread_cond_t queue_not_full;    // broadcast by a worker when a full queue gains a free slot
    int queue_cur_num;                // current number of jobs in the queue
    int queue_close;                  // non-zero once the queue is closed to new jobs
    int pool_close;                   // non-zero once the pool is shut down
};

//================================================================================================
// Function:     threadpool_init
// Description:  Initialize a thread pool and start its worker threads.
// Input:        [in] thread_num     number of worker threads to start
//               [in] queue_max_num  maximum number of jobs in the queue
// Output:       none
// Returns:      success: address of the thread pool; failure: NULL
//================================================================================================
struct threadpool* threadpool_init(int thread_num, int queue_max_num);

//================================================================================================
// Function:     threadpool_add_job
// Description:  Add a job to the thread pool. Blocks while the queue is full.
// Input:        [in] pool               address of the thread pool
//               [in] callback_function  callback to run on a worker thread
//               [in] arg                argument passed to the callback (a NULL arg is rejected)
// Output:       none
// Returns:      success: 0; failure: -1 (NULL parameter, queue/pool closed, or out of memory)
//================================================================================================
int threadpool_add_job(struct threadpool *pool, void* (*callback_function)(void *arg), void *arg);

//================================================================================================
// Function:     threadpool_destroy
// Description:  Destroy the thread pool: close the queue, wait until it is empty, stop and
//               join the worker threads, then release the pool's resources.
// Input:        [in] pool   address of the thread pool
// Output:       none
// Returns:      success: 0; failure: -1 (pool is NULL or already being closed)
//================================================================================================
int threadpool_destroy(struct threadpool *pool);

//================================================================================================
// Function:     threadpool_function
// Description:  Thread function executed by every worker thread of the pool.
// Input:        [in] arg   address of the thread pool
// Output:       none
// Returns:      none (the thread leaves through pthread_exit(NULL) once the pool is closed)
//================================================================================================
void* threadpool_function(void* arg);
#endif

threadpool.c

#include "threadpool.h"

/*
 * Create a thread pool and start its worker threads.
 *
 * thread_num     number of worker threads to start (must be > 0)
 * queue_max_num  maximum number of queued jobs (must be > 0; with 0 every
 *                threadpool_add_job() call would wait forever)
 *
 * Returns the new pool, or NULL on failure. On failure everything acquired so
 * far (memory, mutex, condition variables, already started threads) is
 * released again, so nothing leaks.
 */
struct threadpool* threadpool_init(int thread_num, int queue_max_num) {
    struct threadpool *pool = NULL;
    int started = 0;    /* number of worker threads successfully created */

    if (thread_num <= 0 || queue_max_num <= 0)
    {
        printf("invalid threadpool parameters!\n");
        return NULL;
    }
    pool = malloc(sizeof(*pool));
    if (NULL == pool)
    {
        printf("failed to malloc threadpool!\n");
        return NULL;
    }
    pool->thread_num = thread_num;
    pool->queue_max_num = queue_max_num;
    pool->queue_cur_num = 0;
    pool->head = NULL;
    pool->tail = NULL;
    pool->pthreads = NULL;
    /* the flags must be valid before the first worker thread reads them */
    pool->queue_close = 0;
    pool->pool_close = 0;
    if (pthread_mutex_init(&(pool->mutex), NULL))
    {
        printf("failed to init mutex!\n");
        goto free_pool;
    }
    if (pthread_cond_init(&(pool->queue_empty), NULL))
    {
        printf("failed to init queue_empty!\n");
        goto destroy_mutex;
    }
    if (pthread_cond_init(&(pool->queue_not_empty), NULL))
    {
        printf("failed to init queue_not_empty!\n");
        goto destroy_queue_empty;
    }
    if (pthread_cond_init(&(pool->queue_not_full), NULL))
    {
        printf("failed to init queue_not_full!\n");
        goto destroy_queue_not_empty;
    }
    pool->pthreads = malloc(sizeof(*pool->pthreads) * (size_t)thread_num);
    if (NULL == pool->pthreads)
    {
        printf("failed to malloc pthreads!\n");
        goto destroy_queue_not_full;
    }
    for (started = 0; started < pool->thread_num; ++started)
    {
        if (pthread_create(&(pool->pthreads[started]), NULL, threadpool_function, (void *)pool))
        {
            printf("failed to create thread!\n");
            goto stop_threads;
        }
    }

    return pool;

stop_threads:
    /* Tell the workers that did start to exit, then wait for them. */
    pthread_mutex_lock(&(pool->mutex));
    pool->queue_close = 1;
    pool->pool_close = 1;
    pthread_mutex_unlock(&(pool->mutex));
    pthread_cond_broadcast(&(pool->queue_not_empty));
    while (started > 0)
    {
        pthread_join(pool->pthreads[--started], NULL);
    }
    free(pool->pthreads);
destroy_queue_not_full:
    pthread_cond_destroy(&(pool->queue_not_full));
destroy_queue_not_empty:
    pthread_cond_destroy(&(pool->queue_not_empty));
destroy_queue_empty:
    pthread_cond_destroy(&(pool->queue_empty));
destroy_mutex:
    pthread_mutex_destroy(&(pool->mutex));
free_pool:
    free(pool);
    return NULL;
}
/*
 * Append a job to the pool's queue.
 *
 * Blocks while the queue is full. Returns 0 once the job is queued, or -1
 * when any parameter (including arg) is NULL, when the queue or the pool has
 * been closed, or when the job node cannot be allocated.
 */
int threadpool_add_job(struct threadpool* pool, void* (*callback_function)(void *arg), void *arg) {
    struct job *node = NULL;
    int rc = -1;

    if (!pool || !callback_function || !arg)
        return -1;

    pthread_mutex_lock(&pool->mutex);

    /* Wait for a free slot unless a shutdown has begun. */
    while (pool->queue_cur_num == pool->queue_max_num
           && !pool->queue_close && !pool->pool_close)
        pthread_cond_wait(&pool->queue_not_full, &pool->mutex);

    /* A closed queue or pool accepts no more work. */
    if (!pool->queue_close && !pool->pool_close)
        node = malloc(sizeof *node);

    if (node != NULL)
    {
        node->callback_function = callback_function;
        node->arg = arg;
        node->next = NULL;

        if (pool->head != NULL)
        {
            pool->tail->next = node;
            pool->tail = node;
        }
        else
        {
            /* Queue was empty: tell the workers there is something to do. */
            pool->head = node;
            pool->tail = node;
            pthread_cond_broadcast(&pool->queue_not_empty);
        }
        pool->queue_cur_num += 1;
        rc = 0;
    }

    pthread_mutex_unlock(&pool->mutex);
    return rc;
}

/*
 * Worker thread body. arg is the owning struct threadpool.
 *
 * Repeatedly takes the job at the head of the queue and runs its callback
 * outside the lock, then frees the job node. The thread ends through
 * pthread_exit(NULL) as soon as it observes pool_close.
 */
void* threadpool_function(void* arg) {
    struct threadpool *tp = (struct threadpool*)arg;

    for (;;)
    {
        struct job *task;

        pthread_mutex_lock(&tp->mutex);

        /* Sleep until there is a job or the pool is shutting down. */
        while (tp->queue_cur_num == 0 && !tp->pool_close)
            pthread_cond_wait(&tp->queue_not_empty, &tp->mutex);

        if (tp->pool_close)
        {
            pthread_mutex_unlock(&tp->mutex);
            pthread_exit(NULL);
        }

        /* Detach the head job. */
        task = tp->head;
        tp->queue_cur_num -= 1;
        if (tp->queue_cur_num != 0)
        {
            tp->head = task->next;
        }
        else
        {
            tp->head = NULL;
            tp->tail = NULL;
            /* Queue drained: threadpool_destroy may be waiting for this. */
            pthread_cond_signal(&tp->queue_empty);
        }

        /* The queue was full before this removal: wake blocked producers. */
        if (tp->queue_cur_num == tp->queue_max_num - 1)
            pthread_cond_broadcast(&tp->queue_not_full);

        pthread_mutex_unlock(&tp->mutex);

        /* Run the job without holding the lock. */
        task->callback_function(task->arg);
        free(task);
    }
}
/*
 * Shut the pool down and release it.
 *
 * Closes the queue to new jobs, waits until the queue has drained, marks the
 * pool closed, wakes every waiter, joins all worker threads and finally frees
 * the synchronisation objects, any remaining job nodes and the pool itself.
 *
 * Returns 0 on success, -1 when pool is NULL or a shutdown was already
 * requested.
 */
int threadpool_destroy(struct threadpool *pool) {
    struct job *node;
    int idx;
    int closing;

    if (!pool)
        return -1;

    pthread_mutex_lock(&pool->mutex);
    closing = pool->queue_close || pool->pool_close;
    if (closing)
    {
        /* somebody already started tearing the pool down */
        pthread_mutex_unlock(&pool->mutex);
        return -1;
    }

    /* Refuse new jobs, then let the workers drain what is queued. */
    pool->queue_close = 1;
    while (pool->queue_cur_num != 0)
        pthread_cond_wait(&pool->queue_empty, &pool->mutex);

    pool->pool_close = 1;
    pthread_mutex_unlock(&pool->mutex);

    /* Wake idle workers and any producer blocked in threadpool_add_job. */
    pthread_cond_broadcast(&pool->queue_not_empty);
    pthread_cond_broadcast(&pool->queue_not_full);

    /* Wait for every worker thread to finish. */
    for (idx = 0; idx < pool->thread_num; idx++)
        pthread_join(pool->pthreads[idx], NULL);

    /* Release resources. */
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->queue_empty);
    pthread_cond_destroy(&pool->queue_not_empty);
    pthread_cond_destroy(&pool->queue_not_full);
    free(pool->pthreads);

    for (node = pool->head; node != NULL; node = pool->head)
    {
        pool->head = node->next;
        free(node);
    }

    free(pool);
    return 0;
}

3.进入client

技术分享

client.c

/*************************************************************************
    > File Name: test.c
    > Author: wangzhicheng
    > Mail: [email protected] 
    > Created Time: Fri 03 Oct 2014 09:43:59 PM WST
 ************************************************************************/

#include "socket.h"
const char * serveraddr = "127.0.0.1";
#define TCPPORT 4001
/*
 * Client entry point: connect to the server at serveraddr:TCPPORT, send the
 * number to look up ("1") as text, and report whether the whole message was
 * sent. Returns 0 when the message was sent completely, EXIT_FAILURE otherwise.
 */
int main(void) {
	int clientfd = -1;
	char buf[256];
	int len;
	int status = EXIT_FAILURE;

	strcpy(buf, "1");
	len = (int)strlen(buf);
	if(TCPClientInit(&clientfd) < 0) {
		/* perror() appends ": <reason>\n" itself, so no newline here */
		perror("client init failed...!");
		exit(EXIT_FAILURE);
	}
	if(TCPClientConnect(clientfd, serveraddr, TCPPORT)) {
		perror("can not connect to server...!");
		close(clientfd);
		exit(EXIT_FAILURE);
	}
	/* Success means every byte of the message was written; TCPWrite returns
	 * the number of bytes sent, or -1 on error. */
	if(TCPWrite(clientfd, buf, len) == len) {
		printf("send successfully...!\n");
		status = 0;
	}
	else printf("send failed...!\n");

	close(clientfd);
	return status;
}

Makefile

# Build the client from client.c and the shared socket helpers in ../lib.
CC=gcc
LIBRARY=../lib
CFLAGS=-I$(LIBRARY)
CXXFLAGS=
OBJS1=client.o  socket.o 

.PHONY: all clean

all:	client 


# $@ expands to the target name (client).
client: $(OBJS1)
	$(CC) -o $@ $(OBJS1)

socket.o: $(LIBRARY)/socket.c $(LIBRARY)/socket.h
	$(CC) $(CFLAGS) -c $(LIBRARY)/socket.c

# -f keeps "make clean" from failing when there is nothing to remove.
clean:
	rm -f *.o client

4. 进入server

技术分享

server.c

/*************************************************************************
    > File Name: server.c
    > Author: ma6174
    > Mail: [email protected] 
    > Created Time: Sat 04 Oct 2014 09:46:30 PM WST
 ************************************************************************/

#include "socket.h"
#include "threadpool.h"

#define TCPPORT 4001
#define SIZE 256
#define N 10
/* Table searched by find(); kept in ascending order. */
int array[N] = {1, 2, 6, 8, 12, 88, 208, 222, 688, 1018};
/*
 * Binary search for m in array[low..high] (both bounds inclusive).
 * Returns 1 when m is present in that range, 0 otherwise (including when
 * low > high).
 */
int find(int low, int high, int m) {
	int lo = low;
	int hi = high;

	while(lo <= hi) {
		int mid = (lo + hi) >> 1;

		if(array[mid] == m) return 1;
		if(array[mid] > m) hi = mid - 1;	/* continue in the lower half */
		else lo = mid + 1;			/* continue in the upper half */
	}
	return 0;
}
/*
 * Thread-pool job: look one number up in the table and print the result.
 *
 * arg points to a heap-allocated int that main() created for this job alone;
 * ownership passes to this function, which frees it. Always returns NULL.
 */
void* work(void* arg)
{
    int *p = (int *) arg;
	int m = *p;

	free(p);
	if(find(0, N - 1, m)) printf("%d has been found...!\n", m);
	else printf("%d has not been found...!\n", m);
    sleep(1);
	return NULL;
}
/*
 * Server entry point: bind to TCPPORT, start the thread pool, then loop
 * forever accepting one connection at a time, reading one number (as text)
 * from it and queueing a lookup job for that number.
 */
int main(void) {
	int serverfd = -1, clientfd = -1;
	char clientaddr[SIZE];
	char buf[SIZE];
    struct threadpool *pool = NULL;

	TCPServerInit(TCPPORT, &serverfd);
	if(serverfd < 0) {
		/* perror() appends ": <reason>\n" itself, so no newline here */
		perror("server init failed...!");
		exit(EXIT_FAILURE);
	}
    pool = threadpool_init(10, 20);
	if(pool == NULL) {
		fprintf(stderr, "thread pool init failed...!\n");
		close(serverfd);
		exit(EXIT_FAILURE);
	}
	while(1) {
		int len;
		int *num;

		TCPServerWaitConnection(serverfd, &clientfd, clientaddr);
		if(clientfd < 0) {
			perror("can not connect the clients...!");
			exit(EXIT_FAILURE);
		}
		/* leave room for the terminator: recv() does not add one */
		len = TCPBlockRead(clientfd, buf, SIZE - 1);
		if(len <= 0) {
			perror("can not read from client...!");
			close(clientfd);
			sleep(1);
			continue;
		}
		/* one request per connection: the fd is no longer needed */
		close(clientfd);
		buf[len] = '\0';

		/* Each job gets its own copy of the number. Passing the address of
		 * a single local would let the next request overwrite the value
		 * before a worker thread has read it. */
		num = malloc(sizeof *num);
		if(num == NULL) {
			perror("can not allocate job argument...!");
			continue;
		}
		*num = atoi(buf);
		if(threadpool_add_job(pool, work, num) != 0) {
			/* the job was not queued, so work() will never free it */
			free(num);
		}
	}
	/* not reached: the accept loop above never terminates */
    threadpool_destroy(pool);

	return 0;
}

Makefile

# Build the server from server.c plus the socket and thread-pool helpers in ../lib.
CC=gcc
LIBRARY=../lib
CFLAGS=-I$(LIBRARY)
CXXFLAGS=
OBJS1=server.o  socket.o threadpool.o

.PHONY: all clean

all:	server


# $@ expands to the target name (server).
server: $(OBJS1)
	$(CC) -o $@ $(OBJS1) -lpthread

socket.o: $(LIBRARY)/socket.c $(LIBRARY)/socket.h
	$(CC) $(CFLAGS) -c $(LIBRARY)/socket.c

threadpool.o: $(LIBRARY)/threadpool.c $(LIBRARY)/threadpool.h
	$(CC) $(CFLAGS) -c $(LIBRARY)/threadpool.c

# Remove this directory's own binary (server); -f keeps "make clean" from
# failing when there is nothing to remove.
clean:
	rm -f *.o server
三、测试

技术分享

四、有关线程池的说明

技术分享

当线程池被创建时,线程池中有一些“空闲”的线程,即尚未执行任务的线程。每当一个任务被添加进来时,任务就被组织到任务队列中,线程按照队头出、队尾进的原则取出队头任务执行。

任务队列中所含任务数必须控制在一个上限内,超过上限时,添加任务的操作会被阻塞。当全部任务执行完毕后,销毁线程池。
















以上是关于Linux线程池在server上简单应用的主要内容,如果未能解决你的问题,请参考以下文章

线程池在爬虫案例中的应用

使用媒体播放器或声音池在片段内的 onClick 中播放声音

使用实体框架迁移时 SQL Server 连接抛出异常 - 添加代码片段

不同连接池在大量线程需求的情况下都分配了多少线程

Tornado线程池在python2.7的使用

Java线程池在运行后的结果反查