跨进程内存屏障

Posted

技术标签:

【中文标题】跨进程内存屏障【英文标题】:Cross Process Memory Barrier 【发布时间】:2014-06-14 21:12:10 【问题描述】:

我正在使用内存映射文件进行跨进程数据共享。

我有两个进程,一个写入数据块,一个或多个读取这些块。为了让读者知道一个块是否准备好,我写了两个“标签”值,一个在开始,一个在每个块的末尾,表示它已经准备好。

看起来像这样:

注意:在这个例子中,我没有包括阅读器进程可以寻找之前的块的事实。

static const int32_t START_TAG = 0xFAFAFAFA;
static const int32_t END_TAG = 0x06060606;

void writer_process(int32_t* memory_mapped_file_ptr)

    auto ptr = memory_mapped_file_ptr;
    while (true)
    
        std::vector<int32_t> chunk = generate_chunk();
        std::copy(ptr + 2, chunk.begin(), chunk.end());

        // We are done writing. Write the tags.

        *ptr = START_TAG;
        ptr += 1;
        *ptr = chunk.size();
        ptr += 1 + chunk.size();
        *ptr = END_TAG;
        ptr += 1;
       


void reader_process(int32_t* memory_mapped_file_ptr)

    auto ptr = memory_mapped_file_ptr;
    while (true)
    
        auto ptr2 = ptr;

        std::this_thread::sleep_for(std::chrono::milliseconds(20));

        if (*ptr2 != START_TAG)
            continue;

        ptr2 += 1;

        auto len = *ptr2;
        ptr2 += 1;

        if (*(ptr2 + len) != END_TAG)
            continue;

        std::vector<int32_t> chunk(ptr2, ptr2 + len);

        process_chunk(chunk);
    

到目前为止,这种工作。但在我看来,这是一个非常糟糕的主意,并且由于缓存行为可能会导致各种奇怪的错误。

有没有更好的方法来实现这一点?

我看过:

消息队列:效率低下,仅适用于单个阅读器。我也无法找到以前的块。

互斥体:不确定如何只锁定当前块而不是整个内存。我不能为每个可能的块设置互斥锁(尤其是当它们具有动态大小时)。我考虑过将内存划分为每个具有一个互斥锁的块,但这对我来说不起作用,因为它在写入和读取之间产生延迟。

【问题讨论】:

没有。很难猜到 20 毫秒的睡眠可能比互斥锁更有效。这当然是不可能的。您应该使用管道的可能性很高。 @HansPassant:效率更高,因为读者大多会稍微落后于作者。所以轮询非常低,它们可以并行工作。如果我使用互斥锁(与示例不同),那么读取器和写入器根本无法并行工作...... 不确定如何应用管道/消息队列? 所写的代码不是线程安全的。结束标记的写入可能在中间数据之前变得可见。您需要使用内存屏障,特别是最终写入时的释放。 (Visual C++ 在写入 volatile 时会自动释放。) 内存屏障作用于内存,而不是进程,所以继续在共享内存上使用它。 【参考方案1】:

正如其他人所提到的,您需要有某种内存屏障,以确保在多个处理器(和进程)之间正确同步。

我建议您使用定义一组当前可用条目的标题更改您的方案,并在新条目可用时使用互锁增量。

http://msdn.microsoft.com/en-us/library/windows/desktop/ms683614%28v=vs.85%29.aspx

我建议的结构是这样的,这样你就可以真正实现你想要的,并且快速完成:

// at the very start, the number of buffers you might have total
uint32_t   m_size;    // if you know the max. number maybe use a const instead...

// then m_size structures, one per buffer:
uint32_t   m_offset0;  // offset to your data
uint32_t   m_size0;    // size of that buffer
uint32_t   m_busy0;    // whether someone is working on the buffer
uint32_t   m_offset1;
uint32_t   m_size1;
uint32_t   m_busy1;
...
uint32_t   m_offsetN;
uint32_t   m_sizeN;
uint32_t   m_busyN;

通过偏移量和大小,您可以直接访问映射区域中的任何缓冲区。要分配缓冲区,您可能希望实现类似于 malloc() 所做的事情,尽管所有必要的信息都可以在此表中找到,因此不需要链表等。但是,如果您要释放一些缓冲区,您需要跟踪其大小。而且,如果您一直在分配/释放,您将享受碎片化的乐趣。总之……

另一种方法是使用环形缓冲区(本质上是一个“管道”),所以你总是在最后一个缓冲区之后分配,如果那里没有足够的空间,从一开始就分配,根据需要关闭 N 个缓冲区新的缓冲区大小要求...这可能更容易实现。但是,这意味着您可能需要知道在查找缓冲区时从哪里开始(即,为当前被认为是“第一个”[最旧]缓冲区的内容建立一个索引,该缓冲区恰好是下一个要重用的缓冲区。)

但是由于您没有解释缓冲区如何变得“旧”和可重用(释放以便可以重用),所以我不能真正给您一个确切的实现。但是像下面这样的东西可能会为你做。

在标头结构中,如果 m_offset 为零,则当前未分配缓冲区,因此与该条目无关。如果 m_busy 为零,则没有进程正在访问该缓冲区。我还提供了一个 m_free 字段,它可以是 0 或 1。只要编写者需要更多缓冲区来保存刚收到的数据,就会将该参数设置为 1。我不会对那个太深入,因为我不完全知道你是如何释放缓冲区的。如果您也从不释放缓冲区,则不需要。

0) 结构

// only if the size varies between runs, otherwise use a constant like:
// namespace  uint32_t const COUNT = 123; 
struct header_count_t

    uint32_t    m_size;
;

struct header_t

    uint32_t    m_offset;
    uint32_t    m_size;
    uint32_t    m_busy;  // to use with Interlocked...() you may want to use LONG instead
;

// and from your "ptr" you'd do:
header_count_t *header_count = (header_count_t *) ptr;
header_count->m_size = ...; // your dynamic size (if dynamic it needs to be)
header_t *header = (header_t *) (header_count + 1);
// first buffer will be at: data = (char *) (header + header_count->m_size)
for(size_t n(0); n < header_count->m_size; ++n)

   // do work (see below) on header[n]
   ...

1) 写入者访问数据必须先锁定缓冲区,如果不可用,再用下一个尝试;使用InterlockedIncrement() 完成锁定并使用InterlockedDecrement() 解锁:

InterlockedIncrement(&header[n]->m_busy);
if(header[n]->m_offset == nullptr)

     // buffer not allocated yet, allocate now and copy data,
     // but do not save the offset until "much" later
     uint32_t offset = malloc_buffer();
     memcpy(ptr + offset, source_data, size);
     header[n]->m_size = size;

     // extra memory barrier to make sure that the data copied
     // in the buffer is all there before we save the offset
     InterlockedIncrement(&header[n]->m_busy);
     header[n]->m_offset = offset;
     InterlockedDecrement(&header[n]->m_busy);

InterlockedDecrement(&header[n]->m_busy);

现在,如果您希望能够释放缓冲区,这还不够。在这种情况下,需要另一个标志来防止其他进程重用旧缓冲区。同样,这将取决于您的实现...(请参见下面的示例。)

2) 访问数据的读取器必须首先使用InterlockedIncrement() 锁定缓冲区,一旦完成缓冲区,它需要使用InterlockedDecrement() 释放缓冲区。请注意,即使 m_offset 为 nullptr,也会应用锁定。

InterlockedIncrement(&header[n]->m_busy);
if(header[n]->m_offset)

    // do something with the buffer
    uint32_t size(header[n]->m_size);
    char const *buffer_ptr = ptr + header[n]->m_offset;
    ...

InterlockedDecrement(header[n]->m_busy);

所以这里我只是测试一下是否设置了m_offset。

3) 如果您希望能够释放缓冲区,您还需要测试另一个标志(见下文),如果另一个标志为真(或假),那么缓冲区即将被释放(只要所有进程都释放它),然后可以在前面的代码 sn-p 中使用该标志(即 m_offset 为零,或者该标志为 1,m_busy 计数器正好为 1。)

对于作者来说是这样的:

LONG lock = InterlockedIncrement(&header[n]->m_busy);
if(header[n]->m_offset == nullptr
|| (lock == 1 && header[n]->m_free == 1))

    // new buffer (nullptr) or reusing an old buffer

    // reset the offset first
    InterlockedIncrement(&header[n]->m_busy);
    header[n]->m_offset = nullptr;
    InterlockedDecrement(&header[n]->m_busy);
    // then clear m_free
    header[n]->m_free = 0;
    InterlockedIncrement(&header[n]->m_busy);  // WARNING: you need another Decrement against this one...

    // code as before (malloc_buffer, memcpy, save size & offset...)
    ...

InterlockedDecrement(&header[n]->m_busy);

在阅读器中,测试的变化如下:

if(header[n]->m_offset && header[n]->m_free == 0)

附带说明:所有 Interlocked...() 函数都是完整的内存屏障(栅栏),所以在这方面你都很好。您必须使用其中的许多来确保获得正确的同步。

请注意,这是未经测试的代码...但如果您想避免进程间信号量(这可能不会简化这么多),那就是要走的路。请注意,20 毫秒的 sleep() 本身不是必需的,除非显然要避免每个阅读器使用一个固定 CPU。

【讨论】:

以上是关于跨进程内存屏障的主要内容,如果未能解决你的问题,请参考以下文章

Linux 内核 内存管理优化内存屏障 ④ ( 处理器内存屏障 | 八种处理器内存屏障 | 通用内存屏障 | 写内存屏障 | 读内存屏障 | 数据依赖屏障 | 强制性内存屏障 |SMP内存屏障 )

Linux 内核 内存管理优化内存屏障 ② ( 内存屏障 | 编译器屏障 | 处理器内存屏障 | 内存映射 I/O 写屏障 )

内存屏障

解密内存屏障

解密内存屏障

什么是内存屏障