ZMQ源码分析--MSG

Posted 子曰帅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZMQ源码分析--MSG相关的知识,希望对你有一定的参考价值。

MSG

msg类是zmq中消息的定义类,tcp数据传送,进程间数据传送,进程内数据传送都是以msg为基本单位的。由于msg经常需要在对象之间传送,所以如何能尽量避免拷贝是msg需要解决的问题。

 class msg_t

public:

    //  Message flags.
    enum
    
        more = 1,           //  Followed by more parts
        command = 2,        //  Command frame (see ZMTP spec)
        credential = 32,
        identity = 64,
        shared = 128
    ;

    bool check ();
    int init ();
    int init_size (size_t size_);
    int init_data (void *data_, size_t size_, msg_free_fn *ffn_,
        void *hint_);
    int init_delimiter ();
    int close ();
    int move (msg_t &src_);
    int copy (msg_t &src_);
    void *data ();
    size_t size ();
    unsigned char flags ();
    void set_flags (unsigned char flags_);
    void reset_flags (unsigned char flags_);
    int64_t fd ();
    void set_fd (int64_t fd_);
    metadata_t *metadata () const;
    void set_metadata (metadata_t *metadata_);
    void reset_metadata ();
    bool is_identity () const;
    bool is_credential () const;
    bool is_delimiter () const;
    bool is_vsm ();
    bool is_cmsg ();

    //  After calling this function you can copy the message in POD-style
    //  refs_ times. No need to call copy.
    void add_refs (int refs_);

    //  Removes references previously added by add_refs. If the number of
    //  references drops to 0, the message is closed and false is returned.
    bool rm_refs (int refs_);

private:

    //  Size in bytes of the largest message that is still copied around
    //  rather than being reference-counted.
    enum  msg_t_size = 64 ;
    enum  max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3) ;

    //  Shared message buffer. Message data are either allocated in one
    //  continuous block along with this structure - thus avoiding one
    //  malloc/free pair or they are stored in used-supplied memory.
    //  In the latter case, ffn member stores pointer to the function to be
    //  used to deallocate the data. If the buffer is actually shared (there
    //  are at least 2 references to it) refcount member contains number of
    //  references.
    struct content_t
    
        void *data;
        size_t size;
        msg_free_fn *ffn;
        void *hint;
        zmq::atomic_counter_t refcnt;
    ;

    //  Different message types.
    enum type_t
    
        type_min = 101,
        //  VSM messages store the content in the message itself
        type_vsm = 101,
        //  LMSG messages store the content in malloc-ed memory
        type_lmsg = 102,
        //  Delimiter messages are used in envelopes
        type_delimiter = 103,
        //  CMSG messages point to constant data
        type_cmsg = 104,
        type_max = 104
    ;

    // the file descriptor where this message originated, needs to be 64bit due to alignment
    int64_t file_desc;

    //  Note that fields shared between different message types are not
    //  moved to the parent class (msg_t). This way we get tighter packing
    //  of the data. Shared fields can be accessed via 'base' member of
    //  the union.
    union 
        struct 
            metadata_t *metadata;
            unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
            unsigned char type;
            unsigned char flags;
         base;
        struct 
            metadata_t *metadata;
            unsigned char data [max_vsm_size];
            unsigned char size;
            unsigned char type;
            unsigned char flags;
         vsm;
        struct 
            metadata_t *metadata;
            content_t *content;
            unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2)];
            unsigned char type;
            unsigned char flags;
         lmsg;
        struct 
            metadata_t *metadata;
            void* data;
            size_t size;
            unsigned char unused
                [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2)];
            unsigned char type;
            unsigned char flags;
         cmsg;
        struct 
            metadata_t *metadata;
            unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
            unsigned char type;
            unsigned char flags;
         delimiter;
     u;
; 

msg的结构由五种,存在一个union 结构中,为了保持内存对其,每一个结构体都是64字节,不足的用unused数组对其,struct中的metadata_t 是zmq4.x中新加入的结构,指向一个属性字典,主要存放dentity,socket_type等属性,这些数据实在握手时互相发送的,每一个接收到的msg都会设置metadata属性,记录通讯的另一方的信息。五种msg中比较通用的是vsm和lmsg,前者用于短消息,后者用于长消息。短消息的大小为 max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3),超过max_vsm_size作为长消息处理。短消息的复制比较简单,直接拷贝数据即可。但是为了避免大数据的拷贝,长消息会通过在flag中添加msg_t::shared标记和在content中比较refcnt来进行复制,比较类似于只能指针,下面是msg的一些基本操作:

int zmq::msg_t::close ()

    //  Check the validity of the message.
    if (unlikely (!check ())) 
        errno = EFAULT;
        return -1;
    

    if (u.base.type == type_lmsg) 

        //  If the content is not shared, or if it is shared and the reference
        //  count has dropped to zero, deallocate it.
        if (!(u.lmsg.flags & msg_t::shared) ||
              !u.lmsg.content->refcnt.sub (1)) 

            //  We used "placement new" operator to initialize the reference
            //  counter so we call the destructor explicitly now.
            u.lmsg.content->refcnt.~atomic_counter_t ();

            if (u.lmsg.content->ffn)
                u.lmsg.content->ffn (u.lmsg.content->data,
                    u.lmsg.content->hint);
            free (u.lmsg.content);
        
    

    if (u.base.metadata != NULL)
        if (u.base.metadata->drop_ref ())
            delete u.base.metadata;

    //  Make the message invalid.
    u.base.type = 0;

    return 0;


int zmq::msg_t::move (msg_t &src_)

    //  Check the validity of the source.
    if (unlikely (!src_.check ())) 
        errno = EFAULT;
        return -1;
    

    int rc = close ();
    if (unlikely (rc < 0))
        return rc;

    *this = src_;

    rc = src_.init ();
    if (unlikely (rc < 0))
        return rc;

    return 0;


int zmq::msg_t::copy (msg_t &src_)

    //  Check the validity of the source.
    if (unlikely (!src_.check ())) 
        errno = EFAULT;
        return -1;
    

    int rc = close ();
    if (unlikely (rc < 0))
        return rc;

    if (src_.u.base.type == type_lmsg) 

        //  One reference is added to shared messages. Non-shared messages
        //  are turned into shared messages and reference count is set to 2.
        if (src_.u.lmsg.flags & msg_t::shared)
            src_.u.lmsg.content->refcnt.add (1);
        else 
            src_.u.lmsg.flags |= msg_t::shared;
            src_.u.lmsg.content->refcnt.set (2);
        
    

    if (src_.u.base.metadata != NULL)
        src_.u.base.metadata->add_ref ();

    *this = src_;

    return 0;

在close中,如果一条msg的flag是lmsg并且flag中没有shared属性或者当前的引用为0,则需要销毁content。需要注意的是 在创建lmsg时,content中的refcnt使用了placement new,所以需要显示调用atomic_counter的析构函数,即.lmsg.content->refcnt.~atomic_counter_t (); move是将一个对象转移到另一个对象,同事将自身重置,所以不会影响lmsg的引用次数,可以直接复制。拷贝构造函数则是构造一个新的的msg,但是旧的msg不会销毁,所以在复制之前需要增加flag和refcnt。

以上是关于ZMQ源码分析--MSG的主要内容,如果未能解决你的问题,请参考以下文章

ZMQ源码分析 --TCP通讯

ZMQ源码分析--对象管理和消息机制

ZMQ源码分析 --进程内通讯

ZMQ源码分析--其他socket_base模型

ZMQ源码分析-- 网络&线程模型

ZMQ源码分析--编码器和解码器