ZMQ源码分析-- 基础数据结构的实现

Posted 子曰帅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZMQ源码分析-- 基础数据结构的实现相关的知识,希望对你有一定的参考价值。

yqueue 和 ypipe

zmq号称是”史上最快的消息队列”,由此可见zmq中最重要的数据结构就是队列。zmq的队列主要由yqueue和ypipe实现。yqueue是队列的基本操作,下面首先分析yqueue的实现。

 //  Individual memory chunk to hold N elements.
        //  Individual memory chunk to hold N elements.
        struct chunk_t
        
             T values [N];
             chunk_t *prev;
             chunk_t *next;
        ;

        //  Back position may point to invalid memory if the queue is empty,
        //  while begin & end positions are always valid. Begin position is
        //  accessed exclusively be queue reader (front/pop), while back and
        //  end positions are accessed exclusively by queue writer (back/push).
        chunk_t *begin_chunk;
        int begin_pos;
        chunk_t *back_chunk;
        int back_pos;
        chunk_t *end_chunk;
        int end_pos;

        //  People are likely to produce and consume at similar rates.  In
        //  this scenario holding onto the most recently freed chunk saves
        //  us from having to call malloc/free.
        atomic_ptr_t<chunk_t> spare_chunk;

在yqueue中有一个重要的结构体chunk_t,他是yqueue高效的关键因素。内存的申请和释放非常浪费效率,yqueue为了避免频繁的内存操作,每次不会申请一个元素大小的内存空间,而是申请一批,这一批元素就保存在chunk_t结构体中,yqueu用三个指针和三个游标来记录chunk以及在chunk内有效的数据的索引。以下面的push操作为例,当在队列的末尾添加一个元素时,会先判断当前尾端的chunk是否还有空闲的元素,即end_pos是否等于N-1,相等则说明需要申请新的chunk_t,否则直接移动end_pos即可。另外由于很多队列中生产和消费的速率比较一致,所以yqueue用一个spare_chunk来保存刚刚释放的chunk,这样当需要申请新的chunk时就可以直接使用spare_chunk所记录的chunk了。除了push外,yqueue还提供了pop和unpush操作,实现原理和push类似。

        //  Adds an element to the back end of the queue.
        inline void push ()
        
            back_chunk = end_chunk;
            back_pos = end_pos;

            if (++end_pos != N)
                return;

            chunk_t *sc = spare_chunk.xchg (NULL);
            if (sc) 
                end_chunk->next = sc;
                sc->prev = end_chunk;
             else 
                end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));
                alloc_assert (end_chunk->next);
                end_chunk->next->prev = end_chunk;
            
            end_chunk = end_chunk->next;
            end_pos = 0;
        

接下来看ypipe。ypipe继承自ypipe_base_t,ypipe_base_t抽象出了ypipe和ypipe_conflate(后面分析)的基本操作:

    template <typename T> class ypipe_base_t
    
    public:
        virtual ~ypipe_base_t () 
        virtual void write (const T &value_, bool incomplete_) = 0;
        virtual bool unwrite (T *value_) = 0;
        virtual bool flush () = 0;
        virtual bool check_read () = 0;
        virtual bool read (T *value_) = 0;
        virtual bool probe (bool (*fn)(const T &)) = 0;
    ;

ypipe包含了了一个yqueue队列和四个非常重要的指针,下面是ypipe的成员变量定义:

       //  Allocation-efficient queue to store pipe items.
        //  Front of the queue points to the first prefetched item, back of
        //  the pipe points to last un-flushed item. Front is used only by
        //  reader thread, while back is used only by writer thread.
        yqueue_t <T, N> queue;

        //  Points to the first un-flushed item. This variable is used
        //  exclusively by writer thread.
        T *w;

        //  Points to the first un-prefetched item. This variable is used
        //  exclusively by reader thread.
        T *r;

        //  Points to the first item to be flushed in the future.
        T *f;

        //  The single point of contention between writer and reader thread.
        //  Points past the last flushed item. If it is NULL,
        //  reader is asleep. This pointer should be always accessed using
        //  atomic operations.
        atomic_ptr_t <T> c;

这四个指针非常重要,下面来看一下他们各自的作用:

        //  Initialises the pipe.
        inline ypipe_t ()
        
            //  Insert terminator element into the queue.
            queue.push ();

            //  Let all the pointers to point to the terminator.
            //  (unless pipe is dead, in which case c is set to NULL).
            r = w = f = &queue.back ();
            c.set (&queue.back ());
        

初始化时先想队列放入一个空对象作为结束符,所有指针都指向这个结束符。

       //  Write an item to the pipe.  Don't flush it yet. If incomplete is
        //  set to true the item is assumed to be continued by items
        //  subsequently written to the pipe. Incomplete items are never
        //  flushed down the stream.
        inline void write (const T &value_, bool incomplete_)
        
            //  Place the value to the queue, add new terminator element.
            queue.back () = value_;
            queue.push ();

            //  Move the "flush up to here" poiter.
            if (!incomplete_)
                f = &queue.back ();
        

        //  Pop an incomplete item from the pipe. Returns true is such
        //  item exists, false otherwise.
        inline bool unwrite (T *value_)
        
            if (f == &queue.back ())
                return false;
            queue.unpush ();
            *value_ = queue.back ();
            return true;
        

f指针指向了当前未做flush操作的第一个元素,如果是写入了一条完整消息,那f指向的就是结束符。

        //  Flush all the completed items into the pipe. Returns false if
        //  the reader thread is sleeping. In that case, caller is obliged to
        //  wake the reader up before using the pipe again.
        inline bool flush ()
        
            //  If there are no un-flushed items, do nothing.
            if (w == f)
                return true;

            //  Try to set 'c' to 'f'.
            if (c.cas (w, f) != w) 

                //  Compare-and-swap was unseccessful because 'c' is NULL.
                //  This means that the reader is asleep. Therefore we don't
                //  care about thread-safeness and update c in non-atomic
                //  manner. We'll return false to let the caller know
                //  that reader is sleeping.
                c.set (f);
                w = f;
                return false;
            

            //  Reader is alive. Nothing special to do now. Just move
            //  the 'first un-flushed item' pointer to 'f'.
            w = f;
            return true;
        

flush操作比较重要,除了要把w指向f外,还要判断当前pipe的read是否是sleep状态,判断的方式是用c和w作比较,c只能有两个值,要么等于w,要么为空,当c为空时说明之前的check_read操作没有读到元素,check_read返回false同时将c置为空。check_read的返回值决定了上层的操作策略。flush的返回值也表明了之前check_read操作是否返回了false。

        //  Check whether item is available for reading.
        inline bool check_read ()
        
            //  Was the value prefetched already? If so, return.
            if (&queue.front () != r && r)
                 return true;

            //  There's no prefetched value, so let us prefetch more values.
            //  Prefetching is to simply retrieve the
            //  pointer from c in atomic fashion. If there are no
            //  items to prefetch, set c to NULL (using compare-and-swap).
            r = c.cas (&queue.front (), NULL);

            //  If there are no elements prefetched, exit.
            //  During pipe's lifetime r should never be NULL, however,
            //  it can happen during pipe shutdown when items
            //  are being deallocated.
            if (&queue.front () == r || !r)
                return false;

            //  There was at least one value prefetched.
            return true;
        

        //  Reads an item from the pipe. Returns false if there is no value.
        //  available.
        inline bool read (T *value_)
        
            //  Try to prefetch a value.
            if (!check_read ())
                return false;

            //  There was at least one value prefetched.
            //  Return it to the caller.
            *value_ = queue.front ();
            queue.pop ();
            return true;
        

之前提到过check_read操作,它的返回值标记了队列中是否有数据,他使用r指针来标记当前可以读到的位置,如果r指针不在front位置处,说明有元素可读。否则就用c和front对比来判断当前是否有元素,如果没有将c置为空,表明读操作处于睡眠状态。

yqueue中指针的使用相对复杂,他们除了指向具体位置外还标记了一些状态,使用非常巧妙。

dbuffer_t 和 ypipe_conflate_t

ypipe_conflate_t是ypipe_base_t的另一种实现,和ypipe相比它的效率更高,但是数据是不安全的。它的底层使用dbuffer_t实现的。ypipe_conflate_t是zmq4.x版本中新加入的一个数据结构,使用一些对数据完整性要求不高的需求,实现相对简单,这里不做具体分析。

pipe

pipe是zmq中保存消息的一个双向管道,他维护两个ypipe_base_t队列,一个inpipe,一个outpipe。他主要用于socket_base之间(进程内通讯)或者socket_base和session_base之间传递消息。下面是pipe中比较重要的成员变量:

        //  Underlying pipes for both directions.
        upipe_t *inpipe;
        upipe_t *outpipe;

        //  Can the pipe be read from / written to?
        bool in_active;
        bool out_active;

        //  High watermark for the outbound pipe.
        int hwm;

        //  Low watermark for the inbound pipe.
        int lwm;

        //  Number of messages read and written so far.
        uint64_t msgs_read;
        uint64_t msgs_written;

        //  Last received peer's msgs_read. The actual number in the peer
        //  can be higher at the moment.
        uint64_t peers_msgs_read;

        //  The pipe object on the other side of the pipepair.
        pipe_t *peer;

        //  Sink to send events to.
        i_pipe_events *sink;

        //  States of the pipe endpoint:
        //  active: common state before any termination begins,
        //  delimiter_received: delimiter was read from pipe before
        //      term command was received,
        //  waiting_fo_delimiter: term command was already received
        //      from the peer but there are still pending messages to read,
        //  term_ack_sent: all pending messages were already read and
        //      all we are waiting for is ack from the peer,
        //  term_req_sent1: 'terminate' was explicitly called by the user,
        //  term_req_sent2: user called 'terminate' and then we've got
        //      term command from the peer as well.
        enum 
            active,
            delimiter_received,
            waiting_for_delimiter,
            term_ack_sent,
            term_req_sent1,
            term_req_sent2
         state;

        //  If true, we receive all the pending inbound messages before
        //  terminating. If false, we terminate immediately when the peer
        //  asks us to.
        bool delay;

        //  Identity of the writer. Used uniquely by the reader side.
        blob_t identity;

        //  Pipe's credential.
        blob_t credential;
        const bool conflate;

in_active和out_active标记管道中的队列是否是活跃状态,如果队列已满或者队列为空,这两个标记则设为false,上层根据管道的状态决定是否要进行休眠或者其他操作,比如session_base如果检测到false则会把engine中对应的fd设置为reset状态。hwm和lwm是两个阈值,hwm表示当前队列已满,lwm表示当msgs_read每达到lwm时要象对面的pipe发送一条激活消息,表明已经处理了一些数据,对面的可以继续向管道内写入数据。消息的发送机制会在接下来的章节中分析。i_pipe_events 是一个抽象类:

    struct i_pipe_events
    
        virtual ~i_pipe_events () 
        virtual void read_activated (zmq::pipe_t *pipe_) = 0;
        virtual void write_activated (zmq::pipe_t *pipe_) = 0;
        virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
        virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;
    ;

sink是指向上层实现i_pipe_events的类的指针(session_base或者socket_base),当队列变为激活状态时,pipe需要通过sink通知上层可以从pipe中读取数据或者写入数据了。

以上是关于ZMQ源码分析-- 基础数据结构的实现的主要内容,如果未能解决你的问题,请参考以下文章

ZMQ源码分析--MSG

ZMQ源码分析 --TCP通讯

ZMQ源码分析 --进程内通讯

ZMQ源码分析--编码器和解码器

ZMQ源码分析--对象管理和消息机制

ZMQ源码分析--其他socket_base模型