尝试构建“无锁”数据结构 C++

Posted

技术标签:

【中文标题】尝试构建“无锁”数据结构 C++【英文标题】:Trying to build "lock-free" data structure C++ 【发布时间】:2020-07-01 08:41:22 【问题描述】:

我正在尝试实现一个类,它提供信息,附加到这个线程给用户。线程可以在类中注册和注销。由于类的注册、注销和解析缓冲区是非常罕见的事件,因此我锁定了互斥锁。通常情况下,当客户端调用 getBuffer() 并且没有传入的注册/注销请求时,我只检查原子布尔变量 _bufReconciled。但在实践中,我捕捉到线程注册的情况,并将他的数据添加到映射中,但是另一个线程找不到这个元素(调试器显示映射中添加的元素)。我想,由于内存一致性,并非 std::map _userTheradToData 的所有数据都在线程之间同步,但我不知道如何解决这个问题。下面有一段代码说明了问题:

#include <iostream>
#include <atomic>
#include <thread>
#include <mutex>
#include <map>
#include <vector>
#include <cassert>
#include <exception>

struct UserData

    std::string _data1;
    std::string _data2;
;

class ThreadBufferManager


    std::map <std::thread::id, std::unique_ptr<UserData>> _userThreadToData;
    std::vector <std::thread::id> _pendingToUnregisterThread;
    std::vector <std::thread::id> _pendingToRegisterThread;

    std::atomic <bool> _bufReconciled;
    std::mutex _processingThreads;

    public:

    ThreadBufferManager ()
    
        _bufReconciled = true;
    

    void registerThread ()
    
        std::lock_guard <std::mutex> lock (_processingThreads);
        _bufReconciled = false;
        _pendingToRegisterThread.push_back(std::this_thread::get_id());
    

    void unregisterThread ()
    
        std::lock_guard <std::mutex> lock (_processingThreads);
        _bufReconciled = false;
        _pendingToUnregisterThread.push_back(std::this_thread::get_id());
    

    UserData& getBuffer ()
    

        if (!_bufReconciled.load())
        
                std::lock_guard<std::mutex> lock (_processingThreads);
                for (const auto& threadToUnregister : _pendingToUnregisterThread)
                
                     std::cout << "REM" << threadToUnregister << std::endl;
                    _userThreadToData.erase(threadToUnregister);
                

                for (const auto& threadToRegister : _pendingToRegisterThread)
                
                    std::cout << "ADD" << threadToRegister << std::endl;
                    _userThreadToData.emplace(threadToRegister, std::unique_ptr<UserData> (new UserData));
                

                _pendingToUnregisterThread.clear();
                _pendingToRegisterThread.clear();
                _bufReconciled = true;

       


      auto it =  _userThreadToData.find (std::this_thread::get_id());
      if (it == _userThreadToData.end())
      
            std::cout << "ERR" << std::this_thread::get_id() << std::endl;
            std::terminate();
      
      return *it->second;
    

;

void threadFoo (ThreadBufferManager& tbm)

    tbm.registerThread();
    for (std::size_t i = 0; i < 100; ++i)
    
        tbm.getBuffer();
    
    tbm.unregisterThread();



int main()

    ThreadBufferManager tbm;

    while (true)
    
         std::thread thread1 (threadFoo, std::ref(tbm));
         std::thread thread2 (threadFoo, std::ref(tbm));
         std::thread thread3 (threadFoo, std::ref(tbm));
         std::thread thread4 (threadFoo, std::ref(tbm));
         std::thread thread5 (threadFoo, std::ref(tbm));
         std::thread thread6 (threadFoo, std::ref(tbm));
         std::thread thread7 (threadFoo, std::ref(tbm));
         std::thread thread8 (threadFoo, std::ref(tbm));
         std::thread thread9 (threadFoo, std::ref(tbm));
         std::thread thread10 (threadFoo, std::ref(tbm));
         std::thread thread11 (threadFoo, std::ref(tbm));
         std::thread thread12 (threadFoo, std::ref(tbm));
         std::thread thread13 (threadFoo, std::ref(tbm));

         thread1.join();
         thread2.join();
         thread3.join();
         thread4.join();
         thread5.join();
         thread6.join();
         thread7.join();
         thread8.join();
         thread9.join();
         thread10.join();
         thread11.join();
         thread12.join();
         thread13.join();
    
    return 0;

【问题讨论】:

我不是任何线程方面的专家,但是以原子方式加载 _bufReconciled 而不是以原子方式存储它似乎很奇怪,不是吗?在我看来,如果它已经是 true,那么您的线程可以非原子地从 _userThreadToData 读取,因此是比赛。 我认为至少你需要在修改地图之后设置你的原子标志,这样其他线程在地图被更改时就无法读取地图.. @underscore_d。实际上,std::atomic opeartor=(value) 等于 std::atomic store (value)。是的,如果您使用相同的语法,它看起来会更好,但我认为在这种情况下它并不重要。 [链接]en.cppreference.com/w/cpp/atomic/atomic/operator%3D 【参考方案1】:

这行不通。

没有什么可以阻止一个线程访问_userThreadToData [在锁之外],而另一个线程正在改变同一个成员。

考虑线程 '1' 一路走来,即将运行这一行:

auto it =  _userThreadToData.find (std::this_thread::get_id());

然后线程“2”出现并运行所有内容,包括:

_userThreadToData.emplace(threadToRegister, std::unique_ptr<UserData> (new UserData));

您说除非“没有传入的注册请求”,否则您不会访问数据,但事实并非如此 - 您检查是否有传入的请求,然后稍后您访问地图。要编写真正的无锁代码(这很困难),检查和访问必须是原子操作。

只需使用锁。

【讨论】:

以上是关于尝试构建“无锁”数据结构 C++的主要内容,如果未能解决你的问题,请参考以下文章

c++千万数据级别正确使用无锁队列,避免内存撕碎

c++千万数据级别正确使用无锁队列,避免内存撕碎

C++无锁线程问题 - 多个线程在一个连续数组上迭代,但从未访问过相同的成员数据?

无锁队列真的比有锁队列快吗c++ linux后台开发

Go语言无锁队列组件的实现 (chan/interface/select)

Manning新书C++并行实战,592页pdf,C++ Concurrency in Action