pthread_rwlock 可以同时拥有多少个读者?
Posted
技术标签:
【中文标题】pthread_rwlock 可以同时拥有多少个读者?【英文标题】:How many simultaneous readers can a pthread_rwlock have? 【发布时间】:2012-08-08 14:17:20 【问题描述】:我有一个多线程应用程序,它创建了 48 个线程,所有这些线程都需要访问一个公共属性 (stl::map)。只有在线程启动时才会写入映射,其余时间将从中读取映射。这似乎是 pthread_rw_lock 的完美用例,而且一切似乎都运行良好。
我遇到了一个完全不相关的 seg-fault 并开始分析核心。使用 gdb,我执行了命令info threads
,对结果感到非常惊讶。我观察到几个线程实际上正在按预期从映射中读取,但奇怪的是几个线程在等待 rw_lock 的 pthread_rwlock_rdlock() 中被阻塞。
这是等待锁的线程的堆栈跟踪:
#0 0xffffe430 in __kernel_vsyscall ()
#1 0xf76fe159 in __lll_lock_wait () from /lib/libpthread.so.0
#2 0xf76fab5d in pthread_rwlock_rdlock () from /lib/libpthread.so.0
#3 0x0804a81a in DiameterServiceSingleton::getDiameterService(void*) ()
有这么多线程,很难说有多少正在读取,有多少被阻塞,但我不明白为什么任何线程会被阻塞等待读取,考虑到其他线程已经在阅读。
所以这是我的问题:为什么有些线程阻塞等待读取 rw_lock,而其他线程已经在读取它?似乎可以同时读取的线程数有限制。
我查看了pthread_rwlock_attr_t
函数并没有看到任何相关内容。
操作系统是 Linux,SUSE 11。
相关代码如下:
pthread_rwlock_init(&serviceMapRwLock_, NULL);
// This method is called for each request processed by the threads
Service *ServiceSingleton::getService(void *serviceId)
pthread_rwlock_rdlock(&serviceMapRwLock_);
ServiceMapType::const_iterator iter = serviceMap_.find(serviceId);
bool notFound(iter == serviceMap_.end());
pthread_rwlock_unlock(&serviceMapRwLock_);
if(notFound)
return NULL;
return iter->second;
// This method is only called when the app is starting
void ServiceSingleton::addService(void *serviceId, Service *service)
pthread_rwlock_wrlock(&serviceMapRwLock_);
serviceMap_[serviceId] = service;
pthread_rwlock_unlock(&serviceMapRwLock_);
更新:
正如 MarkB 在 cmets 中所提到的,如果我将 pthread_rwlockattr_getkind_np() 设置为优先考虑写入器,并且有写入器阻塞等待,那么观察到的行为将是有意义的。但是,我使用我认为优先考虑读者的默认值。我刚刚确认有 没有 个线程阻塞等待写入。我还按照@Shahbaz 在 cmets 中的建议更新了代码并获得了相同的结果。
【问题讨论】:
你确定没有写入器锁阻塞吗? @MarkB 这是一个很好的问题!但这不取决于我没有调用的 pthread_rwlockattr_getkind_np() 吗?我不确定是否有任何线程正在等待写入,但它们不应该是,因为这应该只在开始时发生。不过我得检查一下。 @MarkB,如果作者正在等待并且我没有设置 pthread_rwlockattr_getkind_np(),会有什么影响?据我了解,如果有持续的读者,作者可能会饿死,对吧? 来自pthread_rwlock_rdlock
手册页:The pthread_rwlock_rdlock() function applies a read lock to the read-write lock referenced by rwlock. The calling thread acquires the read lock if a writer does not hold the lock *and there are no writers blocked on the lock*.
(强调我的)。
@MarkB,好的,谢谢。我以为上一部分提到的行为只有 pthread_rwlockattr_getkind_np() 才能实现。如果不是,那么这个设置有什么作用?
【参考方案1】:
您只是观察到与获取锁相关的固有性能问题。这需要一些时间,而您恰好在其中捕获了这些线程。当受锁保护的操作持续时间很短时尤其如此。
编辑:阅读源代码,
glibc
使用lll_lock
来保护其自己的 pthread 库数据结构中的关键部分。pthread_rwlock_rdlock
检查多个标志并递增计数器,因此它在持有锁的同时执行这些操作。完成这些操作后,使用lll_unlock
释放锁。
为了演示,我实现了一个在获取rwlock
后休眠的简短例程。主线程等待它们完成。但在等待之前,它会打印线程实现的并发。
enum CONC = 50 ;
pthread_rwlock_t rwlock;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
unsigned count;
void *routine(void *arg)
int *fds = static_cast<int *>(arg);
pthread_rwlock_rdlock(&rwlock);
pthread_mutex_lock(&mutex);
++count;
if (count == CONC) pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
sleep(5);
pthread_rwlock_unlock(&rwlock);
pthread_t self = pthread_self();
write(fds[1], &self, sizeof(self));
return 0;
而主线程等待计数器达到50:
int main()
int fds[2];
pipe(fds);
pthread_rwlock_init(&rwlock, 0);
pthread_mutex_lock(&mutex);
for (int i = 0; i < CONC; i++)
pthread_t tid;
pthread_create(&tid, NULL, routine, fds);
while (count < CONC) pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
std::cout << "count: " << count << std::endl;
for (int i = 0; i < CONC; i++)
pthread_t tid;
read(fds[0], &tid, sizeof(tid));
pthread_join(tid, 0);
pthread_rwlock_destroy(&rwlock);
pthread_exit(0);
编辑:使用 C++11 线程支持简化示例:
enum CONC = 1000 ;
std::vector<std::thread> threads;
pthread_rwlock_t rwlock;
std::mutex mutex;
std::condition_variable cond;
unsigned count;
void *routine(int self)
pthread_rwlock_rdlock(&rwlock);
std::unique_lock<std::mutex> lk(mutex);
if (++count == CONC) cond.notify_one();
sleep(5);
pthread_rwlock_unlock(&rwlock);
return 0;
int main()
pthread_rwlock_init(&rwlock, 0);
std::unique_lock<std::mutex> lk(mutex);
for (int i = 0; i < CONC; i++)
threads.push_back(std::thread(routine, i));
cond.wait(lk, []()return count == CONC;);
std::cout << "count: " << count << std::endl;
for (int i = 0; i < CONC; i++)
threads[i].join();
pthread_rwlock_destroy(&rwlock);
pthread_exit(0);
【讨论】:
我用等待读锁的线程的堆栈跟踪来更新我的问题。你的意思是即使线程在__lll_lock_wait()
中它实际上并没有等待/阻塞,而是它在获取读锁的函数中?如果是这样,那是一个不幸的函数名称,虽然不会是第一个 :)
@是的。我面前没有要确认的来源,但我怀疑这对应于 glibc
进入内核的系统调用入口点。
这是有道理的。并不是说我不相信你:) 但我正在寻找源代码来验证它。找到后我会通知你的,谢谢!
@Brady:我扫描了源代码,并用我发现的内容更新了答案。问候
很好的答案。很好的研究。【参考方案2】:
附带说明,上面发布的代码已损坏。 您无法从 rw_lock'd 部分访问 iter->second,因为一旦您解锁 rw_lock,写入者就可以删除映射中的任何元素,从而使其上的任何迭代器无效。
我知道在你的情况下你没有这样做,因为你在程序执行开始之后没有写任何东西,但仍然值得一提。
另外,作为旁注,由于您描述的行为似乎是序列化的(作者在开始时写入地图,然后读者从现在开始读取“只读”地图),您可能应该这样写这个:
int writerDoneWithMap = 0;
// pthread_cond & mutex init here
// The writer write to the map here
// Then they signal the reader that they are done with it
while (!__sync_bool_compare_and_swap(&writerDoneWithMap, 1, writerDoneWithMap));
pthread_cond_broadcast here
// The readers will then simply do this:
while (!writerDoneWithMap)
// pthread_cond_wait here
// Read the data without locks.
如果作者已经完成了映射,上面的代码避免了对读者的任何锁定,如果他们没有,那么你就求助于典型的 pthread_cond/mutex 技术。 上面的代码是正确的当且仅当你有作家 THEN 读者(但这就是你所说的),否则它将失败。
【讨论】:
以上是关于pthread_rwlock 可以同时拥有多少个读者?的主要内容,如果未能解决你的问题,请参考以下文章