使用条件变量的 Pthread 程序死锁
Posted
技术标签:
【中文标题】使用条件变量的 Pthread 程序死锁【英文标题】:Pthread program deadlocks using condition variables 【发布时间】:2010-12-07 21:29:01 【问题描述】:此程序试图完成的任务:
这个程序应该同步“visitors”和“cars”的几个线程。参观者会随机游荡一段时间,直到他们决定乘车。如果他们排在第一位,并且有车可用,他们就会搭车,否则他们必须等到排在第一位或有车回来。如果没有游客排队,汽车会按顺序等待,直到有游客想上车。
更多背景信息:
我按照接受的答案here 中的建议使用条件变量重新制作了我的线程同步程序。我知道我走在正确的轨道上,但我的程序由于某种原因仍然死锁,而对于我的一生,我无法弄清楚为什么。除非我给你代码,否则我不知道你能如何帮助我,所以这里是:
问题:
1.) 短暂的死锁
2.) 有时访客排在最前面,但从来没有上车。
解决方案:
我的代码中的错误太多了……我想在修复一个错误时,我经常(不经意间)引入另一个错误。我只是不断地删除程序的功能,直到我消除了所有的错误,然后以不会使我的程序死锁的方式一个一个地重新构建功能。谢谢大家的建议。
【问题讨论】:
这样的代码墙通常不会引起太多关注。尝试将其减少到仍然存在问题的最小子集。 请描述这个程序试图完成什么。它建模的过程是什么?它必须遵循什么规则?哪些不变量不能被违反?开始和结束状态是什么样的? 死锁的回溯是什么?这通常很有帮助 【参考方案1】:首先,xscott 是正确的,您错误地使用了互斥锁。不管它是否看起来工作了一小会儿,它仍然是错误的,并且可能只是由于纯粹的机会而看起来工作。
我认为最好的方法是从首要原则构建设计,而不是逐行检查您的代码。我会这样描述我认为你所追求的基本算法:
visitor
sleep
join end of visitor queue
wait until at head of visitor queue
wait until there is a car free
remove car from car queue
remove self from visitor queue
occupy car
wait until not in car anymore
car
join end of car queue
wait until occupied
sleep
eject visitor from car
请注意,我没有明确标记唤醒点 - 只是等待。这是最好的方法 - 弄清楚你需要在哪里等待状态改变,然后你只需要在状态改变时放置唤醒(向条件变量发出信号)。
下一步是确定需要互斥锁保护的主要共享数据。我明白了:
- The visitor queue;
- The car queue;
- The status of each car.
因此,最精细的方法是为访客队列设置一个互斥锁(我们可以使用您的v_mutex
),一个用于汽车队列(c_mutex
),一个用于每辆车(sc[CARS]
)。或者,您可以使用c_mutex
来保护车辆队列和每辆车的状态;或者只是使用v_mutex
来保护一切。但为了学习起见,我们将采用更复杂的方法。
下一步是确定条件变量有用的等待点。对于wait until at head of visitor queue
,您的每位访客条件变量 (v_cond
) 就可以了;对于wait until there is a car free
,添加另一个条件变量(v_car_cond
)是最简单的。对于wait until occupied
,每辆车条件变量c_cond
是合适的。对于wait until not in car anymore
,可以使用v_cond
或c_cond
,因为此时汽车和访客之间存在一对一的关系。不需要v_cond2
。
我们现在可以根据 pthreads 原语重写上面的伪代码。在大多数情况下,这非常接近你已经拥有的 - 所以你肯定是在正确的轨道上。首先是访客:
/* join end of visitor queue */
pthread_mutex_lock(&v_mutex);
v_line[v_id] = set_visitor_place_in_line();
printf("Visitor %d is %d in line for a ride\n", v_id, v_line[v_id]);
pthread_mutex_unlock(&v_mutex);
/* wait until first in line */
pthread_mutex_lock(&v_mutex);
while (v_line[v_id] != 1)
pthread_cond_wait(&v_cond[v_id], &v_mutex);
pthread_mutex_unlock(&v_mutex);
/* wait until there is a car free */
pthread_mutex_lock(&c_mutex);
while ((car = get_next_car()) == CARS)
pthread_cond_wait(&v_car_cond, &c_mutex);
pthread_mutex_unlock(&c_mutex);
/* remove car from car queue */
pthread_mutex_lock(&c_mutex);
move_car_line();
/* NOTE: We do not need to signal v_car_cond here, because only the first
* visitor in line can be waiting there, and we are still the first visitor
* in line at this point. */
pthread_mutex_unlock(&c_mutex);
/* remove self from visitor queue */
pthread_mutex_lock(&v_mutex);
move_passenger_line();
next_v = get_next_passenger();
if (next_v < VISITORS)
pthread_cond_signal(&v_cond[next_v]);
pthread_mutex_unlock(&v_mutex);
/* occupy car */
pthread_mutex_lock(&sc[car]);
c_state[car] = v_id;
pthread_cond_signal(&c_cond[car]);
pthread_mutex_unlock(&sc[car]);
/* wait until not in car anymore */
pthread_mutex_lock(&sc[car]);
while(c_state[car] == v_id)
pthread_cond_wait(&v_cond[v_id], &sc[car]);
pthread_mutex_unlock(&sc[car]);
二、汽车:
/* join end of car queue */
pthread_mutex_lock(&c_mutex);
c_line[c_id] = set_car_place_in_line();
if (c_line[c_id] == 1)
pthread_cond_signal(&v_car_cond);
pthread_mutex_unlock(&c_mutex);
/* wait until occupied */
pthread_mutex_lock(&sc[c_id]);
while ((v_id = c_state[c_id]) == VISITORS)
pthread_cond_wait(&c_cond[c_id], &sc[c_id]);
pthread_mutex_unlock(&sc[c_id]);
/* visitor is on car ride for random amount of time */
sleep(rand()%5);
/* eject visitor from car */
pthread_mutex_lock(&sc[c_id]);
c_state[c_id] = VISITORS;
pthread_cond_signal(&v_cond[v_id]);
pthread_mutex_unlock(&sc[c_id]);
上面的内容可以简化——只要有一个pthread_mutex_unlock()
紧跟同一个互斥锁的pthread_mutex_lock()
,就可以删除解锁/锁定对。
PS:
您不必担心您的汽车以“错误的顺序”加入汽车队列 - 无论如何,当它们在公园里滚动时,它们都会出现故障。如果您真的关心这一点,请在启动任何汽车线程之前将汽车放入主线程中的队列,并更改汽车代码,使其在主循环结束时将自己重新添加到队列中。
采用这种方法的整体代码,省略了你的全局变量和辅助函数,看起来像这样:
pthread_mutex_t c_mutex = PTHREAD_MUTEX_INITIALIZER; /* mutex protecting c_line and cars_out */
pthread_mutex_t v_mutex = PTHREAD_MUTEX_INITIALIZER; /* mutex protecting v_line */
pthread_cond_t c_cond[CARS]; /* condition variables for waiting for c_state[i] to change from VISITORS to another value */
pthread_cond_t v_cond[VISITORS]; /* condition variables for visitor waiting to be first in line or ejected from a car */
pthread_cond_t v_car_cond = PTHREAD_COND_INITIALIZER; /* Condition variable for a visitor first in line, but waiting for a car. */
pthread_mutex_t sc[CARS]; /* one mutex per car, sc[i] protects c_state[i] */
int main()
int i;
void *status;
pthread_t c[CARS];
pthread_t v[VISITORS];
srand(time(NULL));
printf("Jurassic Park is open, cars are being prepped for passengers\n");
for (i = 0; i < CARS; i++)
pthread_mutex_init(&sc[i], NULL);
pthread_cond_init(&c_cond[i], NULL);
for (i = 0; i < VISITORS; i++)
pthread_cond_init(&v_cond[i], NULL);
for (i = 0; i < CARS; i++)
c_state[i] = VISITORS;
pthread_create(&c[i], NULL, car, (void *)i);
for (i = 0; i < VISITORS; i++)
pthread_create(&v[i], NULL, visitor, (void *)i);
for (i = 0; i < VISITORS; i++)
pthread_join(v[i], &status);
museum_empty++;
printf("All visitors have left Jurassic Park\n");
for (i = 0; i < CARS; i++)
pthread_mutex_lock(&sc[i]);
c_state[i] = -1;
pthread_cond_signal(&c_cond[i]);
pthread_mutex_unlock(&sc[i]);
for (i = 0; i < CARS; i++)
pthread_join(c[i], &status);
printf("Jurrasic Park is closed, all cars have been parked\n");
pthread_exit(NULL);
return 0;
void *car(void *i)
int c_id = (int) i;
int v_id;
while (museum_empty != 1)
/* join end of car queue */
pthread_mutex_lock(&c_mutex);
c_line[c_id] = set_car_place_in_line();
if (c_line[c_id] == 1)
pthread_cond_signal(&v_car_cond);
printf("Car %d is ready for a passenger and is %d in line %d of %d cars are out\n", c_id, c_line[c_id], cars_out, CARS);
pthread_mutex_unlock(&c_mutex);
/* wait until occupied */
pthread_mutex_lock(&sc[c_id]);
while ((v_id = c_state[c_id]) == VISITORS)
pthread_cond_wait(&c_cond[c_id], &sc[c_id]);
pthread_mutex_unlock(&sc[c_id]);
if(museum_empty == 1)
break;
pthread_mutex_lock(&c_mutex);
cars_out++;
printf("Visitor %d is riding in car %d %d of %d cars are out --\n", v_id, c_id, cars_out, CARS);
pthread_mutex_unlock(&c_mutex);
/* visitor is on car ride for random amount of time */
sleep(rand()%5);
printf("Visitor %d is done riding in car %d\n", v_id, c_id);
/* eject visitor from car */
pthread_mutex_lock(&sc[c_id]);
c_state[c_id] = VISITORS;
pthread_cond_signal(&v_cond[v_id]);
pthread_mutex_unlock(&sc[c_id]);
pthread_mutex_lock(&c_mutex);
cars_out--;
pthread_mutex_unlock(&c_mutex);
return NULL;
void *visitor(void *i)
int v_id = (int) i;
int next_v;
int j = 0;
int car;
while (j < RIDES)
if (j == 0)
printf("Visitor %d entered museum and is wandering around\n", v_id);
else
printf("Visitor %d got back from ride and is wandering around\n", v_id);
sleep(rand()%3); /* visitor is wandering */
/* join end of visitor queue */
pthread_mutex_lock(&v_mutex);
v_line[v_id] = set_visitor_place_in_line();
printf("Visitor %d is %d in line for a ride\n", v_id, v_line[v_id]);
/* wait until first in line */
while (v_line[v_id] != 1)
pthread_cond_wait(&v_cond[v_id], &v_mutex);
pthread_mutex_unlock(&v_mutex);
/* wait until there is a car free */
pthread_mutex_lock(&c_mutex);
while ((car = get_next_car()) == CARS)
pthread_cond_wait(&v_car_cond, &c_mutex);
/* remove car from car queue */
move_car_line();
pthread_mutex_unlock(&c_mutex);
/* remove self from visitor queue */
pthread_mutex_lock(&v_mutex);
move_passenger_line();
next_v = get_next_passenger();
if (next_v < VISITORS)
pthread_cond_signal(&v_cond[next_v]);
pthread_mutex_unlock(&v_mutex);
/* occupy car */
pthread_mutex_lock(&sc[car]);
c_state[car] = v_id;
pthread_cond_signal(&c_cond[car]);
/* wait until not in car anymore */
/* NOTE: This test must be against v_id and *not* VISITORS, because the car could have been
* subsequently occupied by another visitor before we are woken. */
while(c_state[car] == v_id)
pthread_cond_wait(&v_cond[v_id], &sc[car]);
pthread_mutex_unlock(&sc[car]);
j++;
printf("Visitor %d leaving museum.\n", v_id);
return NULL;
我希望这会有所帮助...
【讨论】:
我没有使用你所有的建议,但我使用了其中的一些,改变了一些其他的东西,最后让它正常工作。感谢您花时间浏览我的代码并给出如此详细的答案。【参考方案2】:您有很多代码,因此不太可能有人会为您找到所有错误。然而,一些cmets:
互斥锁不是信号量。您在 main() 中的几个 for 循环正在解锁您尚未锁定的互斥锁。这几乎可以肯定是一个错误。从概念上讲,可以使用信号量来实现互斥量,并且您可以使用互斥量和 condvar 来实现信号量,但是您正在解锁一个未锁定的互斥量,这是不正确的。
每个线程都应该锁定一个互斥体,做一些工作,然后解锁它。线程不应解锁已被另一个线程锁定的互斥锁,或抢先解锁未锁定的互斥锁。如果这完全可行,那么这是您当前使用的实现中的一个怪癖,并且不能移植到其他实现。
您在 main 中的第二个 for 循环连续两次锁定同一个互斥锁。您是否在代码中越过了这一点?由于您正在循环播放,因此您锁定它的次数多于解锁它的次数。如果您的代码停在这里,我不会感到惊讶。 (有时互斥锁可以是递归的,但 pthread 互斥锁不是默认的。)
pthread_cond_signal() 实际上只是对 pthread_cond_broadcast() 的优化。使用广播,直到你的比赛条件得到解决。
在启动线程之前,您应该在 main 顶部初始化所有互斥锁和 condvar。您可能在这里没有错误,但它不会造成伤害,而且会有所帮助。
如果您在短期内将所有内容缩减为单个互斥锁和单个 condvar,您可能会做得更好。看起来您正在尝试对所有内容进行细粒度锁定,但除非您对锁的顺序非常小心,否则您将遇到竞争条件和死锁。
真的,只有一种模式/模板应该与互斥锁和条件变量一起使用:
pthread_mutex_lock(...);
// optionally wait for something to be true
while (!some_condition)
pthread_cond_wait(...);
// make changes for how things should now be
shared_variable = new_value;
// optionally notify the rest of the world of your change
pthread_cond_broadcast(...);
pthread_mutex_unlock(...);
如果您有一个互斥锁和 condvar,您应该尝试使所有同步块看起来像这样。如果你不需要等待,你可以省略 while(...)/wait 的东西,如果没有其他线程关心你所做的更改,你可以省略广播,但如果你的代码看起来不粗像每个同步块一样,这可能是一个错误。
【讨论】:
@xscott 感谢您的意见,但我解决了您在我的代码的 cmets 中所做的每条评论,特别说明了为什么我会按照我的方式做事。您说“互斥锁不是信号量”,但互斥锁可以用作信号量。我在我的代码中解释了为什么我以我的方式锁定我的主要方法中的循环。您还说过“一个线程不应解锁已被另一个线程锁定的互斥锁”,但解锁已阻塞另一个线程的互斥锁的线程是完全有效的,在这种情况下,所讨论的互斥锁是一个信号量。我的代码运行,如果你运行它,你会看到它有多远。 @typoknig,请随意忽略我的建议,但是您的 cmets 没有解释您为什么不正确地使用互斥锁,而且您显然确实有一两个问题。我认为您最可能的问题是您认为 pthread 互斥锁的行为类似于信号量。我建议您编写一些小型测试程序来检查您的假设,并确保在您的 pthread 调用中建议返回结果。 @xscott,当使用 pthreads 时没有信号量,只有互斥量。使用互斥锁的方式可以模仿信号量。我的 main 方法中的第一条评论解释了为什么我像创建线程时那样使用互斥锁。显然我错误地使用了互斥锁,否则我的程序不会被锁定,但我没有在你提到的任何地方正确使用它们。此外,如果您查看我之前的问题(链接到此问题),您会看到我确实已经编写了一个小程序来测试这个结构的一个小版本。 @typoknig,很明显这是一个家庭作业问题,你还在学习。这让我有点好奇你为什么对自己如此自信。如果您查看 pthread_mutex_unlock() 的手册页,您会看到有一个错误返回代码用于解锁被不同线程锁定的互斥锁。您错误地使用它们。但是,由于您不太乐于提供帮助,因此我不在乎您是否失败。 @xscott 这是昨天的家庭作业问题,但我的教授让我们选择了几个问题。因为我在这个问题上碰壁了,所以我又做了一个并把它上交了。不过,我有很多时间去做这件事,我想知道什么地方不能正常工作。由于我们在线,我认为您读错了我,所以我不会对您对我失败的评论感到生气。我很乐于提供帮助,但你没有给我任何可以接受的东西。程序运行了好几个循环才被锁死,所以很明显问题不在main方法上。【参考方案3】:我认为您更喜欢使用信号量。下面是信号量的互斥量和条件变量实现:
typedef struct
pthread_mutex_t mutex;
pthread_cond_t condvar;
unsigned long count;
semaphore_t;
void semaphore_init (semaphore_t* sem, unsigned long count)
pthread_mutex_init(&sem->mutex, 0);
pthread_cond_init(&sem->condvar, 0);
pthread_mutex_lock(&sem->mutex);
sem->count = count;
pthread_mutex_unlock(&sem->mutex);
void semaphore_incr (semaphore_t* sem)
pthread_mutex_lock(&sem->mutex);
sem->count++;
pthread_cond_broadcast(&sem->condvar);
pthread_mutex_unlock(&sem->mutex);
void semaphore_decr (semaphore_t* sem)
pthread_mutex_lock(&sem->mutex);
while (sem->count == 0)
pthread_cond_wait(&sem->condvar, &sem->mutex);
sem->count--;
pthread_mutex_unlock(&sem->mutex);
也许如果您根据信号量来实施您的解决方案,您将获得您所追求的结果。
【讨论】:
以上是关于使用条件变量的 Pthread 程序死锁的主要内容,如果未能解决你的问题,请参考以下文章