ZMQ源码分析--MSG
Posted 子曰帅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZMQ源码分析--MSG相关的知识,希望对你有一定的参考价值。
MSG
msg类是zmq中消息的定义类,tcp数据传送,进程间数据传送,进程内数据传送都是以msg为基本单位的。由于msg经常需要在对象之间传送,所以如何能尽量避免拷贝是msg需要解决的问题。
class msg_t
public:
// Message flags.
enum
more = 1, // Followed by more parts
command = 2, // Command frame (see ZMTP spec)
credential = 32,
identity = 64,
shared = 128
;
bool check ();
int init ();
int init_size (size_t size_);
int init_data (void *data_, size_t size_, msg_free_fn *ffn_,
void *hint_);
int init_delimiter ();
int close ();
int move (msg_t &src_);
int copy (msg_t &src_);
void *data ();
size_t size ();
unsigned char flags ();
void set_flags (unsigned char flags_);
void reset_flags (unsigned char flags_);
int64_t fd ();
void set_fd (int64_t fd_);
metadata_t *metadata () const;
void set_metadata (metadata_t *metadata_);
void reset_metadata ();
bool is_identity () const;
bool is_credential () const;
bool is_delimiter () const;
bool is_vsm ();
bool is_cmsg ();
// After calling this function you can copy the message in POD-style
// refs_ times. No need to call copy.
void add_refs (int refs_);
// Removes references previously added by add_refs. If the number of
// references drops to 0, the message is closed and false is returned.
bool rm_refs (int refs_);
private:
// Size in bytes of the largest message that is still copied around
// rather than being reference-counted.
enum msg_t_size = 64 ;
enum max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3) ;
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
struct content_t
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
zmq::atomic_counter_t refcnt;
;
// Different message types.
enum type_t
type_min = 101,
// VSM messages store the content in the message itself
type_vsm = 101,
// LMSG messages store the content in malloc-ed memory
type_lmsg = 102,
// Delimiter messages are used in envelopes
type_delimiter = 103,
// CMSG messages point to constant data
type_cmsg = 104,
type_max = 104
;
// the file descriptor where this message originated, needs to be 64bit due to alignment
int64_t file_desc;
// Note that fields shared between different message types are not
// moved to the parent class (msg_t). This way we get tighter packing
// of the data. Shared fields can be accessed via 'base' member of
// the union.
union
struct
metadata_t *metadata;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
unsigned char type;
unsigned char flags;
base;
struct
metadata_t *metadata;
unsigned char data [max_vsm_size];
unsigned char size;
unsigned char type;
unsigned char flags;
vsm;
struct
metadata_t *metadata;
content_t *content;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2)];
unsigned char type;
unsigned char flags;
lmsg;
struct
metadata_t *metadata;
void* data;
size_t size;
unsigned char unused
[msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2)];
unsigned char type;
unsigned char flags;
cmsg;
struct
metadata_t *metadata;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
unsigned char type;
unsigned char flags;
delimiter;
u;
;
msg的结构由五种,存在一个union 结构中,为了保持内存对其,每一个结构体都是64字节,不足的用unused数组对其,struct中的metadata_t 是zmq4.x中新加入的结构,指向一个属性字典,主要存放dentity,socket_type等属性,这些数据实在握手时互相发送的,每一个接收到的msg都会设置metadata属性,记录通讯的另一方的信息。五种msg中比较通用的是vsm和lmsg,前者用于短消息,后者用于长消息。短消息的大小为 max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3),超过max_vsm_size作为长消息处理。短消息的复制比较简单,直接拷贝数据即可。但是为了避免大数据的拷贝,长消息会通过在flag中添加msg_t::shared标记和在content中比较refcnt来进行复制,比较类似于只能指针,下面是msg的一些基本操作:
int zmq::msg_t::close ()
// Check the validity of the message.
if (unlikely (!check ()))
errno = EFAULT;
return -1;
if (u.base.type == type_lmsg)
// If the content is not shared, or if it is shared and the reference
// count has dropped to zero, deallocate it.
if (!(u.lmsg.flags & msg_t::shared) ||
!u.lmsg.content->refcnt.sub (1))
// We used "placement new" operator to initialize the reference
// counter so we call the destructor explicitly now.
u.lmsg.content->refcnt.~atomic_counter_t ();
if (u.lmsg.content->ffn)
u.lmsg.content->ffn (u.lmsg.content->data,
u.lmsg.content->hint);
free (u.lmsg.content);
if (u.base.metadata != NULL)
if (u.base.metadata->drop_ref ())
delete u.base.metadata;
// Make the message invalid.
u.base.type = 0;
return 0;
int zmq::msg_t::move (msg_t &src_)
// Check the validity of the source.
if (unlikely (!src_.check ()))
errno = EFAULT;
return -1;
int rc = close ();
if (unlikely (rc < 0))
return rc;
*this = src_;
rc = src_.init ();
if (unlikely (rc < 0))
return rc;
return 0;
int zmq::msg_t::copy (msg_t &src_)
// Check the validity of the source.
if (unlikely (!src_.check ()))
errno = EFAULT;
return -1;
int rc = close ();
if (unlikely (rc < 0))
return rc;
if (src_.u.base.type == type_lmsg)
// One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2.
if (src_.u.lmsg.flags & msg_t::shared)
src_.u.lmsg.content->refcnt.add (1);
else
src_.u.lmsg.flags |= msg_t::shared;
src_.u.lmsg.content->refcnt.set (2);
if (src_.u.base.metadata != NULL)
src_.u.base.metadata->add_ref ();
*this = src_;
return 0;
在close中,如果一条msg的flag是lmsg并且flag中没有shared属性或者当前的引用为0,则需要销毁content。需要注意的是 在创建lmsg时,content中的refcnt使用了placement new,所以需要显示调用atomic_counter的析构函数,即.lmsg.content->refcnt.~atomic_counter_t (); move是将一个对象转移到另一个对象,同事将自身重置,所以不会影响lmsg的引用次数,可以直接复制。拷贝构造函数则是构造一个新的的msg,但是旧的msg不会销毁,所以在复制之前需要增加flag和refcnt。
以上是关于ZMQ源码分析--MSG的主要内容,如果未能解决你的问题,请参考以下文章