ZMQ 扩展请求回复中的 EAGAIN

Posted

技术标签:

【中文标题】ZMQ 扩展请求回复中的 EAGAIN【英文标题】:EAGAIN in ZMQ extended request reply 【发布时间】:2018-04-05 06:26:08 【问题描述】:

我正在尝试在 C++ 中创建一个 REQ 路由器 经销商 REP 通信。子进程绑定router和dealer,在router和dealer之间做代理,将REP连接到dealer,等待zmq_recv消息。

父进程将 REQ 连接到路由器并尝试发送消息,但是我收到了 zmq_send error in parent: Resource temporarily unavailable(即 EAGAIN)。根据zmq_send docs,EAGAIN 表示:

请求了非阻塞模式,目前无法发送消息。

但是消息确实被发送,因为它是在子进程中收到的。为什么会返回那个errno?

这里是MCVE:

#include <zmq.h>
#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <assert.h>
#include <thread>
#include <stdio.h>

int main() 
  char connect_path[35];
  int rc;
  int msg;
  pid_t child_pid = fork();

  if (child_pid == 0) 
    // Child
    void* child_context = zmq_ctx_new ();
    if (child_context == NULL) 
      std::cerr << "\nChild context error\n";
    

    void* router = zmq_socket(child_context, ZMQ_ROUTER);
    if (router == NULL) 
      perror("zmq_socket of type router error");
    
    char bind_path[35];

    snprintf(bind_path, sizeof(bind_path), "ipc:///tmp/zmqtest%d-router", getpid());
    rc = zmq_bind(router, bind_path);
    assert (rc == 0);

    void* dealer = zmq_socket(child_context, ZMQ_DEALER);
    if (dealer == NULL) 
      perror("zmq_socket of type dealer error");
    

    snprintf(bind_path, sizeof(bind_path), "ipc:///tmp/zmqtest%d-dealer", getpid());
    rc = zmq_bind(dealer, bind_path);
    assert (rc == 0);

    std::thread z_proxy (zmq_proxy, router, dealer, nullptr);
    z_proxy.detach();

    void* rep_socket = zmq_socket (child_context, ZMQ_REP);
    if (rep_socket == NULL) 
      perror("zmq_socket of type rep error");
    

    snprintf(connect_path, sizeof(connect_path), "ipc:///tmp/zmqtest%d-dealer", getpid());
    rc = zmq_connect(rep_socket, connect_path);
    assert (rc == 0);

    while(1) 
      if (zmq_recv (rep_socket, &msg, sizeof(msg), 0) != 0) 
        perror("zmq_recv error");
      
      printf("\nReceived msg %d in process %d\n", msg, getpid());
      break;
    
    if (zmq_close(rep_socket) != 0) 
      perror("zmq_close of rep_socket in child error");
    
    if (zmq_ctx_term(child_context) != 0) 
      perror("zmq_ctx_term of child_context error");
    
   else 
    // Parent
    sleep(1);

    void* parent_context = zmq_ctx_new ();
    if (parent_context == NULL) 
      std::cerr << "\nParent ctx error\n";
    

    void* req_socket = zmq_socket (parent_context, ZMQ_REQ);
    if (req_socket == NULL) 
      perror("zmq_socket of type req error in parent");
    

    snprintf(connect_path, sizeof(connect_path), "ipc:///tmp/zmqtest%d-router", child_pid);
    rc = zmq_connect(req_socket, connect_path);
    assert (rc == 0);

    msg = 30;
    if (zmq_send (req_socket, &msg, sizeof(msg), 0) != 0) 
      perror("zmq_send error in parent");
    

    if (zmq_close(req_socket) != 0) 
      perror("zmq_close of req_socket in parent error");
    
    if (zmq_ctx_term(parent_context) != 0) 
      perror("zmq_ctx_term of parent_context error");
    
  

【问题讨论】:

【参考方案1】:

第 1 步:做一个简单的测试:

嗯,至少应该先进行这种测试排队:

 rc =           zmq_send ( req_socket, "A_TEST_BLOCK", 12, ZMQ_DONTWAIT );
 printf ( "INF: zmq_send ( req_socket, "A_TEST_BLOCK", 12, ZMQ_DONTWAIT )\nZMQ: returned rc == %d\nZMQ: zmq_errno ~ %s\n",
           rc,
           zmq_strerror ( zmq_errno() )
           );

.


第 2 步:发布打印输出

接下来,如果有任何“错过”的镜头,错误分析可能会就潜在原因提出建议 (当且仅当 parent_ctx 确实拒绝甚至接受来自最简单的 zmq_send() 调用的数据进入其内部排队设施有明确的这样做的理由 )。

否则我们一无所知(ZMQ_DONTWAIT 标志不是这里的原因)。

作为测试was run, it yielded:

INF: zmq_send ( req_socket, 'A_TEST_BLOCK', 12, ZMQ_DONTWAIT )ZMQ: returned rc == 12ZMQ: zmq_errno ~ Resource temporarily unavailable


第三步:

测试已确认,根据文档:

zmq_send() 函数将返回消息中的字节数如果成功

那么,让我们再深入一点:

int major, minor, patch;

zmq_version ( &major, &minor, &patch );
printf ( "INF: current ØMQ version is %d.%d.%d\nZMQ: zmq_errno ~ %s\n",
          major, minor, patch,
          zmq_strerror ( zmq_errno() )
          );

第四步:

如果最新的 API 更新不符合已发布的 API 规范,请记录事件:

printf ( "EXPECT( NO ERROR, ON START ): zmq_errno ~ %s\n",
          zmq_strerror ( zmq_errno() )
          );

printf ( "EXPECT( <major>.<minor>.<patch> ): zmq_version ~\n" );

int major, minor, patch
zmq_version ( &major, &minor, &patch );

printf ( "INF: current ØMQ version is %d.%d.%d\nZMQ: zmq_errno ~ %s\n",
          major, minor, patch
          )

printf ( "EXPECT( NO ERROR ): zmq_errno ~ %s\n",
          zmq_strerror ( zmq_errno() )
          );

printf ( "EXPECT( NO ERROR ): zmq_send() ~ %s\n" );

rc =           zmq_send ( req_socket, "A_TEST_BLOCK", 12, ZMQ_DONTWAIT );
printf ( "INF: zmq_send ( req_socket, "A_TEST_BLOCK", 12, ZMQ_DONTWAIT )\nZMQ: returned rc == %d which ouhgt be == 12, is it?\n",
       rc
       );

printf ( "EXPECT( NO ERROR ): zmq_errno ~ %s\n",
          zmq_strerror ( zmq_errno() )
          );

如果出现意外结果,请随时提出问题。

【讨论】:

我收到INF: zmq_send ( req_socket, 'A_TEST_BLOCK', 12, ZMQ_DONTWAIT ) ZMQ: returned rc == 12 ZMQ: zmq_errno ~ Resource temporarily unavailable INF: current ØMQ version is 4.2.3 我意识到我一直在添加错误类型的错误检查。谢谢你的帖子。

以上是关于ZMQ 扩展请求回复中的 EAGAIN的主要内容,如果未能解决你的问题,请参考以下文章

ZMQ请求应答模式之无中间件的可靠性--自由者模式

ZMQ中请求-应答模式的可靠性设计

saltstack系列——zmq应答模式

ZMQ之面向服务的可靠队列(管家模式)

是否有可能以某种方式同时在 ZMQ 中使用 Send/Recv(通过多线程)?

使用代理网络中的临时队列的请求/回复模式的 ActiveMQ/Camel 故障转移 - 无法发布到已删除的临时队列