std::unordered_map 上的线程安全包装器

Posted

技术标签:

【中文标题】std::unordered_map 上的线程安全包装器【英文标题】:threadsafe wrapper on std::unordered_map 【发布时间】:2015-01-06 10:16:57 【问题描述】:

我正在尝试在 std::unordered_map 之上实现一个线程安全的包装类 下面的开始和结束功能是否安全?

        std::unordered_map<Key, T, Hash, Pred, Alloc> umap;
        iterator begin() 
            return umap.begin();
           
        iterator end() 
            return umap.end();
        

如果在复制/移动操作符=实现中有任何明显的错误,也请评论

    concurrent_unordered_map& operator=(const concurrent_unordered_map& other) ;
    
        if (this!=&other) 
          std::lock(entry_mutex, other.entry_mutex);
          std::lock_guard<boost::shared_mutex> _mylock(entry_mutex, std::adopt_lock);
          std::shared_lock<boost::shared_mutex> _otherlock(other.entry_mutex, std::adopt_lock);
          umap = other.umap;
        
        return *this;           
    
    concurrent_unordered_map& operator=(concurrent_unordered_map&& other) 
    
        if (this!=&other) 
          std::lock(entry_mutex, other.entry_mutex);
          std::lock_guard<boost::shared_mutex> _mylock(entry_mutex, std::adopt_lock);
          std::shared_lock<boost::shared_mutex> _otherlock(other.entry_mutex, std::adopt_lock);
          umap = std::move(other.umap)
        
        return *this;       
    

谢谢 合资公司

【问题讨论】:

beginend 函数已在标准库中为您提供。不,它们不是线程安全的。 这个问题似乎跑题了,因为它属于 codereview.stackexchange.com 由于迭代器可能被其他线程无效,您不能提供开始/结束。你应该提供一个for_each 方法。 与其重新发明***,不如考虑使用 TBB 的 concurrent hash map。 @sjdowling: 或concurrent unordered map,哪个API更接近C++无序映射之一。 【参考方案1】:

您无法创建提供与底层标准容器相同接口的线程安全容器,即使您同步每个方法调用也是如此。这是因为接口规范本身并不打算在多线程环境中使用。

这是一个示例:假设您有多个线程同时插入到同一个容器对象中:

c->insert(new_value);

因为你同步了每个方法调用,这很好,这里没问题。

但与此同时,另一个线程试图循环遍历容器中的所有元素:

auto itr = c->begin();
while (itr != c->end())

    // do something with itr
    ++itr;

我这样写是为了说明问题:即使对 begin 和 end 的调用是内部同步的,你也不能原子地执行“遍历所有元素”的操作,因为你需要不止一个方法调用来完成这个任务。在循环运行时,只要任何其他线程向容器插入内容,这种情况就会中断。

所以如果你想拥有一个无需外部同步就可以使用的容器,你需要一个线程安全的接口。例如,“循环遍历所有元素”任务可以通过提供 for_each 方法以原子方式完成:

c.for_each([](const value_type& value)

    // do something with value
); 

【讨论】:

【参考方案2】:

你不能简单地同步每个方法并获得一个线程安全的对象,因为有些操作需要不止一次的方法调用,并且如果容器在方法调用之间发生突变,就会中断。

一个典型的例子是迭代。

线程安全的一个简单方法是像这样滥用 C++14 特性:

template<class T>
struct synchronized 
  // one could argue that rvalue ref qualified version should not be
  // synchronized...  but I think that is wrong
  template<class F>
  std::result_of_t< F(T const&) > read( F&& f ) const 
    auto&& lock = read_lock();
    return std::forward<F>(f)(t);
  
  template<class F>
  std::result_of_t< F(T&) > write( F&& f ) 
    auto&& lock = write_lock();
    return std::forward<F>(f)(t);
  
  // common operations, useful rvalue/lvalue overloads:
  // get a copy of the internal guts:
  T copy() const&  return read([&](auto&&t)->Treturn t;); 
  T copy() &&  return move(); 
  T move()  return std::move(*this).write([&](auto&&t)->Treturn std::move(t);); 
private:
  mutable std::shared_timed_mutex mutex;
  std::shared_lock<std::shared_timed_mutex> read_lock() const 
    return std::shared_lock<std::shared_timed_mutex>(mutex);
  
  std::unique_lock<std::shared_timed_mutex> write_lock() 
    return std::unique_lock<std::shared_timed_mutex>(mutex);
  
  T t;
public:
  // relatively uninteresting boilerplate
  // ctor:
  template<class...Args>
  explicit synchronized( Args&&... args ):
    t(std::forward<Args>(args)...)
  
  // copy ctors: (forwarding constructor above means need all 4 overloads)
  synchronized( synchronized const& o ) :t(std::forward<decltype(o)>(o).copy()) 
  synchronized( synchronized const&& o ):t(std::forward<decltype(o)>(o).copy()) 
  synchronized( synchronized & o )      :t(std::forward<decltype(o)>(o).copy()) 
  synchronized( synchronized && o )     :t(std::forward<decltype(o)>(o).copy()) 
  // copy-from-T ctors: (forwarding constructor above means need all 4 overloads)
  synchronized( T const& o ) :t(std::forward<decltype(o)>(o)) 
  synchronized( T const&& o ):t(std::forward<decltype(o)>(o)) 
  synchronized( T & o )      :t(std::forward<decltype(o)>(o)) 
  synchronized( T && o )     :t(std::forward<decltype(o)>(o)) 
;

这似乎晦涩难懂,但效果很好:

int main() 
  synchronized< std::unordered_map<int, int> > m;
  m.write( [&](auto&&m) 
    m[1] = 2;
    m[42] = 13;
  );
  m.read( [&](auto&&m) 
    for( auto&& x:m ) 
      std::cout << x.first << "->" << x.second << "\n";
    
  );
  bool empty = m.read( [&](auto&&m) 
    return m.empty();
  );
  std::cout << empty << "\n";
  auto copy = m.copy();
  std::cout << copy.empty() << "\n";

  synchronized< std::unordered_map<int, int> > m2 = m;
  m2.read( [&](auto&&m) 
    for( auto&& x:m ) 
      std::cout << x.first << "->" << x.second << "\n";
    
  );

这个想法是你将你的操作嵌入到 lambdas 中,它在同步的上下文中执行。

编码风格有点晦涩,但并非难以管理(至少具有 C++14 特性)。

C++11 的一个很好的特性是在同一个容器上的两个const 操作是合法的,即使来自两个不同的线程。所以read 只是简单地传递了一个const 对容器的引用,你可以在其中执行的几乎所有操作都可以与另一个线程并行执行。

live example

【讨论】:

read 应该分别返回 std::result_of_t&lt; F(T const&amp;) &gt;write std::result_of_t&lt; F(T&amp;) &gt;-&gt; decltype(std::forward&lt;F&gt;(f)(t)) 对于这两种情况可能更具可读性,甚至 auto 因为我认为 SFINAE 不会非常有用。 @Casey 已修复,以及其他一些错误。我喜欢auto,但我不希望移植到 C++14 之前的版本令人讨厌。请注意,result_of_tauto 一样不支持 SFINAE。传达对存储类型调用函数的结果是调用writeread 的结果似乎是值得的。 这段代码无法编译,我认为复制操作的定义不正确的 , )... @carlos.baez 是的,缺少);;。添加了.move() 和副本的测试代码。更新了实时示例。【参考方案3】:

有一个线程安全的std::unordered_map 实现是可能的(但通常不是很有用) - 问题是每个迭代器对象都需要锁定一个递归互斥锁,直到它的析构函数运行。这不仅会有点慢,而且迭代器会在内存使用中膨胀,还有一些功能问题:即使迭代器“当前”不被用于读取或写入容器(例如,对于某些二级索引,或者作为“游标”,或者因为在使用它们之后,它们的销毁是延迟的,直到封闭范围退出或拥有的对象被销毁):这意味着其他线程可能会被阻塞很长时间,实际上围绕容器操作的程序逻辑可能会构成一种死锁。

【讨论】:

以上是关于std::unordered_map 上的线程安全包装器的主要内容,如果未能解决你的问题,请参考以下文章

不断迭代的线程安全的 Unordered_map

C++17 复制构造函数,std::unordered_map 上的深拷贝

与 C++ unordered_map 的并行性

std::unordered_map::clear() 做啥?

两个 std::unordered_map 的交集

std::hash 特化仍未被 std::unordered_map 使用