半同步/半异步并发模式进程池实现

Posted gcczhongduan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了半同步/半异步并发模式进程池实现相关的知识,希望对你有一定的参考价值。

半同步/半异步并发模式:父进程监听到新的客户端连接请求后,以通信管道通知进程池中的某一子进程:“嘿,有新的客户连接来了。你去accept。然后处理下!”。从而避免在进程间传递文件描写叙述符。这样的模式中,一个客户连接上的全部任务始终有同一个进程来处理。

详细细节,尽在代码中:

#ifndef PROCESSPOOL_H
#define PROCESSPOOL_H

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>

//描写叙述一个子进程的类
class process
{
public:
    process() : m_pid( -1 ){}

public:
    pid_t m_pid;    //子进程pid
    int m_pipefd[2];//子进程与父进程通信管道(父进程通过管道通知选定的子进程:“有新的连接到来,你去accept")
};

//进程池类:将须要处理的逻辑任务封装为任务类。作为模板參数,以提高代码复用
template< typename T >
class processpool
{
private:
//构造函数,creat函数调用
    processpool( int listenfd, int process_number = 8 );
public:
//给定server监听的socket,和进程数,创建子进程。(注:单例模式)。 为静态函数,因此能够直接以类名调用
    static processpool< T >* create( int listenfd, int process_number = 8 )
    {
        if( !m_instance )//确保仅仅创建唯一的一个进程池实例
        {
            m_instance = new processpool< T >( listenfd, process_number );
        }
        return m_instance;
    }
    ~processpool()
    {
        delete [] m_sub_process;
    }
    void run();  //启动进程池,在当中依据当前进程的标号来区分为父进程或子进程,并分别调用其run_***函数来处理逻辑任务

private:
    void setup_sig_pipe();
    void run_parent();  //父进程的逻辑任务处理函数:监听listen socket,并通知工作进程
    void run_child();   //子进程的任务逻辑处理函数:接受连接socket,完毕客户任务请求

private:
    static const int MAX_PROCESS_NUMBER = 16;  //进程池同意的最大进程数量
    static const int USER_PER_PROCESS = 65536;  //每一个子进程处理的最大的客户任务数量
    static const int MAX_EVENT_NUMBER = 10000;  //  epoll能处理的最大事件数量
    int m_process_number;                       //进程池中的进程数量
    int m_idx;                                  //子进程在池中的编号。从0開始
    int m_epollfd;  //指向poll内核事件表的文件描写叙述符:每一个进程都独立创建一个epoll内核事件表项
    int m_listenfd; //监听socket:父进程与全部子进程的监听socket文件描写叙述符指向内核中的同一文件表项
    int m_stop;     //子进程是否要停止执行
    process* m_sub_process; //保存全部子进程的数组
    static processpool< T >* m_instance;  //进程池静态实例:标识全局唯一的进程池
};
template< typename T >
processpool< T >* processpool< T >::m_instance = NULL;//初始化静态成员

static int sig_pipefd[2];//用于处理信号的管道,以实现统一事件源

//将文件描写叙述符设为非堵塞
static int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

//向epoll内核时间表注冊事件
static void addfd( int epollfd, int fd )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );//”仅仅有当事件就绪时,非堵塞才是高校的“
}

//删除fd上的注冊事件
static void removefd( int epollfd, int fd )
{
    epoll_ctl( epollfd, EPOLL_CTL_DEL, fd, 0 );
    close( fd );
}

//消息处理函数,实现事件统一处理:仅仅是往号管道写端写入信号消息。详细信号处理逻辑在while循环中统一处理。

// 减短信号处理函数执行时间,从而确保信号不被屏蔽(信号在处理期间,系统不会再次出发该信号) static void sig_handler( int sig ) { int save_errno = errno; int msg = sig; send( sig_pipefd[1], ( char* )&msg, 1, 0 ); errno = save_errno; } static void addsig( int sig, void( handler )(int), bool restart = true ) { struct sigaction sa; memset( &sa, ‘\0‘, sizeof( sa ) ); sa.sa_handler = handler; if( restart ) { sa.sa_flags |= SA_RESTART; } sigfillset( &sa.sa_mask ); assert( sigaction( sig, &sa, NULL ) != -1 ); } //进程池构造函数 template< typename T > processpool< T >::processpool( int listenfd, int process_number ) : m_listenfd( listenfd ), m_process_number( process_number ), m_idx( -1 ), m_stop( false ) { assert( ( process_number > 0 ) && ( process_number <= MAX_PROCESS_NUMBER ) ); m_sub_process = new process[ process_number ]; assert( m_sub_process ); for( int i = 0; i < process_number; ++i ) { int ret = socketpair( PF_UNIX, SOCK_STREAM, 0, m_sub_process[i].m_pipefd ); assert( ret == 0 ); m_sub_process[i].m_pid = fork(); assert( m_sub_process[i].m_pid >= 0 ); if( m_sub_process[i].m_pid > 0 )//为父进程 { close( m_sub_process[i].m_pipefd[1] ); continue;//父进程继续创建好全部的子进程后,才退出该函数 } else { close( m_sub_process[i].m_pipefd[0] );//子进程 m_idx = i;//初始化子进程在进程池中的编号(最小为0,而父进程标号为-1) break;//子进程一旦创建好。就退出该函数。进入逻辑任务处理(pool->run) } } } //信号管道设置函数 template< typename T > void processpool< T >::setup_sig_pipe() { m_epollfd = epoll_create( 5 ); assert( m_epollfd != -1 ); int ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd ); assert( ret != -1 ); setnonblocking( sig_pipefd[1] );//信号处理函数向写端写入消息 addfd( m_epollfd, sig_pipefd[0] );//在while中监听读端 addsig( SIGCHLD, sig_handler );//子进程退出会暂停 addsig( SIGTERM, sig_handler );//终止进程,kill函数默认即发送该信号 addsig( SIGINT, sig_handler );//键盘输入以终止该进程ctrl+C addsig( SIGPIPE, SIG_IGN );//忽略往管道读端被关闭的管道写数据的信号 } //进程池启动函数 template< typename T > void processpool< T >::run() { if( m_idx != -1 ) { run_child();//子进程 return;//子进程退出后,记得return } run_parent();//父进程 } //子进程逻辑函数 template< typename T > void processpool< T >::run_child() { setup_sig_pipe(); int pipefd = m_sub_process[m_idx].m_pipefd[ 1 ];//与父进程的通信管道 addfd( m_epollfd, pipefd ); epoll_event events[ MAX_EVENT_NUMBER ]; T* users = new T [ USER_PER_PROCESS ]; //一次性创建全部的客户端任务对象数组(也用到池的思想,当某个客户任务处理完后,不用释放该对象资源,而是放回池中再利用) assert( users ); int number = 0; int ret = -1; //监听信号管道、通信管道、连接socket,处理逻辑任务 while( ! m_stop ) { number = epoll_wait( m_epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ( number < 0 ) && ( errno != EINTR ) ) { printf( "epoll failure\n" ); break; } for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) ) //为父进程的通信事件 { int client = 0; ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 ); if( ( ( ret < 0 ) && ( errno != EAGAIN ) ) || ret == 0 ) { continue; } else { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( m_listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); if ( connfd < 0 ) { printf( "errno is: %d\n", errno ); continue; } addfd( m_epollfd, connfd ); users[connfd].init( m_epollfd, connfd, client_address ); } } else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) )//为信号管道事件 { int sig; char signals[1024]; ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 ); if( ret <= 0 ) { continue; } else { for( int i = 0; i < ret; ++i ) { switch( signals[i] ) { case SIGCHLD: { pid_t pid; int stat; while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 ) { continue; } break; } case SIGTERM: case SIGINT: { m_stop = true; break; } default: { break; } } } } } else if( events[i].events & EPOLLIN ) //为连接socket事件 { users[sockfd].process();//客户任务对象的逻辑处理函数 } else { continue; } } } delete [] users; users = NULL; close( pipefd );//关闭与父进程的通信管道的读端 //close( m_listenfd );//”对象由那个函数创建,就由那个函数销毁“ close( m_epollfd ); } //父进程逻辑函数 template< typename T > void processpool< T >::run_parent() { setup_sig_pipe(); addfd( m_epollfd, m_listenfd ); epoll_event events[ MAX_EVENT_NUMBER ]; int sub_process_counter = 0; int new_conn = 1; int number = 0; int ret = -1; //监听listen socket、和信号管道 while( ! m_stop ) { number = epoll_wait( m_epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ( number < 0 ) && ( errno != EINTR ) ) { printf( "epoll failure\n" ); break; } for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if( sockfd == m_listenfd ) //监听到新连接到来,论选出一个子进程,通知该子进程”嘿。有新的连接到了。你接受下!“ { int i = sub_process_counter; do { if( m_sub_process[i].m_pid != -1 ) { break; } i = (i+1)%m_process_number; } while( i != sub_process_counter ); if( m_sub_process[i].m_pid == -1 )//全部子进程都都已经推出 { m_stop = true; break; } sub_process_counter = (i+1)%m_process_number; //send( m_sub_process[sub_process_counter++].m_pipefd[0], ( char* )&new_conn, sizeof( new_conn ), 0 ); send( m_sub_process[i].m_pipefd[0], ( char* )&new_conn, sizeof( new_conn ), 0 );//通知子进程 printf( "send request to child %d\n", i ); //sub_process_counter %= m_process_number; } else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) ) { int sig; char signals[1024]; ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 ); if( ret <= 0 ) { continue; } else { for( int i = 0; i < ret; ++i ) { switch( signals[i] ) { case SIGCHLD://子进程退出信号 { pid_t pid; int stat; while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 ) { for( int i = 0; i < m_process_number; ++i ) { if( m_sub_process[i].m_pid == pid ) { printf( "child %d join\n", i ); close( m_sub_process[i].m_pipefd[0] ); m_sub_process[i].m_pid = -1; //设置已经推出的子进程的PID为-1 } } } m_stop = true; for( int i = 0; i < m_process_number; ++i ) { if( m_sub_process[i].m_pid != -1 ) //仅仅要另一个子进程没有退出,则父进程继续 { m_stop = false; } } break; } case SIGTERM: case SIGINT: { printf( "kill all the clild now\n" ); for( int i = 0; i < m_process_number; ++i ) { int pid = m_sub_process[i].m_pid; if( pid != -1 ) { kill( pid, SIGTERM ); } } break; } default: { break; } } } } } else { continue; } } } //close( m_listenfd ); close( m_epollfd ); } #endif

以上是关于半同步/半异步并发模式进程池实现的主要内容,如果未能解决你的问题,请参考以下文章

半同步半异步高性能网络编程

C++11实现半同步半异步的线程池

两种高效的并发模式(半同步/半异步和领导者/追随者)

半同步半异步I/O的设计模式(half sync/half async)

线程同步之条件变量使用手记

Mysql半同步复制模式说明 - 运维小结