diy数据库--线程控制块消息线程池

Posted 关于会飞的猪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了diy数据库--线程控制块消息线程池相关的知识,希望对你有一定的参考价值。

一、概述

        1、diy数据库使用的是一个多进程and多线程的服务器模型。每个进程作为一个节点实例,监听一个端口;而每个用户连接在数据库节点实例中都会有一个代理线程与之对应。


                                                                 

        2、除了主线程外每个线程都有一个EDU(进程调度单元,也可称为线程控制块),另外每种系统线程类型有且只有一个线程实体(这里的系统线程只有一种,即监听线程,主线程不在线程管理池里面)

        3、代理线程是专门处理用户请求的 ,由监听线程创建


        4、线程池对进行调度,通过传入不同的类型, 内部调用不同的函数执行相应请求

        5、EDU回池后可以被分派给其他任务

执行队列:所有取得任务的线程的EDU(即线程控制块)都在执行队列中

等待队列:客户端退出后的代理线程在线程池不满的情况下会放在空闲线程队列中


二、线程类(即线程控制块(EDU))

#ifndef PMDEDU_HPP__
#define PMDEDU_HPP__

#include "core.hpp"
#include "pmdEDUEvent.hpp"
#include "ossQueue.hpp"
#include "ossSocket.hpp"

#define PMD_INVALID_EDUID 0//非法edu_id
#define PMD_IS_EDU_CREATING(x)    ( PMD_EDU_CREATING == x )//线程正在创建
#define PMD_IS_EDU_RUNNING(x)     ( PMD_EDU_RUNNING  == x )//运行状态
#define PMD_IS_EDU_WAITING(x)     ( PMD_EDU_WAITING  == x )//等待状态(对于代理线程,表示其正在等待所代理的客户的响应)
#define PMD_IS_EDU_IDLE(x)        ( PMD_EDU_IDLE     == x )//空闲状态(已经回池)
#define PMD_IS_EDU_DESTROY(x)     ( PMD_EDU_DESTROY  == x )//销毁状态

typedef unsigned long long EDUID ;

enum EDU_TYPES//线程的类型
{
   // System EDU Type监听线程
   EDU_TYPE_TCPLISTENER = 0,
   // Agent EDU Type处理线程
   EDU_TYPE_AGENT,

   EDU_TYPE_UNKNOWN,//用于以后扩展
   EDU_TYPE_MAXIMUM = EDU_TYPE_UNKNOWN
} ;

enum EDU_STATUS//线程的状态
{
   PMD_EDU_CREATING = 0,
   PMD_EDU_RUNNING,
   PMD_EDU_WAITING,
   PMD_EDU_IDLE,
   PMD_EDU_DESTROY,
   PMD_EDU_UNKNOWN,
   PMD_EDU_STATUS_MAXIMUM = PMD_EDU_UNKNOWN
} ;

class pmdEDUMgr ;//线程池声明

class pmdEDUCB//线程控制块
{
public :
   pmdEDUCB ( pmdEDUMgr *mgr, EDU_TYPES type ) ;//从指定线程池得到指定类型的线程
   inline EDUID getID()//线程id
   {
      return _id ;
   }
   inline void postEvent ( pmdEDUEvent const &data )//发送事件
   {
      _queue.push ( data ) ;//实际上是将消息压入消息队列,每一个edu都有一个消息队列
   }
   bool waitEvent ( pmdEDUEvent &data, long long millsec )//millsec<0表示无限等待
   {
      // millsec小于零则无线等待
      bool waitMsg = false ;
      if ( PMD_EDU_IDLE != _status )
      {
         _status = PMD_EDU_WAITING ;
      }
      if ( 0 > millsec )
      {
         _queue.wait_and_pop ( data ) ;//无限等待
         waitMsg = true ;
      }
      else
      {
         waitMsg = _queue.timed_wait_and_pop ( data, millsec ) ;//超时等待
      }

      if ( waitMsg )
      {
         if ( data._eventType == PMD_EDU_EVENT_TERM )//结束事件:想结束edu
         {
            _isDisconnected = true ;
         }
         else
         {
            _status = PMD_EDU_RUNNING ;//回去继续执行
         }
      }
      return waitMsg ;
   }
   inline void force ()//强行停止当前线程
   {
      _isForced = true ;
   }
   inline void disconnect ()//断开连接
   {
      _isDisconnected = true ;
   }
   inline EDU_TYPES getType ()//得到当前的线程类型
   {
      return _type ;
   }
   inline EDU_STATUS getStatus ()//得到当前的线程状态
   {
      return _status ;
   }
   inline void setType ( EDU_TYPES type )//设置线程类型
   {
      _type = type ;
   }
   inline void setID ( EDUID id )//设置当前的id
   {
      _id = id ;
   }
   inline void setStatus ( EDU_STATUS status )//设置当前线程的状态
   {
      _status = status ;
   }
   inline bool isForced ()
   {
      return _isForced ;
   }
   inline pmdEDUMgr *getEDUMgr ()//得到当前线程对应的线程池
   {
      return _mgr ;
   }
private :
   EDU_TYPES  _type ;//线程类型
   pmdEDUMgr *_mgr ;//归属于哪个线程池
   EDU_STATUS _status ;//线程状态
   EDUID      _id ;//线程的eduid
   bool       _isForced ;//是否关闭线程
   bool       _isDisconnected ;//是否断开连接
   ossQueue<pmdEDUEvent> _queue ;//消息队列,每个edu都有一个消息队列
} ;

typedef int (*pmdEntryPoint) ( pmdEDUCB *, void * ) ;//线程入口函数的类型,函数指针
pmdEntryPoint getEntryFuncByType ( EDU_TYPES type ) ;//通过线程类型得到对应的入口函数

int pmdAgentEntryPoint ( pmdEDUCB *cb, void *arg ) ;//代理类型的函数指针
int pmdTcpListenerEntryPoint ( pmdEDUCB *cb, void *arg ) ;//监听类型的函数指针
int pmdEDUEntryPoint ( EDU_TYPES type, pmdEDUCB *cb, void *arg ) ;//线程的入口函数

int pmdRecv ( char *pBuffer, int recvSize,
              ossSocket *sock, pmdEDUCB *cb ) ;//tcp接收
int pmdSend ( const char *pBuffer, int sendSize,
              ossSocket *sock, pmdEDUCB *cb ) ;//tcp发送

#endif
       1)diy数据库的线程类结构比较清晰,而且功能比较完整。我们首先从他的属性开始分析。

       EDU_TYPES  _type;//线程类型

       diydb中主要有两种线程类型,一种是监听线程,一种是代理线程。监听线程负责监听端口,并与客户端建立连接,然后从线程池取得一个代理线程去已连接的客户端的请求。一种是代理线程,所有与客户端的交互都是在代理线程中执行的。

pmdEDUMgr *_mgr;//线程所属的线程池

        每一个数据库实例都有一个线程池,这个线程池是可以动态增长的。这个线程池管理者这个数据库实例中的所有线程。

EDUSTADUS _stadus;//线程状态

每一个线程都处于一个有限状态机中,线程一共有五种状态,当线程收到消息时,会在五种状态之间转换。

EDUID  id;//线程的edu id

        bool       _isForced ;//是否关闭线程,一般由线程池来设置为true
        bool       _isDisconnected ;//是否断开连接,在代理线程接收到客户端的disconnect命令后就会将本属性设置为true
        ossQueue<pmdEDUEvent> _queue ;//消息队列,每个edu都有一个消息队列

2)线程的入口函数
typedef int (*pmdEntryPoint) ( pmdEDUCB *, void * ) ;//线程入口函数的类型,函数指针

pmdEntryPoint getEntryFuncByType ( EDU_TYPES type ) ;//通过线程类型得到对应的入口函数

int pmdAgentEntryPoint ( pmdEDUCB *cb, void *arg ) ;//代理类型的函数指针
int pmdTcpListenerEntryPoint ( pmdEDUCB *cb, void *arg ) ;//监听类型的函数指针
int pmdEDUEntryPoint ( EDU_TYPES type, pmdEDUCB *cb, void *arg ) ;//线程的入口函数
,实际上在函数体里面会根据type参数来调用对应的线程入口函数


三、线程间通信用的消息类

#ifndef PMDEDUEVENT_HPP__
#define PMDEDUEVENT_HPP__

#include "core.hpp"
enum pmdEDUEventTypes//消息类型
{
   PMD_EDU_EVENT_NONE = 0,//空的消息
   PMD_EDU_EVENT_TERM,     // 结束消息
   PMD_EDU_EVENT_MSG,
   PMD_EDU_EVENT_RESUME   // 唤醒消息
} ;

class pmdEDUEvent
{
public :
   pmdEDUEvent () :
   _eventType(PMD_EDU_EVENT_NONE),
   _release(false),
   _Data(NULL)
   {
   }

   pmdEDUEvent ( pmdEDUEventTypes type ) :
   _eventType(type),
   _release(false),
   _Data(NULL)
   {
   }

   pmdEDUEvent ( pmdEDUEventTypes type, bool release, void *data ) :
   _eventType(type),
   _release(release),
   _Data(data)
   {
   }

   void reset ()//清空
   {
      _eventType = PMD_EDU_EVENT_NONE ;
      _release = false ;
      _Data = NULL ;
   }

   pmdEDUEventTypes _eventType ;//事件类型
   bool             _release ;//释放
   void            *_Data ;//数据
} ;

#endif
我们可以看到一共有四类消息,_release表示收到该消息的线程是否马上释放这个消息中数据指针所指的数据,_Data是指向数据地址的指针,表示消息中所带的数据,这些消息主要是供线程间通信用的。


四、线程池

#ifndef PMDEDUMGR_HPP__
#define PMDEDUMGR_HPP__

#include "core.hpp"
#include "pmdEDU.hpp"
#include "ossLatch.hpp"
#include "ossUtil.hpp"

#define EDU_SYSTEM     0x01//系统级edu
#define EDU_USER       0x02//用户级edu
#define EDU_ALL        ( EDU_SYSTEM | EDU_USER )


class pmdEDUMgr//线程池
{
private :
   std::map<EDUID, pmdEDUCB*> _runQueue ;//运行的线程队列
   std::map<EDUID, pmdEDUCB*> _idleQueue ;//空闲的线程队列
   std::map<unsigned int, EDUID> _tid_eduid_map ;//线程池中所有线程id和其edu的映射

   ossSLatch _mutex ;//对线程队列的的访问时必须用到的读写锁
   EDUID _EDUID ;//new下一个edu时分配的id
   std::map<unsigned int, EDUID> _mapSystemEDUS ;//系统edu的类型和eduid之间的映射
   bool _isQuiesced ;//标识数据库是否被挂起,当数据库正在挂起时线程池不接受请求,挂起数据库是为了使DBA可以对数据库进行一些特殊的操作
   bool _isDestroyed ;//线程池是否被销毁
public :
   pmdEDUMgr () :
   _EDUID(1),
   _isQuiesced(false),
   _isDestroyed(false)
   {
   }

   ~pmdEDUMgr ()
   {
      reset () ;
   }
   void reset ()
   {
      _destroyAll () ;//删除线程池中的所有线程
   }

   unsigned int size ()//得到所有线程的数量
   {
      unsigned int num = 0 ;
      _mutex.get_shared () ;//共享锁,因为是读
      num = ( unsigned int ) _runQueue.size() +
            ( unsigned int ) _idleQueue.size () ;
      _mutex.release_shared () ;
      return num ;
   }

   unsigned int sizeRun ()//得到运行队列的长度
   {
      unsigned int num = 0 ;
      _mutex.get_shared () ;
      num = ( unsigned int ) _runQueue.size () ;
      _mutex.release_shared () ;
      return num ;
   }

   unsigned int sizeIdle ()
   {
      unsigned int num = 0 ;
      _mutex.get_shared () ;
      num = ( unsigned int ) _idleQueue.size () ;
      _mutex.release_shared () ;
      return num ;
   }

   unsigned int sizeSystem ()//系统edu的数量
   {
      unsigned int num = 0 ;
      _mutex.get_shared () ;
      num = _mapSystemEDUS.size() ;
      _mutex.release_shared () ;
      return num ;
   }

   EDUID getSystemEDU ( EDU_TYPES edu )//由系统edu类型得到其对应的eduid
   {
      EDUID eduID = PMD_INVALID_EDUID;
      _mutex.get_shared () ;
      std::map<unsigned int, EDUID>::iterator it = _mapSystemEDUS.find( edu ) ;
      if ( it != _mapSystemEDUS.end() )
      {
         eduID = it->second  ;
      }
      _mutex.release_shared () ;
      return eduID ;
   }

   bool isSystemEDU ( EDUID eduID )
   {
      bool isSys = false ;
      _mutex.get_shared () ;
      isSys = _isSystemEDU ( eduID ) ;
      _mutex.release_shared () ;
      return isSys ;
   }

   void regSystemEDU ( EDU_TYPES edu, EDUID eduid )//注册一个系统edu
   {
      _mutex.get() ;
      _mapSystemEDUS[ edu ] = eduid ;
      _mutex.release () ;
   }

   bool isQuiesced ()
   {
      return _isQuiesced ;
   }
   void setQuiesced ( bool b )
   {
      _isQuiesced = b ;
   }
   bool isDestroyed ()
   {
      return _isDestroyed ;
   }
   static bool isPoolable ( EDU_TYPES type )//线程edu的类型对应的线程是否能回收到线程池,目前只有agent线程的edu可以被回收,因为正常情况下监听线程是不会退出的
   {
      return ( EDU_TYPE_AGENT == type ) ;
   }

private :
   int _createNewEDU ( EDU_TYPES type, void *arg, EDUID *eduid ) ;//没有空闲edu时,创建新的edu
   int _destroyAll () ;//销毁所有的线程
   int _forceEDUs ( int property = EDU_ALL ) ;//干掉某种类型的edu
   unsigned int _getEDUCount ( int property = EDU_ALL ) ;
   void _setDestroyed ( bool b )
   {
      _isDestroyed = b ;
   }
   bool _isSystemEDU ( EDUID eduID )
   {
      std::map<unsigned int, EDUID>::iterator it = _mapSystemEDUS.begin() ;
      while ( it != _mapSystemEDUS.end() )
      {
         if ( eduID == it->second )
         {
            return true ;
         }
         ++it ;
      }
      return false ;
   }

   int _destroyEDU ( EDUID eduID ) ;//销毁一个特定的edu

   int _deactivateEDU ( EDUID eduID ) ;//把一个edu放回线程池
public :
   /*
    * EDU Status Transition Table
    * C: CREATING
    * R: RUNNING
    * W: WAITING
    * I: IDLE
    * D: DESTROY
    * 函数
    * c: createNewEDU
    * a: activateEDU
    * d: destroyEDU
    * w: waitEDU
    * t: deactivateEDU
    *   C   R   W   I   D  <--- from
    * C c
    * R -   -   a   a   -  <--- Creating/Idle/Waiting status can move to Running status
    * W w   w   -   -   -  <--- Running status move to Waiting
    * I -   -   t   -   -  <--- Creating/Waiting status move to Idle
    * D d   -   d   d   -  <--- Creating / Waiting / Idle can be destroyed
    * ^ To
    */

   int activateEDU ( EDUID eduID ) ;//将waiting和idle线程激活成running线程
   int waitEDU ( EDUID eduID ) ;//使edu从执行状态转换成等待状态
   int startEDU ( EDU_TYPES type, void* arg, EDUID *eduid ) ;//可以创建一个edu,并创建相应的线程。如果创建的是代理,而且线程池中有空闲线程则从线程池里面去拿,并把相应的EDU从空闲队列放到运行队列
   int postEDUPost ( EDUID eduID, pmdEDUEventTypes type,
                     bool release = false, void *pData = NULL ) ;//向一个指定线程的队列发送一个消息,将激活因为等待消息队列中的消息而阻塞的线程 
   int waitEDUPost ( EDUID eduID, pmdEDUEvent& event,
                     long long millsecond ) ;//等待指定edu的事件  
   int returnEDU ( EDUID eduID, bool force, bool* destroyed ) ;//把edu销毁或者回池
   int forceUserEDU ( EDUID eduID ) ;
   pmdEDUCB *getEDU ( unsigned int tid ) ;//通过线程id得到edu
   pmdEDUCB *getEDU () ;//得到当前的edu
   pmdEDUCB *getEDUByID ( EDUID eduID ) ;
   void setEDU ( unsigned int tid, EDUID eduid ) ;

} ;

#endif

分析 

1、EDU 的状态迁移图
    * C: CREATING
    * R: RUNNING
    * W: WAITING
    * I: IDLE
    * D: DESTROY
    * 函数
    * c: createNewEDU
    * a: activateEDU
    * d: destroyEDU
    * w: waitEDU
    * t: deactivateEDU
    *   C   R   W   I   D  <--- from
    * C c
    * R -   -   a   a   -  <---Idle/Waiting status can move to Running status
    * W w   w   -   -   -  <--- Creating/Running status move to Waiting
    * I t   -   t   -   -  <--- Waiting status move to Idle
    * D d   -   d   d   -  <--- Creating / Waiting / Idle can be destroyed

2、CREATING、 RUNNING、 WAITING这三种状态的线程的EDU存放在运行线程队列中,IDLE状态的线程放在空闲线程队列中,DESTROY线程的EDU不在任何线程队列中(因为当一个线程的状态为销毁状态时,他的线程管理块已经被线程池从两个线程队列中删除了),当线程实体检测到自己的状态为DESTROY时,该线程实体就会从线程执行函数返回,从而真正意义上结束线程


五、总结:

       1、本文主要分析了Diydb的线程模块,实际上就是一个比较完善的线程池模型

       2、线程池主要由以下几个类型实现:

             线程控制块:主要是对线程实体的封装。每个线程控制块实体与一个线程实体对应,其包含了这个线程的状态、类型、对应的线程池以及该线程的消息队列等。

            消             息:消息实体用来在线程间传递信息,通过这些消息可以传递普通的数据,也可以控制接收到消息的线程的状态

            线程管理 类:整个Diydb中只有一个线程管理类的实例,她是线程池的核心,主要管理着两个队列,一个是运行线程队列,一个是空闲线程队列(空闲线程队列中只可能是代理线程)。她控制着所有线程的状态,从而来维护线程池。

      3、本线程池中的线程分为系统线程(这里只有监听线程)和用户线程(这里只有代理线程),每个系统线程类型对应的只有一个线程实体,而每个用户线程类型的线程对应的是多个线程实体。

      4、只有代理线程的状态可能是空闲线程,所以从某种意义上来讲这个线程池实际上只对代理线程的效率有益,尽管系统线程的EDU也在运行线程队列中,而且其状态也受线程管理类的控制。

      5、线程池中的空闲线程不是系统启动时一次性创建完毕的,是当创建的代理线程处理完用户请求后(即用户断开连接),返回给空闲线程队列的(当然前提条件是当前空闲线程的个数小于maxpool,即空闲线程上限)。

      6、线程池的大小是由一个参数(maxpool)来控制的,当线程池中空闲线程的个数等于maxpool时,运行完毕的线程就不允许回池。

      7、当线程池中的空闲线程都用完以后,如果有新的用户请求过来,就会创建一个新的代理线程去处理这个用户的任务。所以,这个线程池中的线程数是可以动态变化的。


以上是关于diy数据库--线程控制块消息线程池的主要内容,如果未能解决你的问题,请参考以下文章

Linux入门多线程(线程概念生产者消费者模型消息队列线程池)万字解说

RTX 线程通信之——内存池

RTX 线程通信之——内存池

Java——线程池

线程池与并行度

Java线程池详解