如何安全地从多个线程并行访问和写入复杂容器?

Posted

技术标签:

【中文标题】如何安全地从多个线程并行访问和写入复杂容器?【英文标题】:How to safely access and write to a complex container from multiple threads in parallel? 【发布时间】:2019-10-07 12:27:34 【问题描述】:

我有一个结构的 unordered_map 的情况。该结构包含 int(s)、bool(s) 和一个向量。我的程序将通过对服务器的 https 调用或使用 websocket 获取地图中每个项目的数据(地图中的每个项目都需要单独的 https 调用)。使用 websocket 时,地图中所有项目的数据会一起返回。提取的数据被处理并存储在各自的向量中。

websocket 在单独的线程中运行,并且应该在程序的整个生命周期中运行。

我的程序有一个删除功能,可以“清空”整个地图。还有一个 addItem() 函数,它将向我的地图添加新结构。

当结构的“updatesOn”成员为假时,没有数据被推入向量中。

我当前的实现有 3 个线程:

主线程将向地图添加新项目。主线程的另一个功能是从struct中的vector中获取数据。主线程具有清空地图并重新开始的功能。它还有一个只清空向量的函数。 第二个线程将运行 websocket 客户端并在新数据到达时填充结构中的向量。有一个 while 循环检查退出标志。一旦在主线程中设置了退出标志,该线程就会终止。 第三个线程是管理器线程。它在地图中查找新条目并进行 http 下载,然后将此项目添加到 websocket 以进行后续数据更新。它还会定期运行 http 下载,清空向量并重新填充它。

现在我有两个互斥锁。

一个用于在向/从向量写入/读取数据之前锁定。 第二个互斥锁是在映射中添加或删除新数据时。也可以在地图清空时使用。

我觉得这是互斥锁的错误用法。因为当读取或写入其结构的向量元素之一时,我可能会清空地图。这让我可以为所有人使用一个互斥锁。

问题在于这是一个实时股票数据程序,即每秒都会弹出新数据,有时甚至更快。我担心一个互斥锁可能会减慢我的整个应用程序的速度。

如上所述,所有 3 个线程都对该映射具有写访问权,主线程能够完全清空它。

牢记速度和线程安全,实现这一点的好方法是什么?

我的数据成员:

unordered_map<string, tickerDiary> tDiaries;

struct tickerDiary 
            tickerDiary() : name(""), ohlcPeriodicity("minute"), ohlcStatus(false), updatesOn(true), ohlcDayBarIndex(0), rtStatus(false) 
            string name; 
            string ohlcPeriodicity; 
            bool ohlcStatus; 
            bool rtStatus;
            bool updatesOn;
            int32 ohlcDayBarIndex;
            vector<Quotation> data;
;

struct Quotation 
            union AmiDate DateTime;
            float   Price;
            float   Open;
            float   High;
            float   Low;
            float   Volume;
            float   OpenInterest;
            float   AuxData1;
            float   AuxData2;
;

注意:我使用的是 C++11。

【问题讨论】:

我尝试遵循这个准则:不可变对象可以跨线程共享,大多数可变对象应该由一个线程拥有而不是共享。共享可变对象需要受互斥体保护。 这太宽泛了。是的,地图不是线程安全的,所以你没有太多的灵活性。任何涉及修改地图的多线程访问都需要对所有内容进行大锁定。您可以通过使用std::shared_mutex 让多个读者同时锁定地图来获得更多里程,但仅此而已。 鉴于您的描述,您有两个不同的数据结构,每个都可以从多个线程访问(读取和写入),一个互斥锁来保护每个是合适的。 设计以避免在任何线程中尝试同时锁定两个互斥锁的情况,因为这会导致死锁。这通常意味着(例如)为地图获取互斥体,在地图上执行操作,将所需数据保存在局部变量中,释放互斥体,然后为向量获取互斥体..... 感谢您的回复。如果我错了,请纠正我。所以,在这里有两个互斥锁是没有用的,对吧? map 和 vector 的单个互斥体就足够了。 【参考方案1】:

如果我对您的问题理解正确,您的地图本身主要是在主线程中编写的,其他线程仅用于对地图中条目中包含的数据进行操作。

鉴于此,对于非主线程有两个问题:

    他们处理的项目不应随意消失 他们应该是唯一一个在他们的项目上工作的人。

第一个问题可以通过将存储与地图解耦来最有效地解决。因此,对于每个项目,存储是单独分配的(通过默认分配器,或者如果您添加/删除大量项目,则通过一些池方案),并且映射仅存储共享 ptr。然后每个处理一个项目的线程只需要保持一个共享的ptr,以确保存储不会从它们下面消失。然后仅在指针的获取/存储/删除期间才需要获取映射的关联互斥锁/共享互斥锁。只要可以接受某些线程可能会浪费一些时间对已从地图中删除的项目执行操作,这将正常工作。使用 shared_ptrs 将确保您不会通过使用引用计数器来泄漏内存,并且它们还将对这些引用计数进行锁定/解锁(或者更确切地说,尝试对这些引用使用更有效的平台原语)。如果您想了解更多关于 shared_ptr 和一般智能指针的信息,this 是对 c++ 智能指针系统的合理介绍。

剩下的第二个问题可能最容易解决,方法是在数据结构(tickerDiary)本身中保留一个互斥锁,线程在开始执行需要从结构中预测行为的操作时获取,并且可以在它们完成后释放做了应该做的事。

以这种方式分离锁定应该减少对映射全局锁定的争用量。但是,您可能应该对代码进行基准测试,看看是否值得考虑到各个项目的分配和引用计数的额外成本。

【讨论】:

谢谢@davidv1992!关于第一个问题的解决方案,您能否指向一些包含示例代码的学习资源,以更好地了解 shared_ptrs 如何提供帮助?或者更好的是,您能否提供一些示例代码来概述其在这种情况下的用法。作为 C++ 的新手,这对我很有帮助。 添加了对 c++ 中智能指针介绍的引用,这应该会有所帮助。提供实际示例代码超出了我目前能够投入的时间。【参考方案2】:

我不认为在这里使用std::vector 是正确的集合。但是如果你坚持使用它,你应该为每个集合设置一个互斥锁。

我会推荐来自INTEL TBB 的concurrent_vector 或来自boost 的同步数据结构。

第三种解决方案可能是实现您自己的concurrent vector

【讨论】:

以上是关于如何安全地从多个线程并行访问和写入复杂容器?的主要内容,如果未能解决你的问题,请参考以下文章

线程同步之读写锁

对于写入固定大小数组的不同部分的并行线程,是不是存在线程安全的 Java 数据结构?

从多个线程写入静态数据实际上是不是安全

预分配 unordered_map 的线程安全

面试题:线程安全3问

Java并发.3对象的共享