如何按照最初在 C++ 中生成的顺序从有界缓冲区中检索项目?

Posted

技术标签:

【中文标题】如何按照最初在 C++ 中生成的顺序从有界缓冲区中检索项目?【英文标题】:How can I retrieve items from a bounded buffer in the order they were originally produced in C++? 【发布时间】:2018-11-26 12:52:25 【问题描述】:

我有一个相当典型的生产者/消费者问题,我用bounded buffer 解决了这个问题。单个进程生成项目并将它们交给 N 个工作线程。工作线程处理这些项目并将结果放置在有界缓冲区中。最终消费者进程从缓冲区中检索完成的项目。以下数据流程图说明:

每个工作人员花费不同的时间来处理其项目,因此工作人员以基本上随机的顺序将完成的项目插入有界缓冲区。这工作得很好,但有时需要以它们最初生成的相同顺序检索完成的项目。所以问题是:

如何修改现有实现以按顺序检索成品?

一个重要的附加约束是我们必须尊重有界缓冲区的大小。如果缓冲区的大小为 M,那么在任何给定时间等待消费者的完成项目不能超过 M。

有界缓冲区

有界缓冲区有一个简单的接口:

template <class T> class bounded_buffer

public:
  // initializes a new buffer
  explicit bounded_buffer(size_t capacity);
  // pushes an item into the buffer, blocks if full
  void push(T item);
  // pops an item from the buffer, blocks if empty
  T pop();
;

处理项目

工作线程使用以下代码处理项目:

std::unique_lock guard source_lock ;
auto item = GetNextItem();
guard.unlock();

buffer.push(ProcessItem(std::move(item)));

(实际代码要复杂得多,因为它必须处理输入数据的结束、取消和处理错误。但这些细节与问题无关。)

检索已完成项目的代码只是弹出有界缓冲区:

auto processed_item = buffer.pop();

【问题讨论】:

一种简单的方法是使用递增计数器为每个工作项(及其结果)分配一个唯一 ID。然后,如果消费者需要按照生成的顺序处理一组工作项结果,它可以将必要的结果一起收集到一个键/值数据结构中(例如std::map&lt;&gt;),然后当它有它需要的所有结果,迭代该集合(按升序键顺序)以处理结果。 这当然有效(尽管部分排序的向量可能是辅助队列的更好选择)。问题是这违反了我的约束,即我们不能排队超过有界缓冲区允许的项目。想象一下我们必须处理 N 个项目,但处理第一个项目比所有其他项目加起来花费的时间更长的情况。无论缓冲区大小如何,我们的辅助队列最终都会存储 N - 1 个项目。 我明白了。也许您可以通过限制第二阶段的并行量来解决这个问题,至少在您需要按顺序使用结果的处理期间?即,如果您的机器上有 8 个内核,您可能不会从拥有超过 8 个工作进程中受益,因此您一次只能发布不超过 8 个工作项(然后推迟发布更多,直到所有 8已完成),因此一次不必缓冲超过 8 个结果。 这也可以,但我怀疑它的效率会降低,因为我们必须在开始每批 8 个之前等待长轮询。无论如何,如果您对如何解决这个问题,请随时写一个答案。 【参考方案1】:

我将介绍两种解决方案。第一个是快速和简单的。第二个建立在第一个背后的想法之上,以产生更高效的东西。

第一种方法:std::future

基本思想是,当我们第一次检索一个值时,我们将在有界缓冲区中“保留”一个空间,并在我们完成处理该项目时填充它。 std::future 提供了一个现成的机制来实现这一点。我们将使用bounded_buffer&lt;std::future&lt;T&gt;&gt;,而不是使用bounded_buffer&lt;T&gt;。我们调整worker代码如下:

std::unique_lock guard source_lock ;
auto item = GetNextItem();    
std::promise<T> processed_item;
buffer.push(processed_item.get_future());
guard.unlock();

processed_item.set_value(ProcessItem(std::move(item)));

然后我们稍微调整一下消费者代码以从未来检索值:

auto processed_item = buffer.pop().get();

如果消费者进程在工作人员完成之前检索项目,则std::future&lt;T&gt;::get 将确保消费者阻塞直到项目准备好。

优点:

比较简单,可以解决问题。我们在持有源锁的同时将期货放入有界缓冲区,因此这可以保证最终结果以我们从源中检索它们的相同顺序进入缓冲区。 不需要对有界缓冲区本身进行任何更改,从而保持该抽象的纯度。

缺点:

std::future 相对重量级,需要额外的内存分配和内部同步。 我们现在在推入缓冲区时保持源锁定(推入可能会阻塞);这可能很好,但如果 GetNextItem() 很昂贵,则可能会出现问题。

第二种方法:构建更好的缓冲区

为了解决第一种方法中的性能问题,我们可以调整有界缓冲区的实现,以构建在其中保留空间的想法。我们将对它的界面进行三处改动:

    更改构造函数以接受谓词。 更改推送方法以返回定位器。 添加一个新的replace 方法,该方法接受一个定位器和一个值。

修改后的界面如下:

template <class T, class P> class bounded_buffer

public:
  using locator_type = /* unspecified */;
  // initializes a new buffer; an item is "available" if and only if it
  // satisfies this predicate
  explicit bounded_buffer(size_t capacity, P predicate);
  // pushes an item into the buffer, blocks if full; the buffer's count of
  // available items will increase by one if and only if all items in the
  // buffer (including the new one) are available
  locator_type push(T item);
  // pops an item from the buffer, blocks if empty
  T pop();
  // replaces an existing item in the buffer; if the item is the first in the
  // buffer, then we set the count of available items as follows: 0 if the
  // item is unavailable, or X if it is available where X is the number of 
  // available items at the front of the buffer
  void replace(locator_type location, T item);
;

然后我们将存储在有界缓冲区中的类型从T 更改为std::variant&lt;std::monostate, T&gt;。如果包含 T,谓词将认为项目“可用”。我们将工作代码更改如下:

std::unique_lock guard source_lock ;
auto item = GetNextItem();      
auto location = buffer.push(std::monostate);
guard.unlock();

buffer.replace(location, ProcessItem(std::move(item));

消费者中的检索代码也必须更改才能从变体中检索值:

auto processed_item = std::get<1>(buffer.pop());

优点:

std::future 方法更轻巧,因此性能更高。 (存储std::variant 索引只需要比原始版本多一点的内存。) 解决问题的方法与future 版本基本相同。

缺点:

需要对有界缓冲区实现进行更改,其基本操作不再完全符合您对该抽象的期望。 未解决上述源锁定问题。

错误处理

为简单起见,我省略了错误处理。然而,这两种方法都需要适当的异常处理。如果使用编写的代码处理项目时发生异常,消费者将挂起,因为它将等待永远不会到达的保留项目。

【讨论】:

【参考方案2】:

这是我的建议:

    WorkItem 类中添加一个“需要”字段。该字段的类型为shared_ptr&lt;WorkItem&gt;(或类似)。如果非NULL,该字段表示两个WorkItems之间的依赖关系——例如,如果WorkItemB的requires-field被设置为指向WorkItemA,这意味着消费者进程需要在B 之前使用A

    还向每个WorkItem 添加一个condition variable(及其关联的mutex

    还向每个 WorkItem 添加一个布尔“已使用”字段。该字段默认为false,但是当消费者进程消费WorkItem时,会锁定WorkItemmutex,将此字段设置为true,在@987654338上调用notify_all() @的condition variable,然后解锁mutex

    当工作进程完成处理WorkItem 时,它必须检查WorkItem 的“要求”字段。如果“requires”字段为NULL,则WorkItem可能会立即添加到有界队列中,工作进程的工作就完成了。

    1234563 mutex 并将引用 WorkItem 排入队列,它的工作就完成了。 1234563所以在这种情况下,工作进程应该在其依赖项的condition variable 上调用wait()。这将使工作进程进入休眠状态,直到其“需要”-WorkItem 被消耗——此时工作进程将唤醒(通过第 3 步中的 notify_all() 调用)并可以将自己的 @ 加入队列987654356@像往常一样。

该逻辑应足以确保在指定时正确排序,同时仍允许工作进程在没有消耗排序要求的WorkItems 上尽可能高效地工作。

【讨论】:

这是一个有趣的方法。感谢分享。

以上是关于如何按照最初在 C++ 中生成的顺序从有界缓冲区中检索项目?的主要内容,如果未能解决你的问题,请参考以下文章

写入在 Windows 下的 Java 应用程序中生成的 C++ 控制台应用程序

jpa中生成的表中的错误排序

pyAudio 中生成的声音在 Pygame 中不起作用

如何在 scala 中生成的键值对中添加值

如何在 Excel 中生成的 SQL 插入语句中获取行数?

我如何使用在 Jmeter 的下一个线程中生成的令牌