ZMQ Source Code Analysis -- TCP Communication

Posted 子曰帅

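zmq supports tcp, inproc, ipc, pgm, epgm, tipc and other transports. You only need to use the corresponding address format in the endpoint string, and zmq transparently uses the matching transport. Here we take tcp, the most commonly used transport, as an example and analyze how zmq moves data across the network. This is the most complex and most important part of zmq. Before walking through the flow, let's look at the main classes involved in tcp communication: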

tcp_listener & tcp_connecter

First, the code of tcp_listener_t:

    class tcp_listener_t : public own_t, public io_object_t
    {
    public:

        tcp_listener_t (zmq::io_thread_t *io_thread_,
            zmq::socket_base_t *socket_, const options_t &options_);
        ~tcp_listener_t ();

        //  Set address to listen on.
        int set_address (const char *addr_);

        // Get the bound address for use with wildcard
        int get_address (std::string &addr_);

    private:

        //  Handlers for incoming commands.
        void process_plug ();
        void process_term (int linger_);

        //  Handlers for I/O events.
        void in_event ();

        //  Close the listening socket.
        void close ();

        //  Accept the new connection. Returns the file descriptor of the
        //  newly created connection. The function may return retired_fd
        //  if the connection was dropped while waiting in the listen backlog
        //  or was denied because of accept filters.
        fd_t accept ();

        //  Address to listen on.
        tcp_address_t address;

        //  Underlying socket.
        fd_t s;

        //  Handle corresponding to the listening socket.
        handle_t handle;

        //  Socket the listerner belongs to.
        zmq::socket_base_t *socket;

        // String representation of endpoint to bind to
        std::string endpoint;

        tcp_listener_t (const tcp_listener_t&);
        const tcp_listener_t &operator = (const tcp_listener_t&);
    };

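tcp_listener_t is responsible for listening for connections. It inherits from own_t and io_object_t. It inherits from own_t because it has to manage child objects: every session_base_t created through a tcp_listener_t must be destroyed by that tcp_listener_t. Every object that lives in an io_thread_t must inherit from io_object_t, including tcp_connecter_t, session_base_t, stream_engine_t and the other classes analyzed later. When a new tcp_listener_t is created, an io_thread_t is chosen for it, and the tcp_listener_t then adds its listening descriptor to that io_thread_t's poller to watch for incoming connections. When a new connection arrives, the in_event method is called: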

    void zmq::tcp_listener_t::in_event ()
    {
        fd_t fd = accept ();

        //  If connection was reset by the peer in the meantime, just ignore it.
        //  TODO: Handle specific errors like ENFILE/EMFILE etc.
        if (fd == retired_fd) {
            socket->event_accept_failed (endpoint, zmq_errno());
            return;
        }

        tune_tcp_socket (fd);
        tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);

        // remember our fd for ZMQ_SRCFD in messages
        socket->set_fd(fd);

        //  Create the engine object for this connection.
        stream_engine_t *engine = new (std::nothrow)
            stream_engine_t (fd, options, endpoint);
        alloc_assert (engine);

        //  Choose I/O thread to run connecter in. Given that we are already
        //  running in an I/O thread, there must be at least one available.
        io_thread_t *io_thread = choose_io_thread (options.affinity);
        zmq_assert (io_thread);

        //  Create and launch a session object.
        session_base_t *session = session_base_t::create (io_thread, false, socket,
            options, NULL);
        errno_assert (session);
        session->inc_seqnum ();
        launch_child (session);
        send_attach (session, engine, false);
        socket->event_accepted (endpoint, fd);
    }

The session_base_t created by tcp_listener_t does not have to live in the same thread as the tcp_listener_t, but the tcp_listener_t is responsible for managing and destroying it.
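
The ownership rule above (an owner destroys every child it launched, even children that run on another I/O thread) can be modeled with a minimal single-threaded sketch. Owner, Child and the destroyed_children counter are hypothetical names used only for illustration; the real own_t tears children down asynchronously through commands (launch_child, process_term and termination acks) rather than by calling destructors directly.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical counter, used only to observe destruction in this sketch.
static int destroyed_children = 0;

// Hypothetical stand-in for a session_base_t created by a listener.
struct Child {
    ~Child() { ++destroyed_children; }
};

// Hypothetical stand-in for own_t: it keeps every child it launches and
// tears all of them down when it is itself terminated.
class Owner {
public:
    Child *launch_child() {
        children_.push_back(std::make_unique<Child>());
        return children_.back().get();
    }

    std::size_t child_count() const { return children_.size(); }

    // Terminating the owner terminates (here: destroys) all of its children.
    void terminate() { children_.clear(); }

private:
    std::vector<std::unique_ptr<Child>> children_;
};
```

A listener that accepts two connections would call launch_child twice and, on shutdown, take both sessions down with it; the session's thread does not matter for ownership.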

Next is the tcp_connecter_t class:

    class tcp_connecter_t : public own_t, public io_object_t
    {
    public:

        //  If 'delayed_start' is true connecter first waits for a while,
        //  then starts connection process.
        tcp_connecter_t (zmq::io_thread_t *io_thread_,
            zmq::session_base_t *session_, const options_t &options_,
            address_t *addr_, bool delayed_start_);
        ~tcp_connecter_t ();

    private:

        //  ID of the timer used to delay the reconnection.
        enum {reconnect_timer_id = 1};

        //  Handlers for incoming commands.
        void process_plug ();
        void process_term (int linger_);

        //  Handlers for I/O events.
        void in_event ();
        void out_event ();
        void timer_event (int id_);

        //  Internal function to start the actual connection establishment.
        void start_connecting ();

        //  Internal function to add a reconnect timer
        void add_reconnect_timer();

        //  Internal function to return a reconnect backoff delay.
        //  Will modify the current_reconnect_ivl used for next call
        //  Returns the currently used interval
        int get_new_reconnect_ivl ();

        //  Open TCP connecting socket. Returns -1 in case of error,
        //  0 if connect was successfull immediately. Returns -1 with
        //  EAGAIN errno if async connect was launched.
        int open ();

        //  Close the connecting socket.
        void close ();

        //  Get the file descriptor of newly created connection. Returns
        //  retired_fd if the connection was unsuccessfull.
        fd_t connect ();

        //  Address to connect to. Owned by session_base_t.
        address_t *addr;

        //  Underlying socket.
        fd_t s;

        //  Handle corresponding to the listening socket.
        handle_t handle;

        //  If true file descriptor is registered with the poller and 'handle'
        //  contains valid value.
        bool handle_valid;

        //  If true, connecter is waiting a while before trying to connect.
        const bool delayed_start;

        //  True iff a timer has been started.
        bool timer_started;

        //  Reference to the session we belong to.
        zmq::session_base_t *session;

        //  Current reconnect ivl, updated for backoff strategy
        int current_reconnect_ivl;

        // String representation of endpoint to connect to
        std::string endpoint;

        // Socket
        zmq::socket_base_t *socket;

        tcp_connecter_t (const tcp_connecter_t&);
        const tcp_connecter_t &operator = (const tcp_connecter_t&);
    };

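tcp_connecter_t is the opposite of tcp_listener_t: it is created by session_base_t, which is also responsible for managing and destroying it. A tcp_connecter_t does not live forever either; it is destroyed as soon as the connection succeeds: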

    void zmq::tcp_connecter_t::out_event ()
    {
        rm_fd (handle);
        handle_valid = false;

        const fd_t fd = connect ();
        //  Handle the error condition by attempt to reconnect.
        if (fd == retired_fd) {
            close ();
            add_reconnect_timer ();
            return;
        }

        tune_tcp_socket (fd);
        tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);

        // remember our fd for ZMQ_SRCFD in messages
        socket->set_fd (fd);

        //  Create the engine object for this connection.
        stream_engine_t *engine = new (std::nothrow)
            stream_engine_t (fd, options, endpoint);
        alloc_assert (engine);

        //  Attach the engine to the corresponding session object.
        send_attach (session, engine);

        //  Shut the connecter down.
        terminate ();

        socket->event_connected (endpoint, fd);
    }

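When tcp_connecter_t fails to connect, it sets a timer so that it can retry later. This is why, in zmq, a connect issued before the peer has called bind still succeeds eventually.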
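
The retry delay comes from get_new_reconnect_ivl, declared in the class above. Below is a sketch of an exponential backoff in that spirit, assuming the two parameters correspond to ZMQ_RECONNECT_IVL and ZMQ_RECONNECT_IVL_MAX and that a random jitter smaller than the base interval is added to each delay. ReconnectBackoff is a hypothetical name, the jitter is passed in explicitly so the example is deterministic, and the exact libzmq formula may differ between versions.

```cpp
#include <cassert>

// Hypothetical backoff state, modeled on tcp_connecter_t's
// current_reconnect_ivl member. reconnect_ivl must be > 0.
struct ReconnectBackoff {
    int reconnect_ivl;      // base interval in ms (cf. ZMQ_RECONNECT_IVL)
    int reconnect_ivl_max;  // cap in ms, 0 = no backoff (cf. ZMQ_RECONNECT_IVL_MAX)
    int current_ivl;        // base for the next attempt

    ReconnectBackoff(int ivl, int ivl_max)
        : reconnect_ivl(ivl), reconnect_ivl_max(ivl_max), current_ivl(ivl) {}

    // Returns the delay before the next reconnect attempt. `jitter` stands in
    // for a random value and is reduced modulo reconnect_ivl.
    int next_delay(unsigned jitter) {
        const int delay = current_ivl +
            static_cast<int>(jitter % static_cast<unsigned>(reconnect_ivl));
        // Only back off when a maximum larger than the base interval is set.
        if (reconnect_ivl_max > 0 && reconnect_ivl_max > reconnect_ivl) {
            current_ivl *= 2;
            if (current_ivl >= reconnect_ivl_max)
                current_ivl = reconnect_ivl_max;
        }
        return delay;
    }
};
```

With a base of 100 ms, a cap of 1000 ms and zero jitter, successive delays are 100, 200, 400, 800, 1000, 1000. Without a cap the interval stays flat, and the jitter keeps many clients from retrying in lockstep.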

session_base

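Every tcp connection needs a corresponding session_base (inproc connections do not: their socket_base objects are connected to each other directly). session_base is the link between stream_engine and socket_base. It is connected to the socket_base through a pipe_t: when the socket_base needs to send a message, it writes the msg into the outbound direction of the pipe, and session_base then sends it out through the stream_engine; when the stream_engine reads a msg from the network, session_base writes it into the pipe, where the socket_base can read it.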
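
The pipe described here is really a pair of endpoints sharing two one-way queues: whatever one end writes, the other end reads. A minimal single-threaded sketch of that idea follows. PipeEnd and make_pipe_pair are hypothetical names, messages are simplified to std::string, and the real pipe_t uses lock-free ypipe queues, high-water marks and activation commands, none of which are modeled here.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <string>
#include <utility>

// Hypothetical one-way queue shared by the two ends of a pipe pair.
using Queue = std::deque<std::string>;

// Hypothetical pipe end: reads from one queue and writes to the other.
class PipeEnd {
public:
    PipeEnd(std::shared_ptr<Queue> in, std::shared_ptr<Queue> out)
        : in_(std::move(in)), out_(std::move(out)) {}

    void write(const std::string &msg) { out_->push_back(msg); }

    // Returns false when nothing is available (zmq reports EAGAIN instead).
    bool read(std::string &msg) {
        if (in_->empty())
            return false;
        msg = in_->front();
        in_->pop_front();
        return true;
    }

private:
    std::shared_ptr<Queue> in_;
    std::shared_ptr<Queue> out_;
};

// Hypothetical factory: returns two connected ends, e.g. first for the
// session and second for the socket.
inline std::pair<PipeEnd, PipeEnd> make_pipe_pair() {
    auto a_to_b = std::make_shared<Queue>();
    auto b_to_a = std::make_shared<Queue>();
    return {PipeEnd(b_to_a, a_to_b), PipeEnd(a_to_b, b_to_a)};
}
```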

    class session_base_t : public own_t, public io_object_t, public i_pipe_events
    {
    public:

        //  Create a session of the particular type.
        static session_base_t *create (zmq::io_thread_t *io_thread_,
            bool active_, zmq::socket_base_t *socket_,
            const options_t &options_, address_t *addr_);

        //  To be used once only, when creating the session.
        void attach_pipe (zmq::pipe_t *pipe_);

        //  Following functions are the interface exposed towards the engine.
        virtual void reset ();
        void flush ();
        void engine_error (zmq::stream_engine_t::error_reason_t reason);

        //  i_pipe_events interface implementation.
        void read_activated (zmq::pipe_t *pipe_);
        void write_activated (zmq::pipe_t *pipe_);
        void hiccuped (zmq::pipe_t *pipe_);
        void pipe_terminated (zmq::pipe_t *pipe_);

        //  Delivers a message. Returns 0 if successful; -1 otherwise.
        //  The function takes ownership of the message.
        int push_msg (msg_t *msg_);

        int zap_connect ();
        bool zap_enabled ();

        //  Fetches a message. Returns 0 if successful; -1 otherwise.
        //  The caller is responsible for freeing the message when no
        //  longer used.
        int pull_msg (msg_t *msg_);

        //  Receives message from ZAP socket.
        //  Returns 0 on success; -1 otherwise.
        //  The caller is responsible for freeing the message.
        int read_zap_msg (msg_t *msg_);

        //  Sends message to ZAP socket.
        //  Returns 0 on success; -1 otherwise.
        //  The function takes ownership of the message.
        int write_zap_msg (msg_t *msg_);

        socket_base_t *get_socket ();

    protected:

        session_base_t (zmq::io_thread_t *io_thread_, bool active_,
            zmq::socket_base_t *socket_, const options_t &options_,
            address_t *addr_);
        virtual ~session_base_t ();

    private:

        void start_connecting (bool wait_);

        void reconnect ();

        //  Handlers for incoming commands.
        void process_plug ();
        void process_attach (zmq::i_engine *engine_);
        void process_term (int linger_);

        //  i_poll_events handlers.
        void timer_event (int id_);

        //  Remove any half processed messages. Flush unflushed messages.
        //  Call this function when engine disconnect to get rid of leftovers.
        void clean_pipes ();

        //  If true, this session (re)connects to the peer. Otherwise, it's
        //  a transient session created by the listener.
        const bool active;

        //  Pipe connecting the session to its socket.
        zmq::pipe_t *pipe;

        //  Pipe used to exchange messages with ZAP socket.
        zmq::pipe_t *zap_pipe;

        //  This set is added to with pipes we are disconnecting, but haven't yet completed
        std::set <pipe_t *> terminating_pipes;

        //  This flag is true if the remainder of the message being processed
        //  is still in the in pipe.
        bool incomplete_in;

        //  True if termination have been suspended to push the pending
        //  messages to the network.
        bool pending;

        //  The protocol I/O engine connected to the session.
        zmq::i_engine *engine;

        //  The socket the session belongs to.
        zmq::socket_base_t *socket;

        //  I/O thread the session is living in. It will be used to plug in
        //  the engines into the same thread.
        zmq::io_thread_t *io_thread;

        //  ID of the linger timer
        enum {linger_timer_id = 0x20};

        //  True is linger timer is running.
        bool has_linger_timer;

        //  Protocol and address to use when connecting.
        address_t *addr;

        session_base_t (const session_base_t&);
        const session_base_t &operator = (const session_base_t&);
    };

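session_base_t has a member variable active, which determines whether process_plug starts a connecting operation. start_connecting mainly creates a tcp_connecter_t (or a socks_connecter_t when a SOCKS proxy is configured) and launches it as a child object of the session.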

    void zmq::session_base_t::process_plug ()
    {
        if (active)
            start_connecting (false);
    }

    void zmq::session_base_t::start_connecting (bool wait_)
    {
        zmq_assert (active);

        //  Choose I/O thread to run connecter in. Given that we are already
        //  running in an I/O thread, there must be at least one available.
        io_thread_t *io_thread = choose_io_thread (options.affinity);
        zmq_assert (io_thread);

        //  Create the connecter object.

        if (addr->protocol == "tcp") {
            if (!options.socks_proxy_address.empty()) {
                address_t *proxy_address = new (std::nothrow)
                    address_t ("tcp", options.socks_proxy_address);
                alloc_assert (proxy_address);
                socks_connecter_t *connecter =
                    new (std::nothrow) socks_connecter_t (
                        io_thread, this, options, addr, proxy_address, wait_);
                alloc_assert (connecter);
                launch_child (connecter);
            }
            else {
                tcp_connecter_t *connecter = new (std::nothrow)
                    tcp_connecter_t (io_thread, this, options, addr, wait_);
                alloc_assert (connecter);
                launch_child (connecter);
            }
            return;
        }

        //  Handling code for the other protocols is omitted.
        .......
    }
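
The decision made by process_plug and start_connecting can be summarized as a small pure function. ConnecterKind and choose_connecter are hypothetical names; only the tcp branch shown above is modeled, and every other protocol is lumped into Other because that code is omitted here.

```cpp
#include <cassert>
#include <string>

// Hypothetical summary of which connecter start_connecting would launch.
enum class ConnecterKind { None, Tcp, Socks, Other };

// active:      the session was created by connect(), not by a listener.
// protocol:    addr->protocol.
// socks_proxy: options.socks_proxy_address.
inline ConnecterKind choose_connecter(bool active, const std::string &protocol,
                                      const std::string &socks_proxy) {
    if (!active)
        return ConnecterKind::None;  // sessions created by a listener never connect
    if (protocol == "tcp")
        return socks_proxy.empty() ? ConnecterKind::Tcp : ConnecterKind::Socks;
    return ConnecterKind::Other;     // ipc, tipc, pgm, ... (omitted above)
}
```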

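As mentioned earlier, session_base and socket_base_t are connected by a pipe that carries msgs. This pipe is normally created in process_attach. However, if the socket_base performs a connect and the immediate option (ZMQ_IMMEDIATE) is not set to 1, the pipe is created directly in socket_base_t's connect.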

    void zmq::session_base_t::process_attach (i_engine *engine_)
    {
        zmq_assert (engine_ != NULL);

        //  Create the pipe if it does not exist yet.
        if (!pipe && !is_terminating ()) {
            object_t *parents [2] = {this, socket};
            pipe_t *pipes [2] = {NULL, NULL};

            bool conflate = options.conflate &&
                (options.type == ZMQ_DEALER ||
                 options.type == ZMQ_PULL ||
                 options.type == ZMQ_PUSH ||
                 options.type == ZMQ_PUB ||
                 options.type == ZMQ_SUB);

            int hwms [2] = {conflate? -1 : options.rcvhwm,
                conflate? -1 : options.sndhwm};
            bool conflates [2] = {conflate, conflate};
            int rc = pipepair (parents, pipes, hwms, conflates);
            errno_assert (rc == 0);

            //  Plug the local end of the pipe.
            pipes [0]->set_event_sink (this);

            //  Remember the local end of the pipe.
            zmq_assert (!pipe);
            pipe = pipes [0];

            //  Ask socket to plug into the remote end of the pipe.
            send_bind (socket, pipes [1]);
        }

        //  Plug in the engine.
        zmq_assert (!engine);
        engine = engine_;
        engine->plug (io_thread, this);
    }
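
The pipe parameters chosen in process_attach reduce to a small rule: ZMQ_CONFLATE only takes effect for DEALER, PULL, PUSH, PUB and SUB sockets, and a conflating pipe gets an HWM of -1 in both directions; otherwise the two HWMs are rcvhwm and sndhwm, in that order. A sketch with a hypothetical SocketType enum standing in for the ZMQ_* constants and a hypothetical pipe_params helper:

```cpp
#include <array>
#include <cassert>

// Hypothetical stand-ins for the ZMQ_* socket type constants.
enum class SocketType { Pair, Pub, Sub, Req, Rep, Dealer, Router, Pull, Push };

// Hypothetical bundle of the arrays passed to pipepair().
struct PipeParams {
    std::array<int, 2> hwms;        // same order as hwms[] in process_attach
    std::array<bool, 2> conflates;
};

inline PipeParams pipe_params(SocketType type, bool conflate_option,
                              int rcvhwm, int sndhwm) {
    // Conflation is honoured only for these socket types.
    const bool conflate = conflate_option &&
        (type == SocketType::Dealer || type == SocketType::Pull ||
         type == SocketType::Push || type == SocketType::Pub ||
         type == SocketType::Sub);
    PipeParams p;
    p.hwms = {conflate ? -1 : rcvhwm, conflate ? -1 : sndhwm};
    p.conflates = {conflate, conflate};
    return p;
}
```

Setting ZMQ_CONFLATE on a REQ socket, for example, is silently ignored by this rule and the normal HWMs apply.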

In attach_pipe, session_base registers itself as the pipe's event sink, so that when the pipe's read/write state changes, session_base_t can notify the corresponding engine.

    void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
    {
        zmq_assert (!is_terminating ());
        zmq_assert (!pipe);
        zmq_assert (pipe_);
        pipe = pipe_;
        pipe->set_event_sink (this);
    }

    void zmq::session_base_t::read_activated (pipe_t *pipe_)
    {
        // Skip activating if we're detaching this pipe
        if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
            zmq_assert (terminating_pipes.count (pipe_) == 1);
            return;
        }

        if (unlikely (engine == NULL)) {
            pipe->check_read ();
            return;
        }

        if (likely (pipe_ == pipe))
            engine->restart_output ();
        else
            engine->zap_msg_available ();
    }
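
The branching in read_activated can be restated as a pure function of which pipe fired and whether an engine is attached. ReadAction and on_read_activated are hypothetical names; the four enum values label the four outcomes in the code above (ignore a pipe being detached, check_read, restart_output, zap_msg_available), and opaque pointers stand in for zmq::pipe_t.

```cpp
#include <cassert>

// Hypothetical labels for the four outcomes of read_activated.
enum class ReadAction { IgnoreDetaching, CheckRead, RestartOutput, ZapMsgAvailable };

// `which` is the pipe that became readable; `main_pipe` and `zap_pipe` are
// the session's two pipes.
inline ReadAction on_read_activated(const void *which, const void *main_pipe,
                                    const void *zap_pipe, bool has_engine) {
    if (which != main_pipe && which != zap_pipe)
        return ReadAction::IgnoreDetaching;  // pipe is in terminating_pipes
    if (!has_engine)
        return ReadAction::CheckRead;        // the real code checks the main pipe
    return which == main_pipe ? ReadAction::RestartOutput
                              : ReadAction::ZapMsgAvailable;
}
```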

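stream_engine and session_base_t exchange msgs mainly through the following two methods: pull_msg reads a msg from the pipe for the engine to send, and push_msg writes a msg received by the engine into the pipe.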

    int zmq::session_base_t::pull_msg (msg_t *msg_)
    {
        if (!pipe || !pipe->read (msg_)) {
            errno = EAGAIN;
            return -1;
        }

        incomplete_in = msg_->flags () & msg_t::more ? true : false;

        return 0;
    }

    int zmq::session_base_t::push_msg (msg_t *msg_)
    {
        if (pipe && pipe->write (msg_)) {
            int rc = msg_->init ();
            errno_assert (rc == 0);
            return 0;
        }

        errno = EAGAIN;
        return -1;
    }
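
A sketch of the contract of these two methods: both return 0 on success and -1 with errno set to EAGAIN when the pipe is empty or full, pull_msg records in incomplete_in whether the message it just returned has the more flag set, and push_msg leaves the caller with a fresh empty message (the role of msg_->init () above). Msg and SessionSketch are hypothetical simplifications; a bounded std::deque stands in for the pipe and its high-water mark.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical simplified message; `more` stands in for msg_t::more.
struct Msg {
    std::string data;
    bool more = false;
};

// Hypothetical session with one queue per direction.
class SessionSketch {
public:
    explicit SessionSketch(std::size_t hwm) : hwm_(hwm) {}

    // Engine -> socket direction: fails with EAGAIN when the pipe is full.
    int push_msg(Msg &msg) {
        if (to_socket_.size() >= hwm_) {
            errno = EAGAIN;
            return -1;
        }
        to_socket_.push_back(msg);
        msg = Msg();  // like msg_->init (): the caller keeps an empty message
        return 0;
    }

    // Socket -> engine direction: fails with EAGAIN when nothing is queued.
    int pull_msg(Msg &msg) {
        if (from_socket_.empty()) {
            errno = EAGAIN;
            return -1;
        }
        msg = from_socket_.front();
        from_socket_.pop_front();
        incomplete_in = msg.more;
        return 0;
    }

    // Hypothetical helper standing in for the socket writing into its end.
    void socket_send(const Msg &msg) { from_socket_.push_back(msg); }

    bool incomplete_in = false;

private:
    std::size_t hwm_;
    std::deque<Msg> to_socket_;
    std::deque<Msg> from_socket_;
};
```

The EAGAIN result is what lets the engine stop reading or writing until read_activated or write_activated wakes it up again.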

stream_engine & mechanism_t

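stream_engine is the class that actually talks to the network: it receives data from the network layer and sends data to it. stream_engine inherits from i_engine and is implemented as a state machine, so its logic is fairly complex. These are stream_engine's member variables: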

        //  Underlying socket.
        fd_t s;

        //  True iff this is server's engine.
        bool as_server;

        msg_t tx_msg; // msg loaded from session_base

        handle_t handle;

        //  Input buffer and the decoder that parses it
        unsigned char *inpos;
        size_t insize;
        i_decoder *decoder;

        //  Output buffer and the encoder that fills it
        unsigned char *outpos;
        size_t outsize;
        i_encoder *encoder;

        //  Metadata to be attached to received messages. May be NULL.
        metadata_t *metadata;

        //  When true, we are still trying to determine whether
        //  the peer is using versioned protocol, and if so, which
        //  version.  When false, normal message flow has started.
        bool handshaking; // whether the connection is still in the greeting phase

        static const size_t signature_size = 10; // size of the signature

        //  Size of ZMTP/1.0 and ZMTP/2.0 greeting message
        static const size_t v2_greeting_size = 12; // greeting size for v2

        //  Size of ZMTP/3.0 greeting message
        static const size_t v3_greeting_size = 64; // greeting size for v3

        //  Expected greeting size.
        size_t greeting_size; // greeting size currently expected

        //  Greeting received from, and sent to peer
        unsigned char greeting_recv [v3_greeting_size]; // receive buffer for the greeting
        unsigned char greeting_send [v3_greeting_size]; // send buffer for the greeting

        //  Size of greeting received so far
        unsigned int greeting_bytes_read; // greeting bytes read so far

        //  The session this engine is attached to.
        zmq::session_base_t *session;

        options_t options;

        // String representation of endpoint
        std::string endpoint;

        bool plugged;

        int (stream_engine_t::*next_msg) (msg_t *msg_); // state-machine function pointer

        int (stream_engine_t::*process_msg) (msg_t *msg_); // state-machine function pointer

        bool io_error;

        //  Indicates whether the engine is to inject a phantom
        //  subscription message into the incoming stream.
        //  Needed to support old peers.
        bool subscription_required;

        mechanism_t *mechanism; // security mechanism

        //  True iff the engine couldn't consume the last decoded message.
        bool input_stopped;

        //  True iff the engine doesn't have any message to encode.
        bool output_stopped;

        //  ID of the handshake timer
        enum {handshake_timer_id = 0x40};

        //  True is linger timer is running.
        bool has_handshake_timer;

        // Socket
        zmq::socket_base_t *socket;

        std::string peer_address;

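stream_engine uses an encoder, a decoder and a security-mechanism class, mechanism_t, to encode, decode, encrypt and decrypt data. Before any msgs are exchanged, stream_engine must perform a handshake with the other end of the tcp connection. This is zmq's own application-level handshake, which takes place after TCP's three-way handshake. The data exchanged during this handshake consists of two parts, the greeting and the handshake commands: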

    void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
        session_base_t *session_)
    {
        zmq_assert (!plugged);
        plugged = true;

        //  Connect to session object.
        zmq_assert (!session);
        zmq_assert (session_);
        session = session_;
        socket = session-> get_socket ();

        //  Connect to I/O threads poller object.
        io_object_t::plug (io_thread_);
        handle = add_fd (s);
        io_error = false;

        if (options.raw_sock) {
            ........
        }
        else {
            // start optional timer, to prevent handshake hanging on no input
            set_handshake_timer ();

            //  Send the 'length' and 'flags' fields of the identity message.
            //  The 'length' field is encoded in the long format.
            outpos = greeting_send;
            outpos [outsize++] = 0xff;
            put_uint64 (&outpos [outsize], options.identity_size + 1);
            outsize += 8;
            outpos [outsize++] = 0x7f;
        }

        set_pollin (handle);
        set_pollout (handle);
        //  Flush all the data that may have been already received downstream.
        in_event ();
    }
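
The 10 bytes that plug writes into greeting_send form the ZMTP signature: 0xff, an 8-byte length in network (big-endian) byte order equal to identity_size + 1, and 0x7f. To an old ZMTP/1.0 peer this looks like the header of a long-form identity message, while a newer peer sees the low bit of the 10th byte set and treats it as a versioned greeting (the `greeting_recv [9] & 0x01` check in handshake). A standalone sketch that builds the same bytes, assuming put_uint64 stores big-endian; make_signature is a hypothetical name.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: builds the 10-byte signature the way plug() fills
// greeting_send (assumes put_uint64 writes big-endian).
inline std::array<unsigned char, 10> make_signature(std::size_t identity_size) {
    std::array<unsigned char, 10> sig{};
    sig[0] = 0xff;                                 // long-length marker
    const std::uint64_t len = identity_size + 1;   // identity body + flags byte
    for (int i = 0; i < 8; ++i)                    // most significant byte first
        sig[1 + i] = static_cast<unsigned char>(len >> (56 - 8 * i));
    sig[9] = 0x7f;                                 // low bit set => versioned peer
    return sig;
}
```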

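In stream_engine's plug method, outpos is first pointed at greeting_send, the buffer that holds the outgoing handshake data. plug writes 10 bytes into greeting_send as the signature and then calls in_event; if no data has arrived yet, the engine waits for the poller's callback. In in_event, if the engine is still in the handshaking state, it goes straight into the handshake method: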

    bool zmq::stream_engine_t::handshake ()
    {
        zmq_assert (handshaking);
        zmq_assert (greeting_bytes_read < greeting_size);
        //  Receive the greeting.
        while (greeting_bytes_read < greeting_size) {
            const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
                                    greeting_size - greeting_bytes_read);
            if (n == 0) {
                error (connection_error);
                return false;
            }
            if (n == -1) {
                if (errno != EAGAIN)
                    error (connection_error);
                return false;
            }

            greeting_bytes_read += n;

            //  We have received at least one byte from the peer.
            //  If the first byte is not 0xff, we know that the
            //  peer is using unversioned protocol.
            if (greeting_recv [0] != 0xff)
                break;

            if (greeting_bytes_read < signature_size)
                continue;

            //  Inspect the right-most bit of the 10th byte (which coincides
            //  with the 'flags' field if a regular message was sent).
            //  Zero indicates this is a header of identity message
            //  (i.e. the peer is using the unversioned protocol).
            if (!(greeting_recv [9] & 0x01))
                break;

            //  The peer is using versioned protocol.
            //  Send the major version number.
            if (outpos + outsize == greeting_send + signature_size) {
                if (outsize == 0)
                    set_pollout (handle);
                outpos [outsize++] = 3;     //  Major version number
            }

            if (greeting_bytes_read > signature_size) {
                if (outpos + outsize == greeting_send + signature_size + 1) {
                    if (outsize == 0)
                        set_pollout (handle);

                    //  Use ZMTP/2.0 to talk to older peers.
                    if (greeting_recv [10] == ZMTP_1_0
                    ||  greeting_recv [10] == ZMTP_2_0)
                        outpos [outsize++] = options.type;
                    else {
                        outpos [outsize++] = 0; //  Minor version number
                        memset (outpos + outsize, 0, 20);

                        zmq_assert (options.mechanism == ZMQ_NULL
                                ||  options.mechanism == ZMQ_PLAIN
                                ||  options.mechanism == ZMQ_CURVE
                                ||  options.mechanism == ZMQ_GSSAPI);

                        if (options.mechanism == ZMQ_NULL)
                            memcpy (outpos + outsize, "NULL", 4);
                        else
                        if (options.mechanism == ZMQ_PLAIN)
                            memcpy (outpos + outsize, "PLAIN", 5);
                        else
                        if (options.mechanism == ZMQ_GSSAPI)
                            memcpy (outpos + outsize, "GSSAPI", 6);
                        else
                        if (options.mechanism == ZMQ_CURVE)
                            memcpy (outpos + outsize, "CURVE", 5);
                        outsize += 20;
                        memset (outpos + outsize, 0, 32);
                        outsize += 32;
                        greeting_size = v3_greeting_size;
                    }
                }
            }
        }

        //  Position of the revision field in the greeting.
        const size_t revision_pos = 10;

        //  Is the peer using ZMTP/1.0 with no revision number?
        //  If so, we send and receive rest of identity message
        if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
            if (session->zap_enabled ()) {
               // reject ZMTP 1.0 connections if ZAP is enabled
               error (protocol_error);
               return false;
            }

            encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
            alloc_assert (encoder);

            decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
            alloc_assert (decoder);

            //  We have already sent the message header.
            //  Since there is no way to tell the encoder to
            //  skip the message header, we simply throw that
            //  header data away.
            const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
            unsigned char tmp [10], *bufferp = tmp;

            //  Prepare the identity message and load it into encoder.
            //  Then consume bytes we have already sent to the peer.
            const int rc = tx_msg.init_size (options.identity_size);
            zmq_assert (rc == 0);
            memcpy (tx_msg.data (), options.identity, options.identity_size);
            encoder->load_msg (&tx_msg);
            size_t buffer_size = encoder->encode (&bufferp, header_size);
            zmq_assert (buffer_size == header_size);

            //  Make sure the decoder sees the data we have already received.
            inpos = greeting_recv;
            insize = greeting_bytes_read;

            //  To allow for interoperability with peers that do not forward
            //  their subscriptions, we inject a phantom subscription message
            //  message into the incoming message stream.
            if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
                subscription_required = true;

            //  We are sending our identity now and the next message
            //  will come from the socket.
            next_msg = &stream_engine_t::pull_msg_from_session;

            //  We are expecting identity message.
            process_msg = &stream_engine_t::process_identity_msg;
        }
        else
        if (greeting_recv [revision_pos] == ZMTP_1_0) {
            if (session->zap_enabled ()) {
               // reject ZMTP 1.0 connections if ZAP is enabled
               error (protocol_error);
               return false;
            }

            encoder = new (std::nothrow) v1_encoder_t (
                out_batch_size);
            alloc_assert (encoder);

            decoder = new (std::nothrow) v1_decoder_t (
                in_batch_size, options.maxmsgsize);
            alloc_assert (decoder);
        }
        else
        if (greeting_recv [revision_pos] == ZMTP_2_0) {
            if (session->zap_enabled ()) {
               // reject ZMTP 2.0 connections if ZAP is enabled
               error (protocol_error);
               return false;
            }

            encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
            alloc_assert (encoder);

            decoder = new (std::nothrow) v2_decoder_t (
                in_batch_size, options.maxmsgsize);
            alloc_assert (decoder);
        }
        else {
            encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
            alloc_assert (encoder);

            decoder = new (std::nothrow) v2_decoder_t (
                in_batch_size, options.maxmsgsize);
            alloc_assert (decoder);

            if (options.mechanism == ZMQ_NULL
            &&  memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
                mechanism = new (std::nothrow)
                    null_mechanism_t (session, peer_address, options);
                alloc_assert (mechanism);
            }
            else
            if (options.mechanism == ZMQ_PLAIN
            &&  memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
                if (options.as_server)
                    mechanism = new (std::nothrow)
                        plain_server_t (session, peer_address, options);
                else
                    mechanism = new (std::nothrow)
                        plain_client_t (options);
                alloc_assert (mechanism);
            }
    #ifdef HAVE_LIBSODIUM
            else
            if (options.mechanism == ZMQ_CURVE
            &&  memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
                if (options.as_server)
                    mechanism = new (std::nothrow)
                        curve_server_t (session, peer_address, options);
                else
                    mechanism = new (std::nothrow) curve_client_t (options);
                alloc_assert (mechanism);
            }
    #endif
    #ifdef HAVE_LIBGSSAPI_KRB5
            else
            if (options.mechanism == ZMQ_GSSAPI
            &&  memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
                if (options.as_server)
                    mechanism = new (std::nothrow)
                        gssapi_server_t (session, peer_address, options);
                else
                    mechanism = new (std::nothrow) gssapi_client_t (options);
                alloc_assert (mechanism);
            }
    #endif
            else {
                error (protocol_error);
                return false;
            }
            next_msg = &stream_engine_t::next_handshake_command;
            process_msg = &stream_engine_t::process_handshake_command;
        }

        // Start polling for output if necessary.
        if (outsize == 0)
            set_pollout (handle);

        //  Handshaking was successful.
        //  Switch into the normal message flow.
        handshaking = false;

        if (has_handshake_timer) {
            cancel_timer (handshake_timer_id);
            has_handshake_timer = false;
        }

        return true;
    }
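
The version detection at the top of handshake can be restated as a pure function of the bytes received so far. This sketch assumes ZMTP_1_0 == 0 and ZMTP_2_0 == 1 for the revision byte (those constants are defined in libzmq but not shown in this article), treats any higher value as ZMTP 3.x, and reads the 20-byte NUL-padded mechanism name that starts at offset 12 of a v3 greeting, the same offset the memcmp calls above use. PeerProtocol, classify_peer and mechanism_name are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical classification of the peer, NeedMore = keep reading.
enum class PeerProtocol { NeedMore, Unversioned, Zmtp10, Zmtp20, Zmtp3x };

// Assumed values of the revision byte greeting[10] (ZMTP_1_0, ZMTP_2_0).
constexpr unsigned char kZmtp10 = 0;
constexpr unsigned char kZmtp20 = 1;

// Classifies the peer from the first n greeting bytes, mirroring the
// break/continue logic of the receive loop in handshake().
inline PeerProtocol classify_peer(const unsigned char *g, std::size_t n) {
    if (n == 0)
        return PeerProtocol::NeedMore;
    if (g[0] != 0xff)
        return PeerProtocol::Unversioned;  // short identity frame
    if (n < 10)
        return PeerProtocol::NeedMore;     // wait for the full signature
    if (!(g[9] & 0x01))
        return PeerProtocol::Unversioned;  // long identity frame header
    if (n < 11)
        return PeerProtocol::NeedMore;     // wait for the revision byte
    if (g[10] == kZmtp10)
        return PeerProtocol::Zmtp10;
    if (g[10] == kZmtp20)
        return PeerProtocol::Zmtp20;
    return PeerProtocol::Zmtp3x;
}

// Hypothetical helper: reads the NUL-padded mechanism name at bytes 12..31.
inline std::string mechanism_name(const unsigned char *g) {
    std::string name;
    for (std::size_t i = 12; i < 32 && g[i] != 0; ++i)
        name.push_back(static_cast<char>(g[i]));
    return name;
}
```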

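handshake first checks whether the first byte is 0xff. If it is, the peer speaks a versioned ZMTP protocol, so the engine waits until the full 10-byte signature has been read and then writes 3 into the output buffer, announcing that this side speaks ZMTP 3.0. Next it reads the peer's version number. zmq is backward compatible, so old and new versions can talk to each other. If the peer also speaks ZMTP 3.0, the engine writes the name of its security mechanism into the output buffer and creates a v2_encoder_t and a v2_decoder_t as its encoder and decoder. zmq 4 added authentication and encryption; zmq currently offers four mechanisms: ZMQ_NULL, ZMQ_PLAIN, ZMQ_CURVE and ZMQ_GSSAPI. The rest of this analysis uses the simplest one, ZMQ_NULL, as the example; the flow for the other mechanisms is essentially the same, only the authentication method differs. In ZMTP 3.0 the greeting is 64 bytes in total. When the greeting phase ends, the handshaking flag is set to false, but the handshake is not complete yet. The remaining handshake and all subsequent data transfer in stream_engine are driven by a state machine whose core is two pointers: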

        int (stream_engine_t::*next_msg) (msg_t *msg_);

        int (stream_engine_t::*process_msg) (msg_t *msg_);

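These two pointers point to the method that fetches the next msg and the method that processes the next msg, respectively. How a fetched msg is sent out, and how msgs are read from the network, will be covered in detail later, when we analyze the encoder and decoder.
At the end of handshake, these two function pointers have been set to: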

        next_msg = &stream_engine_t::next_handshake_command;
        process_msg = &stream_engine_t::process_handshake_command;
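
These members are ordinary C++ pointers to member functions, so moving the engine from one phase to the next is just an assignment and the call site never changes. A stripped-down sketch of the pattern; EngineSketch, its string messages and method bodies are hypothetical, only next_msg is modeled (process_msg works the same way), and for brevity the switch happens right after the first command, whereas the real engine switches in mechanism_ready.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical engine reduced to one state-machine pointer.
class EngineSketch {
public:
    EngineSketch() : next_msg(&EngineSketch::next_handshake_command) {}

    // The call site stays the same; the pointer decides what happens.
    std::string pull() { return (this->*next_msg)(); }

    std::vector<std::string> queued;  // stands in for the session's pipe

private:
    std::string next_handshake_command() {
        // Produce the handshake command, then switch to normal message flow.
        next_msg = &EngineSketch::pull_msg_from_session;
        return "READY";
    }

    std::string pull_msg_from_session() {
        if (queued.empty())
            return "";
        std::string m = queued.front();
        queued.erase(queued.begin());
        return m;
    }

    std::string (EngineSketch::*next_msg)();
};
```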

next_handshake_command is implemented as follows, again using the ZMQ_NULL mechanism as the example:

    int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
    {
        zmq_assert (mechanism != NULL);

        if (mechanism->status () == mechanism_t::ready) {
            mechanism_ready ();
            return pull_and_encode (msg_);
        }
        else
        if (mechanism->status () == mechanism_t::error) {
            errno = EPROTO;
            return -1;
        }
        else {
            const int rc = mechanism->next_handshake_command (msg_);
            if (rc == 0)
                msg_->set_flags (msg_t::command);
            return rc;
        }
    }

The engine's next_handshake_command first calls the mechanism's next_handshake_command method to obtain the command data:

    int zmq::null_mechanism_t::next_handshake_command (msg_t *msg_)
    {
        if (ready_command_sent || error_command_sent) {
            errno = EAGAIN;
            return -1;
        }

        //  ZAP handling omitted
        ....

        unsigned char *const command_buffer = (unsigned char *) malloc (512);
        alloc_assert (command_buffer);

        unsigned char *ptr = command_buffer;

        //  Add mechanism string
        memcpy (ptr, "\5READY", 6);
        ptr += 6;

        //  Add socket type property
        const char *socket_type = socket_type_string (options.type);
        ptr += add_property (ptr, "Socket-Type", socket_type, strlen (socket_type));

        //  Add identity property
        if (options.type == ZMQ_REQ
        ||  options.type == ZMQ_DEALER
        ||  options.type == ZMQ_ROUTER)
            ptr += add_property (ptr, "Identity", options.identity, options.identity_size);

        const size_t command_size = ptr - command_buffer;
        const int rc = msg_->init_size (command_size);
        errno_assert (rc == 0);
        memcpy (msg_->data (), command_buffer, command_size);
        free (command_buffer);

        ready_command_sent = true;

        return 0;
    }
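
The body of add_property is not shown here. In the ZMTP 3.0 specification each metadata property is encoded as a 1-byte name length, the name, a 4-byte big-endian value length and the value, and the READY command begins with the length-prefixed command name "\5READY". A standalone sketch that builds the same command body under that assumption; build_ready and append_property are hypothetical helpers, not libzmq functions.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

using Bytes = std::vector<unsigned char>;

// Hypothetical helper: name-size (1 byte), name, value-size (4 bytes, BE), value.
inline void append_property(Bytes &out, const std::string &name,
                            const std::string &value) {
    out.push_back(static_cast<unsigned char>(name.size()));
    out.insert(out.end(), name.begin(), name.end());
    const std::uint32_t len = static_cast<std::uint32_t>(value.size());
    for (int shift = 24; shift >= 0; shift -= 8)
        out.push_back(static_cast<unsigned char>(len >> shift));
    out.insert(out.end(), value.begin(), value.end());
}

// Hypothetical mirror of null_mechanism_t::next_handshake_command; the
// caller decides send_identity (true for REQ, DEALER and ROUTER).
inline Bytes build_ready(const std::string &socket_type,
                         const std::string &identity, bool send_identity) {
    Bytes cmd = {5, 'R', 'E', 'A', 'D', 'Y'};  // "\5READY"
    append_property(cmd, "Socket-Type", socket_type);
    if (send_identity)
        append_property(cmd, "Identity", identity);
    return cmd;
}
```

For a DEALER socket with an empty identity this yields 6 + (1 + 11 + 4 + 6) + (1 + 8 + 4 + 0) = 41 bytes.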

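This command msg consists of the READY string plus the Socket-Type property and, for REQ, DEALER and ROUTER sockets, the Identity property. Once the engine's next_handshake_command has obtained the msg, it hands it to the encoder to be sent. The next call to next_handshake_command returns -1, indicating that the command has already been sent and the engine is waiting for the peer's command. At that point out_event has no data to process, so it calls reset_pollout (handle) to pause output events until the peer's command arrives. Later, when the decoder receives a msg, process_handshake_command is called: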

    int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
    {
        zmq_assert (mechanism != NULL);
        const int rc = mechanism->process_handshake_command (msg_);
        if (rc == 0) {
            if (mechanism->status () == mechanism_t::ready)
                mechanism_ready ();
            else
            if (mechanism->status () == mechanism_t::error) {
                errno = EPROTO;
                return -1;
            }

            //  Output may already have stopped while we were waiting for
            //  mechanism_t::ready. It cannot be reactivated through the
            //  session's read_activated, because the pipe between the session
            //  and the socket may not be asleep, so restart it manually once.
            if (output_stopped)
                restart_output ();
        }

        return rc;
    }

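The engine's process_handshake_command calls the mechanism's process_handshake_command; the ZMQ_NULL mechanism uses it to validate the peer's socket type and to store the peer's identity. Once our handshake command has been sent and the peer's handshake command has been received and processed, the mechanism is in the ready state. From then on, whichever of process_handshake_command and next_handshake_command runs first enters mechanism_ready. Nothing requires one of them to run before the other; the order does not matter.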
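
Why the order does not matter can be seen in a model of the NULL mechanism's status: it becomes ready only once the READY command has been both sent and received, no matter which happened first. NullMechanismSketch and its members are hypothetical; the real null_mechanism_t also handles ZAP and ERROR commands, which are left out here.

```cpp
#include <cassert>

// Hypothetical model of the NULL mechanism's readiness bookkeeping.
class NullMechanismSketch {
public:
    enum Status { handshaking, ready };

    // Called when the engine pulls our READY command (next_handshake_command).
    void sent_ready() { ready_command_sent_ = true; }

    // Called when the peer's READY command has been processed.
    void received_ready() { ready_command_received_ = true; }

    Status status() const {
        return ready_command_sent_ && ready_command_received_ ? ready : handshaking;
    }

private:
    bool ready_command_sent_ = false;
    bool ready_command_received_ = false;
};
```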
