使用 Boost.Interprocess 使进程等待直到资源加载到共享内存中

Posted

技术标签:

【中文标题】使用 Boost.Interprocess 使进程等待直到资源加载到共享内存中【英文标题】:Making processes wait until resource is loaded into shared memory using Boost.Interprocess 【发布时间】:2021-09-23 03:26:56 【问题描述】:

假设我有 ID 为 1nn 进程。我有一个包含大量数据的文件,其中每个进程只会存储数据的不相交子集。我想只使用一个进程加载和处理文件,将生成的数据存储在通过 Boost.Interprocess 分配的共享内存中的数据结构中,然后允许任何(包括加载文件的进程)从数据中读取.

为此,我需要利用位于here 的一些Boost.Interprocess 同步结构来确保进程在加载数据之前不会尝试读取数据。但是,我在这部分苦苦挣扎,这可能是由于我在这方面缺乏经验。目前,我有process(1) 将文件加载到共享内存中,我需要一种方法来确保任何给定进程在加载完成之前都无法读取文件内容,即使读取可能在加载发生后任意长时间发生。

我想尝试使用 notify_all 调用组合使用互斥锁和条件变量,以便 process(1) 可以向其他进程发出信号,可以从共享内存数据中读取,但这似乎有一个问题是,process(1) 可能会在某些process(i) 甚至尝试wait 以指示条件变量可以读取数据之前发送notify_all 调用。

关于如何以可靠的方式解决此问题的任何想法?

编辑 1

这是我试图澄清我的困境并更清楚地表达我所尝试的内容的尝试。我有一些类,我使用 Boost.Interprocess 分配到共享内存空间中,其形式类似于以下内容:

namespace bi = boost::interprocess;

class cache 
public:
   
   cache() = default;
   ~cache() = default;

   void set_process_id(std::size_t ID)  id = ID; 

   void load_file(const std::string& filename) 
      
      // designated process to load
      // file has ID equal to 0
      if( id == 0 )
          
         // lock using the mutex
         bi::scoped_lock<bi::interprocess_mutex> lock(m);
         
         // do work to process the file and
         // place result in the data variable

         // after processing file, notify all other
         // processes that they can access the data
         load_cond.notify_all();

      


   
   void read_into(std::array<double, 100>& data_out) 
        // wait to read data until load is complete
          // lock using the mutex
          bi::scoped_lock<bi::interprocess_mutex> lock(m);
          load_cond.wait(lock);
       

       data_out = data;
   

private:
   
   size_t id;
   std::array<double, 100> data;
   bi::interprocess_mutex m;
   bi::interprocess_condition load_cond;

;

上面大致是我问这个问题时得到的,但我不太满意,因为如果在指定进程执行notify_all调用之后调用read_into方法,那么read_into会卡住.我今天早上所做的似乎解决了这个困境的方法是将这个类更改为以下内容:

namespace bi = boost::interprocess;

class cache 
public:
   
   cache():load_is_complete(false)
   ~cache() = default;

   void set_process_id(std::size_t ID)  id = ID; 

   void load_file(const std::string& filename) 
      
      // designated process to load
      // file has ID equal to 0
      if( id == 0 )
          
         // lock using the mutex
         bi::scoped_lock<bi::interprocess_mutex> lock(m);
         
         // do work to process the file and
         // place result in the data variable

         // after processing file, notify all other
         // processes that they can access the data
         load_is_complete = true;
         load_cond.notify_all();

      


   
   void read_into(std::array<double, 100>& data_out) 
        // wait to read data until load is complete
          // lock using the mutex
          bi::scoped_lock<bi::interprocess_mutex> lock(m);
          if( not load_is_complete )
             load_cond.wait(lock);
          
       

       data_out = data;
   

private:
   
   size_t id;
   std::array<double, 100> data;
   bool load_is_complete;
   bi::interprocess_mutex m;
   bi::interprocess_condition load_cond;

;

不确定上述是否最优雅,但我相信它应该确保进程在完成加载之前无法访问存储在共享内存中的数据,无论它们是在指定进程之前还是之后到达互斥锁m指定进程已加载文件内容。如果有更优雅的方式,我想知道。

【问题讨论】:

【参考方案1】:

典型的方法是使用命名的进程间互斥锁。参见例如Boris Schälings "Boost" Book 中的示例,可免费获取,也可在线获取:https://theboostcpplibraries.com/boost.interprocess-synchronization

如果您的段创建已经适当同步,您可以在共享段内使用“未命名”的进程间 mutice,这通常更有效,并且避免使用无关的同步原语污染系统命名空间。

【讨论】:

那本书参考看起来很棒,感谢您抽出宝贵时间回复。也就是说,在这种情况下仅使用互斥锁如何工作?我考虑了这一点,假设我使用了一个互斥锁,当process(1) 加载文件内容时必须锁定,并且在任何其他进程读取共享内存数据时也必须锁定,我怎么能确定process(1)将首先锁定互斥锁,以便为其他进程设置数据?似乎这不能单独工作。 我想我可能已经找到了一种方法来通过在共享内存段中添加一个条件变量和一个布尔变量来解决我的问题,但我仍然很想听到你对我上述问题的回应如果有更好的方法。 你是说我没有回答问题吗?在这种情况下,也许您可​​以澄清您缺少的部分。 我认为我在原始帖子中不够清楚,请查看我对问题的编辑,看看这是否有助于您了解我的困境是什么(以及我是否过度思考)。

以上是关于使用 Boost.Interprocess 使进程等待直到资源加载到共享内存中的主要内容,如果未能解决你的问题,请参考以下文章

为啥我不能使用 Boost.Interprocess 设置从一个进程通过另一个进程创建的一些内存?

boost::interprocess 32 位和 64 位进程之间的共享内存

boost::interprocess::message_queue 必须由写入它的进程创建?

boost interprocess file_lock 不适用于多个进程

Boost.interprocess Vector 作为类成员

interprocess::named_upgradable_mutex - 如果进程被杀死则保持锁定