从一个进程向另一个 C++ 发出信号
Posted
技术标签:
【中文标题】从一个进程向另一个 C++ 发出信号【英文标题】:Signal from one process to another C++ 【发布时间】:2021-09-11 22:17:09 【问题描述】:我知道标题有点宽泛,所以让我详细说明一下。 我有 2 个进程正在运行,一个正在写入共享内存,另一个正在从中读取。 为了实现共享内存效果,我使用了 boost::interprocess(顺便说一句,如果有更方便的库,请告诉我)。
所以我实现了以下内容:
//作家
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>
namespace ip = boost::interprocess;
class SharedMemory
public:
template<typename OpenOrCreate>
SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode, size_t size) :
name_(name),
sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode, size))
template<typename OpenOrCreate>
SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode) :
name_(name),
sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode))
std::shared_ptr<ip::windows_shared_memory> getSM()
return sm_;
private:
std::function<void()> destroyer_;
std::string name_;
std::shared_ptr<ip::windows_shared_memory> sm_;
;
int main()
SharedMemory creator(ip::create_only, "SharedMemory", ip::read_write, 10);
ip::mapped_region region(*creator.getSM(), ip::read_write);
std::memset(region.get_address(), 1, region.get_size());
int status = system("reader.exe");
std::cout << status << std::endl;
所以我正在创建共享内存,将 1 写入它然后调用阅读器 exe。 (我跳过阅读器部分,因为它几乎相同,但不是写它读)
这段代码工作正常,我写入内存,其他进程读取它并打印我的 1。 但是,如果我同时运行这 2 个 exe 并且我想写入内存然后通知另一个进程有更新怎么办?如何从一个 exe/进程向另一个发出信号?
场景是我正在流式传输一些实时数据,写入内存,然后告诉另一个进程有更新。
【问题讨论】:
这个任务的预期机制是一个条件变量。 Boost 似乎提供了interprocess_condition
,它可以放在共享内存中,似乎正是你想要的。
【参考方案1】:
我认为确实有更方便的方法。
原则上,要在进程之间进行同步,您可以使用与在进程内部(线程之间)同步的所有相同方法:使用同步原语(互斥锁/临界区、条件变量、信号量、屏障等)。
此外,您需要有一个同步的数据结构。这恰恰是目前的致命弱点。这里完全没有数据结构。
虽然您可以使用自己的逻辑进行原始字节访问,但我看不出使用高级库这样做的吸引力。相反,我会使用托管内存段,它可以让您按名称查找或构造类型化对象。这可能包括您的同步原语。
事实上,您可以使用已内置所有同步功能的message_queue
来加快此过程。
手动同步:使用 Segment Manager 的写入器
我将提供可移植代码,因为我没有 Windows 机器。首先让我们考虑一个数据结构。一个简单的例子是消息队列。让我们使用deque<string>
。
不完全是微不足道的数据结构,但好消息是 Boost Interprocess 提供了使事情正常工作的所有具体细节(使用进程间分配器)。
namespace Shared
using Segment = ip::managed_shared_memory;
using Mgr = Segment::segment_manager;
template <typename T>
using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
template <typename T> using Deque = bc::deque<T, Alloc<T>>;
using String = bc::basic_string<char, std::char_traits<char>, Alloc<char>>;
using DataStructure = Deque<String>;
class Memory
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
DataStructure& get() return data_;
DataStructure const& get() const return data_;
private:
std::string name_;
Segment sm_;
DataStructure& data_;
;
// namespace Shared
现在我们可以让作者变成这样:
int main()
Shared::Memory creator("SharedMemory", 10*1024*1024);
creator.get().emplace_back("Hello");
creator.get().emplace_back("World");
std::cout << "Total queued: " << creator.get().size() << "\n";
将打印例如
Total queued: 2
Total queued: 4
Total queued: 6
取决于你运行它的次数。
读者方面
现在让我们做阅读器方面。其实大同小异,我们把它放在同一个主程序中:
int main(int argc, char**)
Shared::Memory mem("SharedMemory", 10*1024*1024);
auto& data = mem.get();
bool is_reader = argc > 1;
if (not is_reader)
data.emplace_back("Hello");
data.emplace_back("World");
std::cout << "Total queued: " << data.size() << "\n";
else
std::cout << "Found entries: " << data.size() << "\n";
while (!data.empty())
std::cout << "Dequeued " << data.front() << "\n";
data.pop_front();
简单的开始。现在运行例如test.exe READER
会反过来打印如下内容:
锁定与同步
目标是同时运行写入器和读取器。由于缺乏锁定和同步,这并不像现在这样安全。让我们添加它:
class Memory
static constexpr size_t max_capacity = 100;
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, mx_(*sm_.find_or_construct<Mutex>("mutex")())
, cv_(*sm_.find_or_construct<Cond>("condition")())
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
// ...
private:
std::string name_;
Segment sm_;
Mutex& mx_;
Cond& cv_;
DataStructure& data_;
;
现在让我们小心点。因为我们希望data_
队列上的所有操作都是同步的,所以我们不会像以前那样公开它(使用get()
成员函数)。相反,我们公开了我们支持的操作的确切接口:
size_t queue_length() const;
void enqueue(std::string message); // blocking when queue at max_capacity
std::string dequeue(); // blocking dequeue
std::optional<std::string> try_dequeue(); // non-blocking dequeue
这些都按要求进行锁定,正如您所期望的那样:
size_t queue_length() const
ip::scoped_lock<Mutex> lk(mx_);
return data_.size();
关于潜在的阻塞操作变得更加有趣。我选择了最大容量,所以enqueue
需要等待容量:
// blocking when queue at max_capacity
void enqueue(std::string message)
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] return data_.size() < max_capacity; );
data_.emplace_back(std::move(message));
cv_.notify_one();
相反,dequeue
需要等待消息可用:
// blocking dequeue
std::string dequeue()
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] return not data_.empty(); );
return do_pop();
或者,你可以使它成为非阻塞的,只是可选地返回一个值:
// non-blocking dequeue
std::optional<std::string> try_dequeue()
ip::scoped_lock<Mutex> lk(mx_);
if (data_.empty())
return std::nullopt;
return do_pop();
现在我们主要有三个版本:编写器、读取器和连续读取器(后者演示了阻塞接口):
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition_any.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/container/scoped_allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <iostream>
#include <iomanip>
#include <optional>
namespace ip = boost::interprocess;
namespace bc = boost::container;
namespace Shared
using Segment = ip::managed_shared_memory;
using Mgr = Segment::segment_manager;
template <typename T>
using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
template <typename T> using Deque = ip::deque<T, Alloc<T>>;
using String = ip::basic_string<char, std::char_traits<char>, Alloc<char>>;
using DataStructure = Deque<String>;
using Mutex = ip::interprocess_mutex;
using Cond = ip::interprocess_condition;
class Memory
static constexpr size_t max_capacity = 100;
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, mx_(*sm_.find_or_construct<Mutex>("mutex")())
, cv_(*sm_.find_or_construct<Cond>("condition")())
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
size_t queue_length() const
ip::scoped_lock<Mutex> lk(mx_);
return data_.size(); // caution: racy by design!
// blocking when queue at max_capacity
void enqueue(std::string message)
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] return data_.size() < max_capacity; );
data_.emplace_back(std::move(message));
cv_.notify_one();
// blocking dequeue
std::string dequeue()
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] return not data_.empty(); );
return do_pop();
// non-blocking dequeue
std::optional<std::string> try_dequeue()
ip::scoped_lock<Mutex> lk(mx_);
if (data_.empty())
return std::nullopt;
return do_pop();
private:
std::string name_;
Segment sm_;
Mutex& mx_;
Cond& cv_;
DataStructure& data_;
// Assumes mx_ locked by current thread!
std::string do_pop()
auto&& tmp = std::move(data_.front());
data_.pop_front();
cv_.notify_all(); // any of the waiters might be a/the writer
return std::string(tmp.begin(), tmp.end());
;
// namespace Shared
int main(int argc, char**)
Shared::Memory mem("SharedMemory", 10*1024*1024);
switch (argc)
case 1:
mem.enqueue("Hello");
mem.enqueue("World");
std::cout << "Total queued: " << mem.queue_length() << "\n";
break;
case 2:
std::cout << "Found entries: " << mem.queue_length() << "\n";
while (auto msg = mem.try_dequeue())
std::cout << "Dequeued " << *msg << "\n";
break;
case 3:
std::cout << "Continuous reader\n";
while (true)
std::cout << "Dequeued " << mem.dequeue() << "\n";
break;
小演示:
总结,注意
请注意,上述内容有些松散。值得注意的是,Boost Interprocess 中没有健壮的锁需要额外注意在不持有锁的情况下正确关闭。
我建议也与ip::message_queue
进行对比:
【讨论】:
感谢您的详细回答!虽然我在 Visual Studio 上遇到了一些编译器错误,但你熟悉吗? : 错误 C2664: 'boost::container::basic_string以上是关于从一个进程向另一个 C++ 发出信号的主要内容,如果未能解决你的问题,请参考以下文章
“C++ 应用程序可以在完成时发出 python 脚本信号”吗?