C中具有共享队列的线程安全生产者/消费者
Posted
技术标签:
【中文标题】C中具有共享队列的线程安全生产者/消费者【英文标题】:Thread safe producer/consumer with shared queue in C 【发布时间】:2020-10-31 22:44:36 【问题描述】:更新: 我无法弄清楚为什么我会陷入无限循环。我可以通过在队列达到最大大小时设置退出条件来解决错误,但我想通过在代码处理完所有输入文件时保留退出 while 循环的文件计数器 (fileCounter) 来做到这一点。
今天早些时候,我的代码运行正常,但在实施一些更改以尝试处理多个线程(不止一个生产者和一个消费者)时,我似乎一直破坏它。我知道在我弄乱它之前我应该在某个地方有一个副本,但我认为追溯我的步骤很容易。我的最终目标是让多个生产者和消费者线程安全。我在下面包含了标头和 .c 文件。
我的共享缓冲区是一个队列,我的 .c 文件中的前三个函数专门用于初始化、推送到队列以及从队列中弹出。我使用互斥锁“lock”和条件变量 cond 来管理生产者线程何时推送到队列与等待以及消费者线程何时从队列中弹出。在我的 requesterThread 和 resolverThread 函数中,我使用了三个信号量(sem_mutex、sem_full、sem_empty)——一个二进制和两个计数——跟踪我的队列中有多少槽是空的和满的。我最终要管理三个共享资源,但是一个生产者线程和一个消费者线程,唯一的共享资源是队列。
对于这个问题的任何帮助,以及一些关于如何使我的代码对多个线程安全的提示,我们将不胜感激。
谢谢
.h 文件
#ifndef MULTILOOKUP_H_
#define MULTILOOKUP_H_
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <sys/time.h>
#include "/home/user/Desktop/PA3/util.c"
#define MAX_REQUESTER_THREADS 10
#define MAX_RESOLVER_THREADS 5
#define MAX_INPUT_FILES 10
#define MAXSIZE 20
struct arguments
struct queue *q;
char *input;
char *resLog;
char *reqLog;
char line[100];
char result[100];
char ipv6[100];
char *poisonPill;
int numInputFiles;
;
struct node
char name[100];
struct node *next;
;
struct queue
int size;
struct node *head;
struct node *tail;
pthread_mutex_t lock, lock2;
pthread_cond_t cond;
;
void init(struct queue *);
void pop(struct queue *);
void push(struct queue *, char *);
void* requesterThread(void *);
void* resolverThread(void *);
sem_t sem_mutex, sem_full, sem_empty;
#endif
.c 文件
#include "multilookup.h"
/*----------------------Struct Functions------------------------*/
void init(struct queue *q)
q->head = NULL;
q->tail = NULL;
q->size = 0;
pthread_mutex_init(&q->lock, NULL);
pthread_mutex_init(&q->lock2, NULL);
pthread_cond_init(&q->cond, NULL);
void pop(struct queue *q)
pthread_mutex_lock(&q->lock);
if (q->size > 0) pthread_cond_broadcast(&q->cond);
q->size--;
//printf("%s\n", q->head->name);
struct node *tmp = q->head;
q->head = q->head->next;
free(tmp);
pthread_mutex_unlock(&q->lock);
void push(struct queue *q, char *name)
pthread_mutex_lock(&q->lock);
while (q->size == MAXSIZE)
pthread_cond_wait(&q->cond, &q->lock);
q->size++;
if (q->head == NULL)
q->head = (struct node *)malloc(sizeof(struct node));
//q->head->name = name;
strcpy(q->head->name, name);
q->head->next == NULL;
q->tail = q->head;
else
q->tail->next = (struct node *)malloc(sizeof(struct node));
//q->tail->next->name = name;
strcpy(q->tail->next->name, name);
q->tail->next->next = NULL;
q->tail = q->tail->next;
pthread_mutex_unlock(&q->lock);
/*--------------------------End of struct functions------------------*/
void* requesterThread(void *receivedStruct)
struct arguments *args_ptr;
args_ptr = (struct arguments*) receivedStruct;
FILE *fptr;
FILE *sptr;
int fileCounter = 0;
//Check for proper file paths
if ((fptr = fopen(args_ptr->input, "r")) == NULL)
fprintf(stderr, "Error! Bogus input file path.\n");
// Thread exits if file pointer returns NULL.
exit(1);
if ((sptr = fopen(args_ptr->reqLog, "w")) == NULL)
fprintf(stderr, "Error! Bogues output path.\n");
exit(1);
//Read from input file and push to shared queue
while (1)
while (fscanf(fptr,"%[^\n]%*c", args_ptr->line) != EOF)
sem_wait(&sem_empty);
sem_wait(&sem_mutex);
push(args_ptr->q, args_ptr->line);
sem_post(&sem_mutex);
sem_post(&sem_full);
//Update requesterLog and print logged hostnames
fprintf(sptr, "%s \n", args_ptr->line);
printf("Hostname Logged: %s\n", args_ptr->line);
/*LINE TO WRITE "Thread <id> serviced ## files" TO serviced.txt
fprintf(sptr, "Thread %d serviced %d files", pthread_self(), fileCounter);
*/
fileCounter++;
//Condition to send poisonPill to consumer
if(fileCounter == args_ptr->numInputFiles)
push(args_ptr->q, args_ptr->poisonPill);
break;
fclose(fptr);
fclose(sptr);
return 0;
void* resolverThread(void *receivedStruct)
struct arguments *args_ptr;
args_ptr = (struct arguments *) receivedStruct;
FILE *rptr;
if( (rptr = fopen(args_ptr->resLog, "w")) == NULL)
fprintf(stderr, "Error! Bogus output file path.\n");
exit(1);
while(1)
sem_wait(&sem_full);
sem_wait(&sem_mutex);
strcpy(args_ptr->result, args_ptr->q->head->name);
pop(args_ptr->q);
sem_post(&sem_mutex);
sem_post(&sem_empty);
if (args_ptr->result != args_ptr->poisonPill)
if (dnslookup(args_ptr->result, args_ptr->ipv6, INET6_ADDRSTRLEN) == -1)
fprintf(stderr, "Bogus hostname.\n");
fprintf(rptr, "%s, \n", args_ptr->result);
else
fprintf(rptr, "%s, %s\n", args_ptr->result, args_ptr->ipv6);
else break;
fclose(rptr);
return 0;
int main(/*int argc, char *argv[]*/)
int num_req_threads;
int num_res_threads;
int rc1;
int rc2;
sem_init(&sem_mutex, 1, 1);
sem_init(&sem_full, 1, 0);
sem_init(&sem_empty, 1, MAXSIZE);
//instance of shared queue struct
struct queue q;
init(&q);
//instance of arguments struct
struct arguments args;
args.q = &q;
args.input = "/home/user/Desktop/PA3/input/names1.txt";//argv[5];
args.reqLog = "/home/user/Desktop/PA3/serviced.txt";//argv[3];
args.resLog = "/home/user/Desktop/PA3/results.txt"; //argv[4];
args.poisonPill = "You shall not Pass\n";
args.numInputFiles = 1;;
num_req_threads = 1;
num_res_threads = 1;
if (num_req_threads > MAX_REQUESTER_THREADS)
printf("Cannot have more than 10 requester threads!\n"); exit(1);
if (num_res_threads > MAX_RESOLVER_THREADS)
printf("Cannot have more than 5 requester threads!\n"); exit(1);
//Thread Creation
pthread_t reqThreads[num_req_threads];
pthread_t resThreads[num_res_threads];
for (int j = 0; j < num_res_threads; j++)
rc2 = pthread_create(&resThreads[j], NULL, resolverThread, (void *) &args);
printf("Resolver thread %d created.\n", j);
for(int i = 0; i < num_req_threads; i++)
rc1 = pthread_create(&reqThreads[i], NULL, requesterThread, (void *)&args);
printf("Requester thread %d created.\n", i);
/*for (int j = 0; j < num_res_threads; j++)
rc2 = pthread_create(&resThreads[j], NULL, resolverThread, (void *) &args);
printf("Resolver thread %d created.\n", j);
*/
if(rc1 || rc2)
printf("Could not create threads.\n");
exit(-1);
for (int n = 0; n < num_res_threads; n++)
pthread_join(resThreads[n], 0);
for (int m = 0; m < num_req_threads; m++)
pthread_join(reqThreads[m], 0);
/*for (int n = 0; n < num_res_threads; n++)
pthread_join(resThreads[n], 0);*/
sem_destroy(&sem_mutex);
sem_destroy(&sem_full);
sem_destroy(&sem_empty);
pthread_mutex_destroy(&q.lock);
return 0;
输出看起来像
>> valgrind ./mult -v
==29127== Memcheck, a memory error detector
==29127== Copyright (C) 2002-2015, and GNU GPL'd, by Julian Seward et al.
==29127== Using Valgrind-3.11.0 and LibVEX; rerun with -h for copyright info
==29127== Command: ./mult -v
==29127==
Resolver thread 0 created.
Requester thread 0 created.
Hostname Logged: facebook.com
Hostname Logged: youtube.com
Hostname Logged: yahoo.com
Hostname Logged: live.com
Hostname Logged: wikipedia.org
Hostname Logged: msn.com
Hostname Logged: baidu.com
Hostname Logged: blogspot.com
Hostname Logged: microsoft.com
Hostname Logged: qq.com
Hostname Logged: taobao.com
Hostname Logged: bing.com
Hostname Logged: sina.com.cn
Hostname Logged: soso.com
Hostname Logged: 163.com
Hostname Logged: ask.com
Hostname Logged: adobe.com
Hostname Logged: twitter.com
Hostname Logged: mozilla.com
Hostname Logged: youku.com
Hostname Logged: sdjjdsaf.com
Hostname Logged: andysayler.com
Hostname Logged: wmfo.org
Error looking up Address: Name or service not known
Bogus hostname.
^C
>>GETS CAUGHT HERE
==29127==
==29127== Process terminating with default action of signal 2 (SIGINT)
==29127== at 0x4E4298D: pthread_join (pthread_join.c:90)
==29127== by 0x4019E4: main (in /home/user/Desktop/PA3/mult)
==29127==
==29127== HEAP SUMMARY:
==29127== in use at exit: 1,208 bytes in 4 blocks
==29127== total heap usage: 371 allocs, 367 frees, 1,776,506 bytes allocated
==29127==
==29127== LEAK SUMMARY:
==29127== definitely lost: 0 bytes in 0 blocks
==29127== indirectly lost: 0 bytes in 0 blocks
==29127== possibly lost: 544 bytes in 2 blocks
==29127== still reachable: 664 bytes in 2 blocks
==29127== suppressed: 0 bytes in 0 blocks
==29127== Rerun with --leak-check=full to see details of leaked memory
==29127==
==29127== For counts of detected and suppressed errors, rerun with: -v
==29127== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
【问题讨论】:
这能回答你的问题吗? understanding of pthread_cond_wait() and pthread_cond_signal() 再看这个,逻辑不通。为什么不让主线程简单地读取文件并将所有查询推送到“请求”队列,然后在另一个“响应”队列中等待结果呢?然后线程将从“请求”队列中弹出一个条目,执行查找并将结果推送到“响应”队列。您不需要到处使用信号量。此外,如果队列“已满”,请不要阻塞;推动者可以退后并重试。只有这样才需要条件变量来“等待某些东西如果为空则放入队列”,比轮询更有效。 另外我会考虑让主线程为“请求-响应”结构执行内存抓取,该结构将容纳请求和响应。然后可以简单地使用队列将请求-响应对象从生产者传递到消费者。 请提出不同的问题,而不是更改您现有的问题。这样做可能会使现有答案无效 【参考方案1】:我怀疑原因是竞争条件和/或未定义的行为。
push()
和 pop()
都会改变链表 - 但它们没有使用相同的互斥锁来保护访问,因此它们可能同时运行。
void pop(struct queue *q)
pthread_mutex_lock(&q->lock);
和
void push(struct queue *q, char *name)
pthread_mutex_lock(&q->lock2);
push()
在队列已满的情况下保持lock2
,然后使用q->lock
等待条件变量,直到队列未满。
while (q->size == MAXSIZE)
pthread_cond_wait(&q->cond, &q->lock);
调用pthread_cond_timedwait()
的前提条件是互斥体已经被锁定,但我不明白它为什么会在这里。
来自man page
pthread_cond_timedwait() 和 pthread_cond_wait() 函数应阻塞条件变量。它们应在调用线程锁定的互斥锁或未定义的行为结果的情况下调用。
来自pthread_cond_wait()
的任何错误都会被忽略。检查这一点可能是谨慎的。
顺便说一句,在push()
中,所有路径都调用malloc()
- 可能会阻塞。最好在锁定互斥体之前调用它。
【讨论】:
我只是在搞砸,但即使我使用同一个锁,我也会陷入这个无限循环以上是关于C中具有共享队列的线程安全生产者/消费者的主要内容,如果未能解决你的问题,请参考以下文章