std::unordered_map 上的线程安全包装器
Posted
技术标签:
【中文标题】std::unordered_map 上的线程安全包装器【英文标题】:threadsafe wrapper on std::unordered_map 【发布时间】:2015-01-06 10:16:57 【问题描述】:我正在尝试在 std::unordered_map 之上实现一个线程安全的包装类 下面的开始和结束功能是否安全?
std::unordered_map<Key, T, Hash, Pred, Alloc> umap;
iterator begin()
return umap.begin();
iterator end()
return umap.end();
如果在复制/移动操作符=实现中有任何明显的错误,也请评论
concurrent_unordered_map& operator=(const concurrent_unordered_map& other) ;
if (this!=&other)
std::lock(entry_mutex, other.entry_mutex);
std::lock_guard<boost::shared_mutex> _mylock(entry_mutex, std::adopt_lock);
std::shared_lock<boost::shared_mutex> _otherlock(other.entry_mutex, std::adopt_lock);
umap = other.umap;
return *this;
concurrent_unordered_map& operator=(concurrent_unordered_map&& other)
if (this!=&other)
std::lock(entry_mutex, other.entry_mutex);
std::lock_guard<boost::shared_mutex> _mylock(entry_mutex, std::adopt_lock);
std::shared_lock<boost::shared_mutex> _otherlock(other.entry_mutex, std::adopt_lock);
umap = std::move(other.umap)
return *this;
谢谢 合资公司
【问题讨论】:
begin
和 end
函数已在标准库中为您提供。不,它们不是线程安全的。
这个问题似乎跑题了,因为它属于 codereview.stackexchange.com
由于迭代器可能被其他线程无效,您不能提供开始/结束。你应该提供一个for_each
方法。
与其重新发明***,不如考虑使用 TBB 的 concurrent hash map。
@sjdowling: 或concurrent unordered map,哪个API更接近C++无序映射之一。
【参考方案1】:
您无法创建提供与底层标准容器相同接口的线程安全容器,即使您同步每个方法调用也是如此。这是因为接口规范本身并不打算在多线程环境中使用。
这是一个示例:假设您有多个线程同时插入到同一个容器对象中:
c->insert(new_value);
因为你同步了每个方法调用,这很好,这里没问题。
但与此同时,另一个线程试图循环遍历容器中的所有元素:
auto itr = c->begin();
while (itr != c->end())
// do something with itr
++itr;
我这样写是为了说明问题:即使对 begin 和 end 的调用是内部同步的,你也不能原子地执行“遍历所有元素”的操作,因为你需要不止一个方法调用来完成这个任务。在循环运行时,只要任何其他线程向容器插入内容,这种情况就会中断。
所以如果你想拥有一个无需外部同步就可以使用的容器,你需要一个线程安全的接口。例如,“循环遍历所有元素”任务可以通过提供 for_each 方法以原子方式完成:
c.for_each([](const value_type& value)
// do something with value
);
【讨论】:
【参考方案2】:你不能简单地同步每个方法并获得一个线程安全的对象,因为有些操作需要不止一次的方法调用,并且如果容器在方法调用之间发生突变,就会中断。
一个典型的例子是迭代。
线程安全的一个简单方法是像这样滥用 C++14 特性:
template<class T>
struct synchronized
// one could argue that rvalue ref qualified version should not be
// synchronized... but I think that is wrong
template<class F>
std::result_of_t< F(T const&) > read( F&& f ) const
auto&& lock = read_lock();
return std::forward<F>(f)(t);
template<class F>
std::result_of_t< F(T&) > write( F&& f )
auto&& lock = write_lock();
return std::forward<F>(f)(t);
// common operations, useful rvalue/lvalue overloads:
// get a copy of the internal guts:
T copy() const& return read([&](auto&&t)->Treturn t;);
T copy() && return move();
T move() return std::move(*this).write([&](auto&&t)->Treturn std::move(t););
private:
mutable std::shared_timed_mutex mutex;
std::shared_lock<std::shared_timed_mutex> read_lock() const
return std::shared_lock<std::shared_timed_mutex>(mutex);
std::unique_lock<std::shared_timed_mutex> write_lock()
return std::unique_lock<std::shared_timed_mutex>(mutex);
T t;
public:
// relatively uninteresting boilerplate
// ctor:
template<class...Args>
explicit synchronized( Args&&... args ):
t(std::forward<Args>(args)...)
// copy ctors: (forwarding constructor above means need all 4 overloads)
synchronized( synchronized const& o ) :t(std::forward<decltype(o)>(o).copy())
synchronized( synchronized const&& o ):t(std::forward<decltype(o)>(o).copy())
synchronized( synchronized & o ) :t(std::forward<decltype(o)>(o).copy())
synchronized( synchronized && o ) :t(std::forward<decltype(o)>(o).copy())
// copy-from-T ctors: (forwarding constructor above means need all 4 overloads)
synchronized( T const& o ) :t(std::forward<decltype(o)>(o))
synchronized( T const&& o ):t(std::forward<decltype(o)>(o))
synchronized( T & o ) :t(std::forward<decltype(o)>(o))
synchronized( T && o ) :t(std::forward<decltype(o)>(o))
;
这似乎晦涩难懂,但效果很好:
int main()
synchronized< std::unordered_map<int, int> > m;
m.write( [&](auto&&m)
m[1] = 2;
m[42] = 13;
);
m.read( [&](auto&&m)
for( auto&& x:m )
std::cout << x.first << "->" << x.second << "\n";
);
bool empty = m.read( [&](auto&&m)
return m.empty();
);
std::cout << empty << "\n";
auto copy = m.copy();
std::cout << copy.empty() << "\n";
synchronized< std::unordered_map<int, int> > m2 = m;
m2.read( [&](auto&&m)
for( auto&& x:m )
std::cout << x.first << "->" << x.second << "\n";
);
这个想法是你将你的操作嵌入到 lambdas 中,它在同步的上下文中执行。
编码风格有点晦涩,但并非难以管理(至少具有 C++14 特性)。
C++11 的一个很好的特性是在同一个容器上的两个const
操作是合法的,即使来自两个不同的线程。所以read
只是简单地传递了一个const
对容器的引用,你可以在其中执行的几乎所有操作都可以与另一个线程并行执行。
live example
【讨论】:
read
应该分别返回 std::result_of_t< F(T const&) >
和 write
std::result_of_t< F(T&) >
。 -> decltype(std::forward<F>(f)(t))
对于这两种情况可能更具可读性,甚至 auto
因为我认为 SFINAE 不会非常有用。
@Casey 已修复,以及其他一些错误。我喜欢auto
,但我不希望移植到 C++14 之前的版本令人讨厌。请注意,result_of_t
与 auto
一样不支持 SFINAE。传达对存储类型调用函数的结果是调用write
或read
的结果似乎是值得的。
这段代码无法编译,我认为复制操作的定义不正确的 , )...
@carlos.baez 是的,缺少);
和;
。添加了.move()
和副本的测试代码。更新了实时示例。【参考方案3】:
有一个线程安全的std::unordered_map
实现是可能的(但通常不是很有用) - 问题是每个迭代器对象都需要锁定一个递归互斥锁,直到它的析构函数运行。这不仅会有点慢,而且迭代器会在内存使用中膨胀,还有一些功能问题:即使迭代器“当前”不被用于读取或写入容器(例如,对于某些二级索引,或者作为“游标”,或者因为在使用它们之后,它们的销毁是延迟的,直到封闭范围退出或拥有的对象被销毁):这意味着其他线程可能会被阻塞很长时间,实际上围绕容器操作的程序逻辑可能会构成一种死锁。
【讨论】:
以上是关于std::unordered_map 上的线程安全包装器的主要内容,如果未能解决你的问题,请参考以下文章
C++17 复制构造函数,std::unordered_map 上的深拷贝