从一个进程向另一个 C++ 发出信号

Posted

技术标签:

【中文标题】从一个进程向另一个 C++ 发出信号【英文标题】:Signal from one process to another C++ 【发布时间】:2021-09-11 22:17:09 【问题描述】:

我知道标题有点宽泛,所以让我详细说明一下。 我有 2 个进程正在运行,一个正在写入共享内存,另一个正在从中读取。 为了实现共享内存效果,我使用了 boost::interprocess(顺便说一句,如果有更方便的库,请告诉我)。

所以我实现了以下内容:

//作家

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>

namespace ip = boost::interprocess;
class SharedMemory

public:
    template<typename OpenOrCreate>
    SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode, size_t size) :
        name_(name),
        sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode, size))
    
    

    template<typename OpenOrCreate>
    SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode) :
        name_(name),
        sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode))
    
    

    std::shared_ptr<ip::windows_shared_memory> getSM()
    
        return sm_;
    
private:
    std::function<void()> destroyer_;
    std::string name_;
    std::shared_ptr<ip::windows_shared_memory> sm_;
;


int main()

    SharedMemory creator(ip::create_only, "SharedMemory", ip::read_write, 10);
    ip::mapped_region region(*creator.getSM(), ip::read_write);
    std::memset(region.get_address(), 1, region.get_size());

    int status = system("reader.exe");
    std::cout << status << std::endl;

所以我正在创建共享内存,将 1 写入它然后调用阅读器 exe。 (我跳过阅读器部分,因为它几乎相同,但不是写它读)

这段代码工作正常,我写入内存,其他进程读取它并打印我的 1。 但是,如果我同时运行这 2 个 exe 并且我想写入内存然后通知另一个进程有更新怎么办?如何从一个 exe/进程向另一个发出信号?

场景是我正在流式传输一些实时数据,写入内存,然后告诉另一个进程有更新。

【问题讨论】:

这个任务的预期机制是一个条件变量。 Boost 似乎提供了interprocess_condition,它可以放在共享内存中,似乎正是你想要的。 【参考方案1】:

我认为确实有更方便的方法。

原则上,要在进程之间进行同步,您可以使用与在进程内部(线程之间)同步的所有相同方法:使用同步原语(互斥锁/临界区、条件变量、信号量、屏障等)。

此外,您需要有一个同步的数据结构。这恰恰是目前的致命弱点。这里完全没有数据结构。

虽然您可以使用自己的逻辑进行原始字节访问,但我看不出使用高级库这样做的吸引力。相反,我会使用托管内存段,它可以让您按名称查找或构造类型化对象。这可能包括您的同步原语。

事实上,您可以使用已内置所有同步功能的message_queue 来加快此过程。

手动同步:使用 Segment Manager 的写入器

我将提供可移植代码,因为我没有 Windows 机器。首先让我们考虑一个数据结构。一个简单的例子是消息队列。让我们使用deque&lt;string&gt;

不完全是微不足道的数据结构,但好消息是 Boost Interprocess 提供了使事情正常工作的所有具体细节(使用进程间分配器)。

namespace Shared 

    using Segment = ip::managed_shared_memory;
    using Mgr     = Segment::segment_manager;
    template <typename T>
    using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
    template <typename T> using Deque = bc::deque<T, Alloc<T>>;
    using String = bc::basic_string<char, std::char_traits<char>, Alloc<char>>;

    using DataStructure = Deque<String>;

    class Memory 
      public:
        Memory(const char* name, size_t size)
            : name_(name)
            , sm_(ip::open_or_create, name, size)
            , data_(*sm_.find_or_construct<DataStructure>("data")(
                  sm_.get_segment_manager()))
        
        

        DataStructure&       get()        return data_;  
        DataStructure const& get() const  return data_;  

      private:
        std::string    name_;
        Segment        sm_;
        DataStructure& data_;
    ;

 // namespace Shared

现在我们可以让作者变成这样:

int main()

    Shared::Memory creator("SharedMemory", 10*1024*1024);

    creator.get().emplace_back("Hello");
    creator.get().emplace_back("World");

    std::cout << "Total queued: " << creator.get().size() << "\n";

将打印例如

Total queued: 2
Total queued: 4
Total queued: 6

取决于你运行它的次数。

读者方面

现在让我们做阅读器方面。其实大同小异,我们把它放在同一个主程序中:

int main(int argc, char**)

    Shared::Memory mem("SharedMemory", 10*1024*1024);
    auto& data = mem.get();

    bool is_reader = argc > 1;

    if (not is_reader) 
        data.emplace_back("Hello");
        data.emplace_back("World");
        std::cout << "Total queued: " << data.size() << "\n";
     else 
        std::cout << "Found entries: " << data.size() << "\n";
        while (!data.empty()) 
            std::cout << "Dequeued " << data.front() << "\n";
            data.pop_front();
        
    


简单的开始。现在运行例如test.exe READER 会反过来打印如下内容:

锁定与同步

目标是同时运行写入器和读取器。由于缺乏锁定和同步,这并不像现在这样安全。让我们添加它:

class Memory 
    static constexpr size_t max_capacity = 100;
  public:
    Memory(const char* name, size_t size)
        : name_(name)
        , sm_(ip::open_or_create, name, size)
        , mx_(*sm_.find_or_construct<Mutex>("mutex")())
        , cv_(*sm_.find_or_construct<Cond>("condition")())
        , data_(*sm_.find_or_construct<DataStructure>("data")(
              sm_.get_segment_manager()))
     

    // ... 

  private:
    std::string    name_;
    Segment        sm_;
    Mutex&         mx_;
    Cond&          cv_;
    DataStructure& data_;
;

现在让我们小心点。因为我们希望data_ 队列上的所有操作都是同步的,所以我们不会像以前那样公开它(使用get() 成员函数)。相反,我们公开了我们支持的操作的确切接口:

size_t queue_length() const;
void enqueue(std::string message); // blocking when queue at max_capacity
std::string dequeue();             // blocking dequeue
std::optional<std::string> try_dequeue(); // non-blocking dequeue

这些都按要求进行锁定,正如您所期望的那样:

size_t queue_length() const 
    ip::scoped_lock<Mutex> lk(mx_);
    return data_.size();

关于潜在的阻塞操作变得更加有趣。我选择了最大容量,所以enqueue需要等待容量:

// blocking when queue at max_capacity
void enqueue(std::string message) 
    ip::scoped_lock<Mutex> lk(mx_);
    cv_.wait(lk, [this]  return data_.size() < max_capacity; );

    data_.emplace_back(std::move(message));
    cv_.notify_one();

相反,dequeue 需要等待消息可用:

// blocking dequeue
std::string dequeue() 
    ip::scoped_lock<Mutex> lk(mx_);
    cv_.wait(lk, [this]  return not data_.empty(); );

    return do_pop();

或者,你可以使它成为非阻塞的,只是可选地返回一个值:

// non-blocking dequeue
std::optional<std::string> try_dequeue() 
    ip::scoped_lock<Mutex> lk(mx_);

    if (data_.empty())
        return std::nullopt;
    return do_pop();

现在我们主要有三个版本:编写器、读取器和连续读取器(后者演示了阻塞接口):

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition_any.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

#include <boost/container/scoped_allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/containers/string.hpp>

#include <iostream>
#include <iomanip>
#include <optional>

namespace ip = boost::interprocess;
namespace bc = boost::container;

namespace Shared 

    using Segment = ip::managed_shared_memory;
    using Mgr     = Segment::segment_manager;
    template <typename T>
    using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
    template <typename T> using Deque = ip::deque<T, Alloc<T>>;
    using String = ip::basic_string<char, std::char_traits<char>, Alloc<char>>;

    using DataStructure = Deque<String>;
    using Mutex         = ip::interprocess_mutex;
    using Cond          = ip::interprocess_condition;

    class Memory 
        static constexpr size_t max_capacity = 100;
      public:
        Memory(const char* name, size_t size)
            : name_(name)
            , sm_(ip::open_or_create, name, size)
            , mx_(*sm_.find_or_construct<Mutex>("mutex")())
            , cv_(*sm_.find_or_construct<Cond>("condition")())
            , data_(*sm_.find_or_construct<DataStructure>("data")(
                  sm_.get_segment_manager()))
         

        size_t queue_length() const 
            ip::scoped_lock<Mutex> lk(mx_);
            return data_.size(); // caution: racy by design!
        

        // blocking when queue at max_capacity
        void enqueue(std::string message) 
            ip::scoped_lock<Mutex> lk(mx_);
            cv_.wait(lk, [this]  return data_.size() < max_capacity; );

            data_.emplace_back(std::move(message));

            cv_.notify_one();
        

        // blocking dequeue
        std::string dequeue() 
            ip::scoped_lock<Mutex> lk(mx_);
            cv_.wait(lk, [this]  return not data_.empty(); );

            return do_pop();
        

        // non-blocking dequeue
        std::optional<std::string> try_dequeue() 
            ip::scoped_lock<Mutex> lk(mx_);

            if (data_.empty())
                return std::nullopt;
            return do_pop();
        

      private:
        std::string    name_;
        Segment        sm_;
        Mutex&         mx_;
        Cond&          cv_;
        DataStructure& data_;

        // Assumes mx_ locked by current thread!
        std::string do_pop() 
            auto&& tmp = std::move(data_.front());
            data_.pop_front();
            cv_.notify_all(); // any of the waiters might be a/the writer
            return std::string(tmp.begin(), tmp.end());
        
    ;

 // namespace Shared

int main(int argc, char**)

    Shared::Memory mem("SharedMemory", 10*1024*1024);

    switch (argc) 
    case 1:
        mem.enqueue("Hello");
        mem.enqueue("World");
        std::cout << "Total queued: " << mem.queue_length() << "\n";
        break;
    case 2:
        std::cout << "Found entries: " << mem.queue_length() << "\n";
        while (auto msg = mem.try_dequeue()) 
            std::cout << "Dequeued " << *msg << "\n";
        
        break;
    case 3: 
        std::cout << "Continuous reader\n";
        while (true) 
            std::cout << "Dequeued " << mem.dequeue() << "\n";
        
        break;
    

小演示:

总结,注意

请注意,上述内容有些松散。值得注意的是,Boost Interprocess 中没有健壮的锁需要额外注意在不持有锁的情况下正确关闭。

我建议也与ip::message_queue 进行对比:

How to put file in boost::interprocess::managed_shared_memory?(对比共享内存、message_queue 和纯 TCP 套接字)

【讨论】:

感谢您的详细回答!虽然我在 Visual Studio 上遇到了一些编译器错误,但你熟悉吗? : 错误 C2664: 'boost::container::basic_string,boost::container::scoped_allocator_adaptor<:interprocess::allocator>>>:: basic_string(boost::container::basic_string,boost::container::scoped_allocator_adaptor<:interprocess::allocator>>>::reserve_t,unsigned __int64,const boost::container::scoped_allocator_adaptor<:interprocess::allocator>> &)': @其实找到原因了,emplace后面是在抱怨std::string,但是用我们的String就好了。 嗯。听起来有点像 MSVC 的一致性问题。但我无法检查是否缺少它(我不会错过它:))

以上是关于从一个进程向另一个 C++ 发出信号的主要内容,如果未能解决你的问题,请参考以下文章

如何向另一个线程发出带有数组参数的信号

从另一个 Java 进程发出信号

“C++ 应用程序可以在完成时发出 python 脚本信号”吗?

向同一进程中的另一个线程发出信号

如何从信号中找出进程的 pid 在 linux 的 c++ 中启动?

信号概述