ZeroMQ 在多线程应用程序中处理中断

Posted

技术标签:

【中文标题】ZeroMQ 在多线程应用程序中处理中断【英文标题】:ZeroMQ handling interrupt in multithreaded application 【发布时间】:2019-04-04 05:20:20 【问题描述】:

ZeroMQ 在多线程环境中的优雅退出

规格:ubuntu 16.04 和 c++11,libzmq:4.2.3

示例代码

static int s_interrupted = 0;
static void s_signal_handler (int signal_value)

    s_interrupted = 1;
    //some code which will tell main thread to exit


static void s_catch_signals (void)

    struct sigaction action;
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;
    sigemptyset (&action.sa_mask);
    sigaction (SIGINT, &action, NULL);
    sigaction (SIGTERM, &action, NULL);


static void Thread((zsock_t *pipe, void *)

    zmq::context_t context(1);
    zmq::socket_t requester1(context,ZMQ_DEALER);
    zmq::socket_t requester2(context,ZMQ_DEALER);
    requester1.connect(address1);
    requester2.connect(address2);
    zmq_pollitem_t items []=
        requester1,0,ZMQ_POLLIN,0,
        requester2,0,ZMQ_POLLIN,0;

    while(true)
    
        zmq::message_t message;
        zmq::poll (items, 2, -1);

         if (items [0].revents & ZMQ_POLLIN) 
         
             requester1.recv(&message);
         
         if (items [1].revents & ZMQ_POLLIN) 
         
             requester2.recv(&message);
         
    


int main()

    .
    //some code
    .

    zactor_t *actor = zactor_new (Threaded, nullptr);
    s_catch_signals();
    .
    //continue
    .
    //wait till thread finishes to exit

    return 0;

现在,当中断发生时,它将从主线程调用信号处理程序。我不知何故需要告诉线程(轮询器)从信号处理程序中退出。任何想法如何实现这一目标?

【问题讨论】:

【参考方案1】:

从 ZMQ 文档中,您有 2 种“惯用”方式来处理这个问题:

Polling on a pipe,并在信号处理程序的管道上写入。

Catching exception thrown in recv when a signal is sent。

经过测试,zmq::poll 似乎没有在 SIGINT 上抛出异常。 因此解决方案似乎是使用专用于关闭的套接字。 解决方案如下所示:

#include <iostream>
#include <thread>
#include <signal.h>
#include <zmq.hpp>
zmq::context_t* ctx;
static void s_signal_handler (int signal_value)

    std::cout << "Signal received" << std::endl;
    zmq::socket_t stop_socket(*ctx, ZMQ_PAIR);
    stop_socket.connect("inproc://stop_address");
    zmq::message_t msg("0", 1);
    stop_socket.send(msg);
    std::cout << "end sighandler" << std::endl;


static void s_catch_signals (void)

    struct sigaction action;
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;
    sigemptyset (&action.sa_mask);
    sigaction (SIGINT, &action, NULL);
    sigaction (SIGTERM, &action, NULL);


void thread(void)

    std::cout << "Thread Begin" << std::endl;
    zmq::context_t context (1);
    ctx = &context;
    zmq::socket_t requester1(context,ZMQ_DEALER);
    zmq::socket_t requester2(context,ZMQ_DEALER);
    zmq::socket_t stop_socket(context, ZMQ_PAIR);
    requester1.connect("tcp://127.0.0.1:36483");
    requester2.connect("tcp://127.0.0.1:36483");
    stop_socket.bind("inproc://stop_address");

    zmq_pollitem_t items []=
    
        requester1,0,ZMQ_POLLIN,0,
        requester2,0,ZMQ_POLLIN,0,
        stop_socket,0,ZMQ_POLLIN,0
    ;

    while ( true )
    
        //  Blocking read will throw on a signal

        int rc = 0;
        std::cout << "Polling" << std::endl;
        rc = zmq::poll (items, 3, -1);

        zmq::message_t message;
        if(rc > 0)
        
            if (items [0].revents & ZMQ_POLLIN)
            
                requester1.recv(&message);
            
            if (items [1].revents & ZMQ_POLLIN)
            
                requester2.recv(&message);
            
            if(items [2].revents & ZMQ_POLLIN)
            
                std::cout << "message stop received " << std::endl;
                break;
            
        
    
    requester1.setsockopt(ZMQ_LINGER, 0);
    requester2.setsockopt(ZMQ_LINGER, 0);
    stop_socket.setsockopt(ZMQ_LINGER, 0);
    requester1.close();
    requester2.close();
    stop_socket.close();
    std::cout << "Thread end" << std::endl;


int main(void)

    std::cout << "Begin" << std::endl;
    s_catch_signals ();
    zmq::context_t context (1);
    zmq::socket_t router(context,ZMQ_ROUTER);
    router.bind("tcp://127.0.0.1:36483");

    std::thread t(&thread);
    t.join();
    std::cout << "end join" << std::endl;


请注意,如果您不想将上下文共享给信号处理程序,您可以使用“ipc://...”。

【讨论】:

请注意,如果您不想将上下文共享给信号处理程序,您可以使用 "ipc://..." 。这个我不太明白 ZMQ中用作地址的字符串以协议开头,后跟地址。例如tcp://127.0.0.7:1234 用于 TCP 协议到 ip 地址 127.0.0.1 在端口 1234。或者您可以使用ipc://name 用于进程间协议或inproc://anystring 用于线程间协议。使用 inproc 时,两个套接字需要具有相同的上下文。使用 ipc 时,不需要使用相同的上下文。因此,如果您不希望您的信号处理程序能够访问zmq::context_t,那么您可以将inproc:// 更改为ipc:// 这里是 ZMQ INPROC、IPC、TCP、PGM 的不同传输层的文档链接【参考方案2】:

如果您希望在处理信号时保留 ZMQ 的 Actor 模型的感觉,您可以使用 Linux 上的 signalfd 接口:signalfd manpage。这样,您可以使用 zmq poll 来等待信号被传递,而不是使用信号处理程序。

它还有一个额外的优势,即在处理通过文件描述符传递的信号时,您可以调用任何您喜欢的函数,因为您是同步处理它,而不是异步处理它。

【讨论】:

我会调查的

以上是关于ZeroMQ 在多线程应用程序中处理中断的主要内容,如果未能解决你的问题,请参考以下文章

使用 zeroMQ 在 PHP 中进行并行处理

在多线程 C++11 程序中未处理异常时会发生啥?

在多线程 C++11 程序中未处理异常时会发生啥?

Java遨游在多线程的知识体系中

Java遨游在多线程的知识体系中

OKHTTP Singleton 对象在多线程系统中处理不同的 API 调用