11.6 线程同步
Posted U201013687
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了11.6 线程同步相关的知识,希望对你有一定的参考价值。
Example
11.6.2 避免死锁
Example
11.6.3 pthread_mutex_timedlock 函数
Example
11.6.4Reader-Writer Locks
Example
11.6.5 带有超时功能的读写锁
11.6.6 条件变量
Example
11.6.7 自旋锁
11.6.8 Barriers
Example
当多个线程控制流需要共享内存的时候,我们需要确保每一个线程所看到的数据是一致的。如果一个线程使用别的线程不会读取或者修改的数据,那么一致性问题并不会出现。类似地,如果一个变量是只读的,即使多个线程同时进行访问也不会有一致性问题,然而,当一个线程可以改写一个变量,而其他线程可以读取或者改写同一变量的时候,我们就需要进行线程同步,从而确保它们不会获取到无效数据内容。
当一个线程修改一个变量的时候,其他线程进行的读操作的结果可能出现结果的不一致性,在修改变量数值需要操作一个周期的处理器架构上,这可能出现在内存读与内存写交互执行的情况下,当然了,这种情况是与处理器架构相关的,但是可移植程序不能对使用哪一种处理器架构做任何假设。
图11.7展示了两个线程对同一变量进行读写操作的例子。在这个例子中,线程A读取变量,然后想变量中写入一个新的数值,但是写操作花费了两个内存周期,如果线程B在这两个写循环之间读取同一变量,那么读取的结果就会出现不一致。
为了解决上述问题,线程必须使用一个锁,该所仅仅允许同时只能有一个线程访问变量,图11.8展示了这一同步机制,如果想要读取变量,线程B就需要获取锁,类似地,当线程A更新变量的时候,它也会获取到相同的锁,这样线程B就不能读取变量的值了,需要等待线程A释放锁。
我们也需要同步两个或者多个线程同时修改同一变量的问题,考虑我们需要对变量执行自增操作的情况(如图11.9所示),整个自增操作通常会被分成三步完成:
- 将内存中的变量读取到寄存器。
- 对寄存器中的数值进行自增处理。
- 将新的数值写会到内存中;
如果两个线程几乎同时对同一变量进行自增操作,并且没有进行同步,结果可能就会出现不一致,在两个线程完成相应的自增操作以后,实际变量的值可能增加了1,也可能增加了2,这与第二个线程相对第一个线程开始操作的时间有关。如果第二个线程在第一个线程执行第三步之前执行操作1,那么第二个线程将会读取到与第一个线程完全一样的初始值,自增以后,然后写会,并没有达到想要的效果。
如果操作是原子的,那么就不会出现竞态条件,在前一个例子中,如果自增仅仅花费一个内存周期就可以完成,那么就不会出现竞态条件。
11.6.1 互斥
我们可以利用pthreads的互斥接口来保护数据,确保同时只能有一个线程可以访问数据,互斥(mutex)是基于锁的,在我们访问共享数据之前需要先设置锁,在完成数据访问之后需要释放锁,当锁被设定的时候,其他尝试去设置锁的线程将进入阻塞,直到当前获取到锁的线程释放掉锁。在解锁的时候,如果有多个线程同时被阻塞,所有阻塞在该锁上的线程都将变成可运行的,其中第一个运行的线程将能够设置锁,其他线程将会看到互斥锁仍然处于锁定状态,然后又愉快地开始了等待锁在一次变成可用的,采取这种方式就只有一个线程能够进行处理。
互斥机制当且仅当我们按照统一的数据访问规则设计线程的情况下才会工作,操作系统并不会为我们设置串行数据访问。如果我们允许一个线程在访问数据之前不需要进行获取锁,那么及时其余的所有线程都在尝试访问数据之前获取锁,不一致性也可能会出现。
互斥变量使用数据类型pthread_mutex_t表示,在我们使用一个互斥变量之前,我们必须对其进行初始化工作,要么将其初始化为常量PTHREAD_MUTEX_INITIALIZER(for statically allocated mutexes only),或者是调用函数pthread_mutex_init对其进行初始化。如果我们动态地进行互斥锁分配(比如说,调用函数malloc),我们我们需要在释放掉内存之前调用函数pthread_mutex_destroy.
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
Both: return 0 if OK,error number on failure.
当设置attr为NULL的时候,函数使用默认属性初始化互斥锁,我们将在12.4节中讨论互斥锁属性。
为了锁定一个互斥锁,我们需要调用函数pthread_mutex_lock,如果锁已经被锁定了,调用线程将会被阻塞,直到互斥锁被接触为止,为了接触一个互斥锁,我们需要调用函数pthread_mutex_unblock.
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
All return: 0 if OK, error number on failure.
如果一个线程不想要被阻塞的话,那么它可以调用函数pthread_mutex_trylock,从而实现条件锁定互斥锁。如果互斥锁在函数pthread_mutex_trylock被调用的时候是未锁定的,那么函数pthread_mutex_trylock将不会阻塞并且锁定互斥锁,最后返回0.否则,函数pthread_mutex_trylock将会失败,返回EBUSY并且并不会锁定互斥锁。
Example
图11.10中使用了互斥锁来保护一个数据结构。当多个线程需要同时访问一个动态 分配的对象的时候,我们可以嵌入一个引用计数器,以确保所有线程完成使用之前不会对其进行释放。
#include <stdlib.h>
#include <pthread.h>
struct foo
{
int f_count;
pthread_mutex_t f_lock;
int f_id;
/* ...more stuff here... */
};
struct foo *foo_alloc(int id) /*allocate the object */
{
struct foo *fp;
if((fp = malloc(sizeof(struct foo))) != NULL)
{
fp->f_count = 1;
fp->f_id = id;
if(pthread_mutex_init(&fp->f_lock, NULL) != 0)
{
free(fp);
return NULL;
}
/* ...continue initialization... */
}
return fp;
}
void foo_hold(struct foo *fp) /*add a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
void release(struct foo *fp) /*release a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
if(--fp->f_count == 0) /*last reference*/
{
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else
{
pthread_mutex_unlock(&fp->f_lock);
}
}
图11.10 使用互斥锁来保护一个数据结构
我们在增加引用计数,减小引用计数,检查引用计数是否为0的时候锁定f_lock.在首次分配空间以及初始化引用计数为1的时候并没有必要获取锁,因为只有分配线程在引用正在分配的数据结构实体。但是如果我们将新分配的数据结构放到一个链表中,那么就可能被其他的线程访问到,所以在这种情况下就需要首先锁定。
在使用对象之前,线程需要通过调用函数foo_hold来增加一次引用,在完成对象使用以后,必须通过函数foo_release函数释放引用,当最后一次引用被释放的时候,对象存储空间被释放。
在这个例子中,我们忽略了线程在调用函数foo_hold之前是如何找到对象的。即使引用计数为0,如果另外一个线程在调用函数foo_hold时候被阻塞在互斥锁上时使用函数foo_release释放对象内存将会是一个错误???.我们可以通过确保在释放其内存之前保证该对象不被发现来避免问题,我们将在后续的例子中看到如何实现这一想法。
11.6.2 避免死锁
如果一个线程尝试锁定相同的互斥锁两次的话,该线程就陷入了死锁状态,除此之外,还有一些不那么明显的方法会产生死锁。举例来说,当我们在我们的程序中使用超过一个互斥锁的时候,如果我们允许一个线程锁定一个互斥锁的同时获取第二个互斥锁,同时另一个线程锁定了第二个互斥锁并尝试获取第一个互斥锁,两个线程都将无法运行,因为每一个线程都需要被对方锁定的资源,于是死锁就出现了。
死锁可以通过仔细控制互斥锁获取的顺序来避免,举例来说,假设你有两个互斥锁,A以及B.如果所有线程总是在获取互斥锁B之前获取互斥锁A,那么这两个互斥锁就不会产生死锁问题(但是其他资源仍然可能产生死锁问题)。类似地,如果所有线程都总是在锁定互斥锁A之前锁定互斥锁B,那么也不会出现死锁:仅仅当一个线程尝试以与另一个线程相反的互斥锁获取顺序进行获取的时候才会产生潜在的死锁风险。
有些时候,一些应用程序架构很难保证互斥锁的获取顺序,如果在你写的函数中设计到了很多的锁以及数据结构,并且你不能将他们分解成简单的架构,那么你就必须尝试一些其他方法,在这种情况下,你可以尝试在获取锁失败的情况下释放已经获取到的锁,并稍后在获取一次。你可以使用函数pthread_mutex_trylock接口来避免死锁,如果你已经获取到了一个锁,并且函数pthread_mutex_trylock成功了,那么你就可以继续往后处理。但是如果不能获取到锁,那么你可能需要释放掉你已经获取到的锁,并在一段时间之后再行尝试。
Example
在这个例子中,我们更新了图11.10中的程序,使用了两个互斥锁,我们通过保证相同的锁定顺序来确保死锁问题不会出现。其中第二个互斥锁用于锁定一个跟踪foo数据结构的哈希表,hashlock不仅保护了哈希列表,还保护了foo结构中的f_next数据域。锁f_lock保护了foo结构中的其他数据域。
#include <stdlib.h>
#include <pthread.h>
#define NHASH 29
#define HASH(id) (((unsigned long)id)%NHASH)
struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo
{
int f_count;
pthread_mutex_t f_lock;
int f_id;
struct foo *f_next; /* protected by hashlock */
/* ... more stuff here */
};
struct foo *foo_alloc(int id)
{
struct foo *fp;
int idx;
if((fp = malloc(sizeof(struct foo))) != NULL)
{
fp->f_count = 1;
fp->f_id = id;
if(pthread_mutex_init(&fp->f_lock, NULL) != 0)
{
free(fp);
return (NULL);
}
idx = HASH(id);
pthread_mutex_lock(&hashlock);
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_lock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
/* ... continue initialization... */
pthread_mutex_unlock(&fp->f_lock);
}
return (fp);
}
void foo_hold(struct foo *fp) /* add a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
struct foo *foo_find(int id) /* find an exsting object */
{
struct foo *fp;
pthread_mutex_lock(&hashlock);
for(fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next)
{
if(fp->f_id == id)
{
foo_hold(fp);
break;
}
}
pthread_mutex_unlock(&hashlock);
return(fp);
}
void foo_rele(struct foo *fp) /*release a reference to the object */
{
struct foo *tfp;
int idx;
pthread_mutex_lock(&fp->f_lock);
if(fp->f_count == 1)
{
/* last reference */
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_lock(&hashlock);
pthread_mutex_lock(&fp->f_lock);
/*need to recheck the condition */
if(fp->f_count != 1)
{
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
return;
}
/* remove from list */
idx = HASH(fp->f_id);
tfp = fh[idx];
if(tfp == fp)
{
fh[idx] = fp->f_next;
}
else
{
while(tfp->f_next != fp)
{
tfp = tfp->f_next;
}
tfp->f_next = fp->f_next;
}
pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else
{
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
}
}
图11.11 使用两个互斥锁
比较图11.11与11.10中的程序,可以看出,现在在分配函数中首先锁定hash表,增加一个数据结构到哈希数组中,在解锁hash锁之前,获取新分配结构中的互斥锁,因为将新分配的结构放到了一个全局列表中,因此在初始化结构内容的过程中,需要防止其他线程对新分配的结构进行访问。
函数foo_find首先锁定hash表,然后搜索指定的数据结构,如果找到指定的数据结构,那么就增加一次引用计数,并返回一个结构指针,注意,在顺序上,保证了先获取hash锁在获取f_lock互斥锁的顺序。
使用两个互斥锁以后,函数foo_release更加复杂了,如果是最后一次引用,我们需要首先解锁结构锁,以致于我们可以获取到哈西表结构,因为我们需要从hash表中移除一个指定结构。然后我们再次获取结构锁,在此,需要再次检查是否需要释放掉指定结构,因为在上一次释放掉结构锁的时候可能其他线程被线程锁阻塞,当释放掉锁的时候,其他线程会对其执行操作,如果其他线程对该结构增加了一次引用,那么我们就只是简单地将引用技术减少一次即可。解锁掉两个互斥锁,然后直接返回即可。
可以看出上述锁方法实现比较复杂,所以我们需要再次进行简化设计,考虑使用hash表增加对引用计数的保护。
图11.12反映了这一该改变:
#include <stdlib.h>
#include <pthread.h>
#define NHASH 29
#define HASH(id) (((unsigned long)id)%NHASH)
struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo
{
int f_count; /* protected by hashlock */
pthread_mutex_t f_lock;
int f_id;
struct foo *f_next; /* protected by hashlock */
/* ... more stuff here */
};
struct foo *foo_alloc(int id) /* allocate the object */
{
struct foo *fp;
int idx;
if((fp = malloc(sizeof(struct foo))) != NULL)
{
fp->f_count = 1;
fp->f_id = id;
if(pthread_mutex_init(&fp->f_lock, NULL) != 0)
{
free(fp);
return (NULL);
}
idx = HASH(id);
pthread_mutex_lock(&hashlock);
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_lock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
/* ... continue initialization... */
pthread_mutex_unlock(&fp->f_lock);
}
return (fp);
}
void foo_hold(struct foo *fp) /* add a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
struct foo *foo_find(int id) /* find an exsting object */
{
struct foo *fp;
pthread_mutex_lock(&hashlock);
for(fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next)
{
if(fp->f_id == id)
{
fp->f_count++;
break;
}
}
pthread_mutex_unlock(&hashlock);
return(fp);
}
void foo_rele(struct foo *fp) /*release a reference to the object */
{
struct foo *tfp;
int idx;
pthread_mutex_lock(&fp->f_lock);
if(--fp->f_count == 0)
{
/* last reference ,remove from list*/
/* remove from list */
idx = HASH(fp->f_id);
tfp = fh[idx];
if(tfp == fp)
{
fh[idx] = fp->f_next;
}
else
{
while(tfp->f_next != fp)
{
tfp = tfp->f_next;
}
tfp->f_next = fp->f_next;
}
pthread_mutex_unlock(&hashlock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else
{
pthread_mutex_unlock(&hashlock);
}
}
图11.12 简化的锁示例
注意图11.12中的程序对比图11.11中的程序而言变得更加简单了,围绕hash列表以及引用计数的锁定顺序问题已经没有了,因为我们对于上述两个资源使用了同一个锁。多线程软件设计涉及到了像这样的一些折中问题。如果你使用的锁粒度太大,那么就会由许多线程阻塞到同一个锁上的问题。如果锁的粒度太小,你可能会遇到性能问题,因为过多的锁定过程导致性能下降,并且代码设计会比较复杂。作为一个程序员,你需要在代码复杂性与性能之间找到一个平衡,并且仍然保证你的锁定满足要求。
11.6.3 pthread_mutex_timedlock 函数
如下函数允许我们将获取锁的操作与时间进行绑定,函数pthread_mutex_timedlock基本上与函数pthread_mutex_lock相同,但是当其设定的超时时间到达的时候,函数pthread_mutex_timedlock将会返回错误码ETIMEDOUT.并且不会成功获取到互斥锁。
#include <pthread.h>
#include <time.h>
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict tsptr);
Returns: 0 if OK, error number on failure.
参数timeout指定了该函数的最长等待时间,指定的形式是使用绝对时间,即是说是一直等待直到时间X,而不是等待时长X.timeout使用timespec进行存储,该结构以秒以及毫秒的形式进行时间的存储。
Example
在图11.13中,我们将看到如何使用函数pthread_mutex_timedlock来避免永久性的锁定。
#include "apue.h"
#include <pthread.h>
int main(void)
{
int err;
struct timespec tout;
struct tm *tmp;
char buf[64];
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&lock);
printf("mutex is locked\n");
clock_gettime(CLOCK_REALTIME, &tout);
tmp = localtime(&tout.tv_sec);
strftime(buf, sizeof(buf), "%r", tmp);
printf("current time is %s\n", buf);
tout.
以上是关于11.6 线程同步的主要内容,如果未能解决你的问题,请参考以下文章
Visual Studio for Mac 不同步 Xcode 11.6 和 Interface Builder 中所做的更改