boost::interprocess::message_queue 必须由写入它的进程创建?

Posted

技术标签:

【中文标题】boost::interprocess::message_queue 必须由写入它的进程创建?【英文标题】:boost::interprocess::message_queue has to be created by the process which writes to it? 【发布时间】:2015-05-05 09:59:59 【问题描述】:

我有两个进程使用相同的代码相互通信,具体取决于 boost 1.58.0 的 boost::interprocess::message_queue。

typedef boost::shared_ptr<boost::interprocess::message_queue> mq_ptr;

mq_ptr m_t2b_queue;
mq_ptr m_b2t_queue;


sink::sink(const char * conn_id, bool is_parent)

    const int MAX_MESSAGE_NUMBER = 100000;


    snprintf( m_t2b_queue_name, MAX_MQ_NAME_LEN, "%s_t2b", conn_id);
    snprintf( m_b2t_queue_name, MAX_MQ_NAME_LEN, "%s_b2t", conn_id);

    if( is_parent )
    
        message_queue::remove("ffqs5hbKgFs_t2b"/*m_t2b_queue_name*/);
        message_queue::remove("ffqs5hbKgFs_b2t"/*m_b2t_queue_name*/);

        permissions perm;
        perm.set_unrestricted();
        m_t2b_queue.reset(new message_queue(create_only, "ffqs5hbKgFs_t2b"/*m_t2b_queue_name*/, MAX_MESSAGE_NUMBER, 24 /*sizeof(mq_item_t)*/, perm));
        m_b2t_queue.reset(new message_queue(create_only, "ffqs5hbKgFs_b2t"/*m_b2t_queue_name*/, MAX_MESSAGE_NUMBER, 24 /*sizeof(mq_item_t)*/, perm));
    
    else
    
        m_t2b_queue.reset(new message_queue(open_only, "ffqs5hbKgFs_t2b"/*m_t2b_queue_name*/));
        m_b2t_queue.reset(new message_queue(open_only, "ffqs5hbKgFs_b2t"/*m_b2t_queue_name*/));
        printf( "t2b max msg size = %d\n", m_t2b_queue->get_max_msg_size() );
        printf( "b2t max msg size = %d\n", m_b2t_queue->get_max_msg_size() );
        

上面代码中的变量已经替换为硬编码值,这样更清楚

父进程创建像这样的sink实例

sink parent( "ffqs5hbKgFs", true);

这样我假设父进程在内部创建了 2 个 message_queue。一个用于读取/另一个用于写入。

然后父进程创建子进程,子进程创建像这样的接收器实例

sink child( "ffqs5hbKgFs", false);

我想子进程会打开现有的 2 个由父进程创建的 message_queue。并使用其中之一向父进程发送消息。

问题是,子进程中的message_queue打开成功,但max_msg_size为零。

t2b max msg size = 0
b2t max msg size = 0

当 clild 进程尝试通过 message_queue 发送消息时,这会导致错误。

问题:这是否意味着message_queue 必须由写入它的进程创建?

在我的场景中,我希望 message_queue 始终由父进程创建,因此当子进程崩溃时,可以将新的子进程附加到现有的 message_queue 以恢复工作。


原因:终于找到原因了,一个进程以x86运行。另一个以 x64 运行

【问题讨论】:

您是否 100% 确定父代码在子代码之前运行? 是的,我 100% 确定。我通过调试和日志跟踪代码,在创建message_queue 之后创建子进程。此外,根据open_only 的 boost 文档,“打开一个先前创建的进程共享消息队列,名称为 'name'。如果队列之前没有创建或没有可用资源,则会引发错误。”这意味着如果 message_queue不存在,打开会立即失败。 你为什么对代码进行如此微不足道的修改?正如我的回答一样,您本可以做到a SSCCCE。也许我们会看到不同的解决方案/问题。 【参考方案1】:

您需要在队列创建期间同步访问。您实际上是在同步文件系统访问。

在这里你可以看到正在重现的故障:

Compiling On Coliru

int main() 
    std::cout << "Before: " << getpid() << "\n";

    if (int child = fork()) 
        std::cout << "Parent: " << getpid() << "\n";
        sink parent("ffqs5hbKgFs", true);

        int status;
        waitpid(child, &status, 0);
     else 
        std::cout << "Child: " << getpid() << "\n";
        sink parent("ffqs5hbKgFs", false);
    

打印

terminate called after throwing an instance of 'boost::interprocess::interprocess_exception'
what():  No such file or directory

当然,一个简单的睡眠会证明这是一个竞争条件(你期望什么,孩子将打开队列,而父母只是 remove-ing 他们):

 else 
    std::cout << "Child: " << getpid() << "\n";
    sleep(1); // one second
    sink parent("ffqs5hbKgFs", false);

打印,例如:

Before: 3318
Child: 3319
t2b max msg size = 24
b2t max msg size = 24
Parent: 3318

进一步阅读

有关正确的同步解决方案,请参阅the docs:

如前所述,如果无法有效同步对内存的访问,则通过内存映射文件或共享内存对象在进程之间共享内存的能力不是很有用。这与线程同步机制发生的问题相同,其中堆内存和全局变量在线程之间共享,但是对这些资源的访问通常需要通过互斥锁和条件变量进行同步。 Boost.Threads 在同一进程内的线程之间实现这些同步实用程序。 Boost.Interprocess 实现了类似的机制来同步来自不同进程的线程

完整演示

Compiling On Coliru

#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <mqueue.h>

namespace bip = boost::interprocess;

typedef boost::shared_ptr<boost::interprocess::message_queue> mq_ptr;

struct mq_item_t 
    char data[24];
;

struct sink 
    std::string m_t2b_queue_name;
    std::string m_b2t_queue_name;
    mq_ptr m_t2b_queue;
    mq_ptr m_b2t_queue;

    sink(const char * conn_id, bool is_parent)
        : 
        m_t2b_queue_name(conn_id + std::string("_t2b")),
        m_b2t_queue_name(conn_id + std::string("_b2t"))
    
        const int MAX_MESSAGE_NUMBER = 100000;

        if( is_parent )
        
            bip::message_queue::remove(m_t2b_queue_name.c_str());
            bip::message_queue::remove(m_b2t_queue_name.c_str());

            bip::permissions perm;
            perm.set_unrestricted();
            m_t2b_queue.reset(new bip::message_queue(bip::create_only, m_t2b_queue_name.c_str(), MAX_MESSAGE_NUMBER, sizeof(mq_item_t), perm));
            m_b2t_queue.reset(new bip::message_queue(bip::create_only, m_b2t_queue_name.c_str(), MAX_MESSAGE_NUMBER, sizeof(mq_item_t), perm));
        
        else
        
            m_t2b_queue.reset(new bip::message_queue(bip::open_only, m_t2b_queue_name.c_str()));
            m_b2t_queue.reset(new bip::message_queue(bip::open_only, m_b2t_queue_name.c_str()));
            std::cout << "t2b max msg size = " << m_t2b_queue->get_max_msg_size() << "\n";
            std::cout << "b2t max msg size = " << m_b2t_queue->get_max_msg_size() << "\n";
            
    
;

#include <sys/types.h>
#include <sys/wait.h>

int main() 
    std::cout << "Before: " << getpid() << "\n";

    if (int child = fork()) 
        std::cout << "Parent: " << getpid() << "\n";
        sink parent("ffqs5hbKgFs", true);

        int status;
        waitpid(child, &status, 0);
     else 
        std::cout << "Child: " << getpid() << "\n";
        sleep(1); // one second
        sink parent("ffqs5hbKgFs", false);
    

【讨论】:

@user325320 时间来发布您的 SSCCE。我们只能猜测。

以上是关于boost::interprocess::message_queue 必须由写入它的进程创建?的主要内容,如果未能解决你的问题,请参考以下文章