一次使用多个共享内存实例

Posted

技术标签:

【中文标题】一次使用多个共享内存实例【英文标题】:using multiple instances of shared memory at once 【发布时间】:2015-01-24 12:22:15 【问题描述】:

在录制程序和显示程序(不能相同)之间传输视频流,我使用共享内存。 为了同步访问,我整理了一个类,它包装了一个 shared_memory_object、一个 mapped_region 和一个 interprocess_sharable_mutex(所有的 boost::interprocess)

我写了 2 个构造器,一个用于“主机”端,一个用于“客户端”端。 当我使用我的班级传输一个视频流时,它可以完美运行。 但是当我尝试传输两个视频流时,出现了一些问题。

首先:这是构造函数代码: (第一个是主机构造器,第二个是客户端)

    template<typename T>
    SWMRSharedMemArray<T>::SWMRSharedMemArray(std::string Name, size_t length):
        ShMutexSize(sizeof(interprocess_sharable_mutex)),
        isManager(true), _length(length), Name(Name)
    
        shared_memory_object::remove(Name.c_str());

        shm = new shared_memory_object(create_only, Name.c_str(), read_write);
        shm->truncate(ShMutexSize + sizeof(T)*length);

        region = new mapped_region(*shm, read_write);

        void *addr = region->get_address();
        mtx = new(addr) interprocess_sharable_mutex;
        DataPtr = static_cast<T*>(addr) + ShMutexSize;
    

    template<typename T>
    SWMRSharedMemArray<T>::SWMRSharedMemArray(std::string Name) :
        ShMutexSize(sizeof(interprocess_sharable_mutex)),
        isManager(false), Name(Name)
    
        shm = new shared_memory_object(open_only, Name.c_str(), read_write);
        region = new mapped_region(*shm, read_write);

        _length = (region->get_size() - ShMutexSize) / sizeof(T);
        void *addr = region->get_address();
        mtx = static_cast<decltype(mtx)>(addr);
        DataPtr = static_cast<T*>(addr) + ShMutexSize;
    

在主机端,一切看起来都还不错。 但是在为客户建设方面存在问题: 当我比较第一个和第二个实例的 shm 和 region 对象时 (具有不同的名称 ofc,但相同的长度和模板类型) 我看到很多应该不同的成员没有。 地址和成员 m_filename 与预期的不同,但成员 m_handle 是相同的。 对于区域,两个地址不同,但所有成员都相同。

我希望有人知道发生了什么。 最好的祝福 宇扎库

【问题讨论】:

【参考方案1】:

我还没有完全理解您的代码,但我对手动内存管理的过时使用感到震惊。每当我在 C++ 中看到“sizeof()”时,我都会有点担心:)

由于缺乏抽象,混乱几乎是不可避免的,而编译器也无能为力,因为您处于“别管我 - 我知道我在做什么”领域。

具体来说,这看起来不对:

DataPtr = static_cast<T *>(addr) + ShMutexSize;

sizeof(T)==sizeof(char)(IOW,T 是一个字节)时这可能是正确的,但否则你会得到 指针算法,这意味着你添加了 sizeof(T) ShMutexSize 次。这肯定是错误的,因为您只为互斥体的大小+元素数据保留了空间,直接相邻。

因此,由于索引超出了共享内存区域的大小,您会得到未使用的空间和 Undefined Behavior。


所以,让我对比两个样本;

    减少了对指针算法的依赖 通过使用托管共享内存段消除所有手动内存管理

1。手动

不需要相同数量的指针欺骗/资源管理的手动方法可能如下所示:

LiveCompiled On Coliru

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/interprocess_sharable_mutex.hpp>
#include <boost/thread/lock_guard.hpp>

namespace bip = boost::interprocess;

namespace SWMR 
    static struct server_mode_t  const/*expr*/ server_mode = server_mode_t();
    static struct client_mode_t  const/*expr*/ client_mode = client_mode_t();

    typedef bip::interprocess_sharable_mutex mutex;
    typedef boost::lock_guard<mutex> guard;

    template <typename T, size_t N> struct SharedMemArray 
        SharedMemArray(server_mode_t, std::string const& name) 
          : isManager(true), _name(name), 
            _shm(do_create(_name.c_str())),
            _region(_shm, bip::read_write)
        
            _data = new (_region.get_address()) data_t;
        

        SharedMemArray(client_mode_t, std::string const& name) 
          : isManager(false), _name(name),
            _shm(do_open(_name.c_str())),
            _region(_shm, bip::read_write),
            _data(static_cast<data_t*>(_region.get_address()))
        
            assert(sizeof(data_t) == _region.get_size());
        

    private:
        typedef bip::shared_memory_object shm_t;
        struct data_t  
            mutable mutex mtx;
            T DataPtr[N];
        ;

        bool               isManager;
        const std::string  _name;
        shm_t              _shm;
        bip::mapped_region _region;
        data_t            *_data;

        // functions to manage the shared memory
        shm_t static do_create(char const* name) 
            shm_t::remove(name);
            shm_t result(bip::create_only, name, bip::read_write);
            result.truncate(sizeof(data_t));
            return boost::move(result);
        

        shm_t static do_open(char const* name) 
            return shm_t(bip::open_only, name, bip::read_write);
        

      public:
        mutex& get_mutex() const  return _data->mtx; 

        typedef T       *iterator;
        typedef T const *const_iterator;

        iterator data()                 return _data->DataPtr; 
        const_iterator data() const     return _data->DataPtr; 

        iterator begin()                return data(); 
        const_iterator begin() const    return data(); 

        iterator end()                  return begin() + N; 
        const_iterator end() const      return begin() + N; 

        const_iterator cbegin() const   return begin(); 
        const_iterator cend() const     return end(); 
    ;


#include <vector>

static const std::string APP_UUID = "61ab4f43-2d68-46e1-9c8d-31d577ce3aa7";

struct UserData 
    int   i;
    float f;
;

#include <boost/range/algorithm.hpp>
#include <boost/foreach.hpp>
#include <iostream>

int main() 
    using namespace SWMR;
    SharedMemArray<int, 20>      s_ints   (server_mode, APP_UUID + "-ints");
    SharedMemArray<float, 72>    s_floats (server_mode, APP_UUID + "-floats");
    SharedMemArray<UserData, 10> s_udts   (server_mode, APP_UUID + "-udts");

    
        guard lk(s_ints.get_mutex());
        boost::fill(s_ints, 42);
    

    
        guard lk(s_floats.get_mutex());
        boost::fill(s_floats, 31415);
    

    
        guard lk(s_udts.get_mutex());
        UserData udt =  42, 3.14 ;
        boost::fill(s_udts, udt);
    

    SharedMemArray<int, 20>      c_ints   (client_mode, APP_UUID + "-ints");
    SharedMemArray<float, 72>    c_floats (client_mode, APP_UUID + "-floats");
    SharedMemArray<UserData, 10> c_udts   (client_mode, APP_UUID + "-udts");

    
        guard lk(c_ints.get_mutex());
        assert(boost::equal(std::vector<int>(boost::size(c_ints), 42), c_ints));
    

    
        guard lk(c_floats.get_mutex());
        assert(boost::equal(std::vector<int>(boost::size(c_floats), 31415), c_floats));
    

    
        guard lk(c_udts.get_mutex());
        BOOST_FOREACH(UserData& udt, c_udts)
            std::cout << udt.i << "\t" << udt.f << "\n";
    

注意事项

它重用代码 它不会进行不必要的动态分配(这使得类更容易“正确”地使用三规则) 它使用data_t 结构来摆脱手动偏移计算(你可以只做data-&gt;mtxdata-&gt;DataPtr

它添加了iteratorbegin()/end() 定义,以便您可以将SharedMemArray 直接用作范围,例如使用 boost::equalBOOST_FOREACH 之类的算法:

assert(boost::equal(some_vector, c_floats));

BOOST_FOREACH(UserData& udt, c_udts)
    std::cout << udt.i << "\t" << udt.f << "\n";

目前,它使用静态已知数量的元素 (N)。

如果您不希望这样,我当然会选择使用托管段(低于 2.)的方法,因为这样可以解决所有问题为您(重新)分配机制。


2。使用managed_shared_memory

当我们想要动态大小的数组时,我们在 C++ 中使用什么? 正确: std::vector.

现在可以教std::vector 从共享内存中分配,但您需要将 Boost Interprocess allocator 传递给它。这个分配器知道如何使用segment_manager 来执行来自共享内存的分配。

这是使用managed_shared_memory的相对直接的翻译

LiveCompiled On Coliru

#include <boost/container/scoped_allocator.hpp>

#include <boost/container/vector.hpp>
#include <boost/container/string.hpp>

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/interprocess/sync/interprocess_sharable_mutex.hpp>
#include <boost/thread/lock_guard.hpp>

namespace Shared 
    namespace bip = boost::interprocess;
    namespace bc  = boost::container;

    using shm_t = bip::managed_shared_memory;
    using mutex = bip::interprocess_sharable_mutex;
    using guard = boost::lock_guard<mutex>;

    template <typename T> using allocator    = bc::scoped_allocator_adaptor<
                                                   bip::allocator<T, shm_t::segment_manager>
                                               >;
    template <typename T> using vector       = bc::vector<T, allocator<T> >;
    template <typename T> using basic_string = bc::basic_string<T, std::char_traits<T>, allocator<T> >;

    using string  = basic_string<char>;
    using wstring = basic_string<wchar_t>;


namespace SWMR 
    namespace bip = boost::interprocess;

    static struct server_mode_t  const/*expr*/ server_mode = server_mode_t();
    static struct client_mode_t  const/*expr*/ client_mode = client_mode_t();

    template <typename T> struct SharedMemArray 

    private:
        struct data_t 
            using allocator_type = Shared::allocator<void>;

            data_t(size_t N, allocator_type alloc) : elements(alloc)  elements.resize(N); 
            data_t(allocator_type alloc)           : elements(alloc) 

            mutable Shared::mutex mtx;
            Shared::vector<T> elements;
        ;

        bool               isManager;
        const std::string  _name;
        Shared::shm_t      _shm;
        data_t            *_data;

        // functions to manage the shared memory
        Shared::shm_t static do_create(char const* name) 
            bip::shared_memory_object::remove(name);
            Shared::shm_t result(bip::create_only, name, 1ul << 20); // ~1 MiB
            return boost::move(result);
        

        Shared::shm_t static do_open(char const* name) 
            return Shared::shm_t(bip::open_only, name);
        

      public:
        SharedMemArray(server_mode_t, std::string const& name, size_t N = 0)
          : isManager(true), _name(name), _shm(do_create(_name.c_str()))
        
            _data = _shm.find_or_construct<data_t>(name.c_str())(N, _shm.get_segment_manager());
        

        SharedMemArray(client_mode_t, std::string const& name)
          : isManager(false), _name(name), _shm(do_open(_name.c_str()))
        
            auto found = _shm.find<data_t>(name.c_str());
            assert(found.second);
            _data = found.first;
        

        Shared::mutex&  mutex() const              return _data->mtx; 
        Shared::vector<T>      & elements()        return _data->elements; 
        Shared::vector<T> const& elements() const  return _data->elements; 
    ;


#include <vector>

static const std::string APP_UUID = "93f6b721-1d34-46d9-9877-f967fea61cf2";

struct UserData 
    using allocator_type = Shared::allocator<void>;

    UserData(allocator_type alloc) : text(alloc) 
    UserData(UserData const& other, allocator_type alloc) : i(other.i), text(other.text, alloc) 
    UserData(int i, Shared::string t) : i(i), text(t) 
    template <typename T> UserData(int i, T&& t, allocator_type alloc) : i(i), text(std::forward<T>(t), alloc) 

    // data
    int   i;
    Shared::string text;
;

#include <boost/range/algorithm.hpp>
#include <boost/foreach.hpp>
#include <iostream>

int main() 
    using namespace SWMR;
    SharedMemArray<int>      s_ints(server_mode, APP_UUID + "-ints", 20);
    SharedMemArray<UserData> s_udts(server_mode, APP_UUID + "-udts");
    // server code

    
        Shared::guard lk(s_ints.mutex());
        boost::fill(s_ints.elements(), 99);

        // or manipulate the vector. Any allocations go to the shared memory segment automatically
        s_ints.elements().push_back(42);
        s_ints.elements().assign(20, 42);
    

    
        Shared::guard lk(s_udts.mutex());
        s_udts.elements().emplace_back(1, "one");
    

    // client code
    SharedMemArray<int>      c_ints(client_mode, APP_UUID + "-ints");
    SharedMemArray<UserData> c_udts(client_mode, APP_UUID + "-udts");

    
        Shared::guard lk(c_ints.mutex());
        auto& e = c_ints.elements();
        assert(boost::equal(std::vector<int>(20, 42), e));
    

    
        Shared::guard lk(c_udts.mutex());
        BOOST_FOREACH(UserData& udt, c_udts.elements())
            std::cout << udt.i << "\t'" << udt.text << "'\n";
    

注意事项:

由于您现在存储的是一流的 C++ 对象,因此大小不是静态的。事实上,你可以push_back,如果超过了容量,容器只会使用段的分配器重新分配。

我选择使用 C++11 来实现 namespace Shared 中的便捷类型定义。然而,所有这些都可以在 c++03 中工作,尽管更冗长

我还选择了使用作用域分配器。这意味着如果T 是/also/ 使用分配器的(用户定义的)类型(例如all standard containers, std::deque, std::packaged_task, std::tuple etc.,分配器的段引用将在内部构造时隐式传递给元素。这就是为什么线条

elements.resize(N);

s_udts.elements().emplace_back(1, "one");

能够在不为元素的构造函数显式传递分配器的情况下进行编译。

示例UserData 类利用这一点来展示如何包含一个std::string(或者实际上是一个Shared::string),神奇地从与容器。

3。奖金

另请注意,这开启了将所有容器存储在单个 shared_memory_object 中的可能性,这可能是有益的,因此我提出了一个展示这种方法的变体:

LiveCompiled On Coliru

#include <boost/container/scoped_allocator.hpp>

#include <boost/container/vector.hpp>
#include <boost/container/string.hpp>

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/interprocess/sync/interprocess_sharable_mutex.hpp>
#include <boost/thread/lock_guard.hpp>

namespace Shared 
    namespace bip = boost::interprocess;
    namespace bc  = boost::container;

    using msm_t = bip::managed_shared_memory;
    using mutex = bip::interprocess_sharable_mutex;
    using guard = boost::lock_guard<mutex>;

    template <typename T> using allocator    = bc::scoped_allocator_adaptor<
                                                   bip::allocator<T, msm_t::segment_manager>
                                               >;
    template <typename T> using vector       = bc::vector<T, allocator<T> >;
    template <typename T> using basic_string = bc::basic_string<T, std::char_traits<T>, allocator<T> >;

    using string  = basic_string<char>;
    using wstring = basic_string<wchar_t>;


namespace SWMR 
    namespace bip = boost::interprocess;
    namespace bc  = boost::container;

    class Segment 
      public:
        // LockableObject, base template
        //
        // LockableObject contains a `Shared::mutex` and an object of type T
        template <typename T, typename Enable = void> struct LockableObject;

        // Partial specialization for the case when the wrapped object cannot
        // use the shared allocator: the constructor is just forwarded
        template <typename T>
        struct LockableObject<T, typename boost::disable_if<bc::uses_allocator<T, Shared::allocator<T> >, void>::type>
        
            template <typename... CtorArgs>
            LockableObject(CtorArgs&&... args) : object(std::forward<CtorArgs>(args)...) 
            LockableObject() : object() 

            mutable Shared::mutex mutex;
            T object;

          private:
            friend class Segment;
            template <typename... CtorArgs>
            static LockableObject& locate_by_name(Shared::msm_t& msm, const char* tag, CtorArgs&&... args) 
                return *msm.find_or_construct<LockableObject<T> >(tag)(std::forward<CtorArgs>(args)...);
            
        ;

        // Partial specialization for the case where the contained object can
        // use the shared allocator;
        //
        // Construction (using locate_by_name) adds the allocator as the last
        // argument.
        template <typename T>
        struct LockableObject<T, typename boost::enable_if<bc::uses_allocator<T, Shared::allocator<T> >, void>::type>
        
            using allocator_type = Shared::allocator<void>;

            template <typename... CtorArgs>
            LockableObject(CtorArgs&&... args) : object(std::forward<CtorArgs>(args)...) 
            LockableObject(allocator_type alloc = ) : object(alloc) 

            mutable Shared::mutex mutex;
            T object;

          private:
            friend class Segment;
            template <typename... CtorArgs>
            static LockableObject& locate_by_name(Shared::msm_t& msm, const char* tag, CtorArgs&&... args) 
                return *msm.find_or_construct<LockableObject>(tag)(std::forward<CtorArgs>(args)..., Shared::allocator<T>(msm.get_segment_manager()));
            
        ;

        Segment(std::string const& name, size_t capacity = 1024*1024) // default 1 MiB
            : _msm(bip::open_or_create, name.c_str(), capacity)
        
        

        template <typename T, typename... CtorArgs>
        LockableObject<T>& getLockable(char const* tag, CtorArgs&&... args) 
            return LockableObject<T>::locate_by_name(_msm, tag, std::forward<CtorArgs>(args)...);
        

    private:
        Shared::msm_t _msm;
    ;


#include <vector>

static char const* const APP_UUID = "249f3878-3ddf-4473-84b2-755998952da1";

struct UserData 
    using allocator_type = Shared::allocator<void>;
    using String         = Shared::string;

    UserData(allocator_type alloc) : text(alloc)  
    UserData(int i, String t) : i(i), text(t)  
    UserData(UserData const& other, allocator_type alloc) : i(other.i), text(other.text, alloc)  

    template <typename T>
        UserData(int i, T&& t, allocator_type alloc)
            : i(i), text(std::forward<T>(t), alloc)
         

    // data
    int i;
    String text;
;

#include <boost/range/algorithm.hpp>
#include <boost/foreach.hpp>
#include <iostream>

int main() 
    using IntVec = Shared::vector<int>;
    using UdtVec = Shared::vector<UserData>;

    boost::interprocess::shared_memory_object::remove(APP_UUID); // for demo

    // server code
    
        SWMR::Segment server(APP_UUID);

        auto& s_ints = server.getLockable<IntVec>("ints", std::initializer_list<int> 1,2,3,4,5,6,7,42); // allocator automatically added
        auto& s_udts = server.getLockable<UdtVec>("udts");

        
            Shared::guard lk(s_ints.mutex);
            boost::fill(s_ints.object, 99);

            // or manipulate the vector. Any allocations go to the shared memory segment automatically
            s_ints.object.push_back(42);
            s_ints.object.assign(20, 42);
        

        
            Shared::guard lk(s_udts.mutex);
            s_udts.object.emplace_back(1, "one"); // allocates the string in shared memory, and the UserData element too
        
    

    // client code
    
        SWMR::Segment client(APP_UUID);

        auto& c_ints = client.getLockable<IntVec>("ints", 20, 999); // the ctor arguments are ignored here
        auto& c_udts = client.getLockable<UdtVec>("udts");

        
            Shared::guard lk(c_ints.mutex);
            IntVec& ivec = c_ints.object;
            assert(boost::equal(std::vector<int>(20, 42), ivec));
        

        
            Shared::guard lk(c_udts.mutex);
            BOOST_FOREACH(UserData& udt, c_udts.object)
                std::cout << udt.i << "\t'" << udt.text << "'\n";
        
    

注意事项:

您现在可以存储任何内容,而不仅仅是“动态数组”(vector&lt;T&gt;)。你可以这样做:

auto& c_udts = client.getLockable<double>("a_single_double");

当你存储一个与共享分配器兼容的容器时,LockableObject 的构造方法会透明地将分配器实例添加为包含的T object; 的最后一个构造函数参数。

李>

我将remove() 调用从Segment 类中移出,因此无需区分客户端/服务器模式。我们只使用open_or_createfind_or_construct

【讨论】:

@BenVoigt 你指的是什么?当前显示的代码还没有使用它(显而易见) @sehe:最后一个标题可能是有用的建议,如果很清楚可以使用什么 managed_shared_memory 类来解决问题。当然,如果有完整的例子,它会更加有用。 @BenVoigt 我在写这个 添加了 managed_shared_memory 变体 - 它更加灵活,实际上是它的一个额外变体(它将所有共享分配组合在一个 shared_memory_object 下以保持清洁。我希望这可以作为灵感关于使用 Boost Interprocess 更高级的接口。/cc @BenVoigt @sehe :我知道 2 年后。在您的选项 1 实现代码中 - 如果客户端和服务器可以随时启动,我们如何同步。在服务器放置new之前,客户端可以访问该位置吗? [我有类似的情况,正在考虑如何解决这种情况?]

以上是关于一次使用多个共享内存实例的主要内容,如果未能解决你的问题,请参考以下文章

sqlite vs 共享内存应用程序 vs ipc vs?

linux共享内存和mmap的区别

从一个进程创建多个共享内存

oracle内存结构由啥组成?

GPU共享内存实例

如何设置linux的共享内存