C中的消息队列:实现2路通信
Posted
技术标签:
【中文标题】C中的消息队列:实现2路通信【英文标题】:message queue in C: implementing 2 way comm 【发布时间】:2014-05-04 21:23:59 【问题描述】:我是 C 语言的学生和初学者。我想在 C linux 中使用消息队列实现 2 路通信。我需要两个队列还是只需要一个队列来完成这项工作?
我也想知道我是否可以将数据(显示在代码中)发送到另一个进程,或者我需要将其声明为字符数组。
/*
 * Payload descriptor from the question: a length plus a pointer to the bytes.
 * NOTE(review): `data` is an address in the sender's own memory. As the
 * discussion below points out, a pointer cannot be passed to another process,
 * so this layout only works inside a single process.
 */
typedef struct msg1 {
    int mlen;    /* presumably the number of bytes at `data` - the question does not say */
    char *data;  /* pointer to the payload bytes */
} M1;

/*
 * Message wrapper from the question: a type tag followed by a pointer to the
 * payload descriptor. The same caveat applies to `m`: only the pointer value
 * would travel, not the M1 it refers to.
 */
typedef struct msgbuf {
    long mtype;  /* message type tag */
    M1 *m;       /* pointer to the payload descriptor */
} message_buf;
提前谢谢:)
【问题讨论】:
如果你想在进程之间发送消息,请阅读更多关于 IPC 的资料,并使用 POSIX message queues 之类的机制;另外,你不能在进程之间发送指针之类的数据(因为每个进程都有自己的私有内存映射)。 是的,双向通信需要两个队列。 您将需要多个消息队列。尝试使用单个消息队列在多个进程之间进行双向通信会让情况变得很复杂。 非常感谢@JoachimPileborg。 @Jayesh 我只是想知道这是否可行?可以吗? 【参考方案1】:Also I would like to know can I send data(shown in code) to another process or i need to declare it as a character array
是的,您可以将数据发送到另一个进程
例如:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#define MAXSIZE 128
/*
 * Report a fatal error and terminate.
 * Prints `s` through perror() (so the text for the current errno is appended)
 * and exits the process with status 1. Never returns.
 */
void die(const char *s)
{
    perror(s);
    exit(1);
}
/*
 * Message layout handed to msgsnd(): a long type tag followed directly by the
 * payload bytes. The text is stored inline, so the whole message is copied
 * into the queue rather than a pointer to it.
 */
struct msgbuf {
    long mtype;          /* message type; the sender below uses 1 */
    char mtext[MAXSIZE]; /* NUL-terminated message text */
};
main()
int msqid;
int msgflg = IPC_CREAT | 0666;
key_t key;
struct msgbuf sbuf;
size_t buflen;
key = 1234;
if ((msqid = msgget(key, msgflg )) < 0) //Get the message queue ID for the given key
die("msgget");
//Message Type
sbuf.mtype = 1;
printf("Enter a message to add to message queue : ");
scanf("%[^\n]",sbuf.mtext);
getchar();
buflen = strlen(sbuf.mtext) + 1 ;
if (msgsnd(msqid, &sbuf, buflen, IPC_NOWAIT) < 0)
printf ("%d, %d, %s, %d\n", msqid, sbuf.mtype, sbuf.mtext, buflen);
die("msgsnd");
else
printf("Message Sent\n");
exit(0);
//IPC_msgq_rcv.c
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#define MAXSIZE 128
/*
 * Report a fatal error and terminate.
 * Prints `s` through perror() (so the text for the current errno is appended)
 * and exits the process with status 1. Never returns.
 */
void die(const char *s)
{
    perror(s);
    exit(1);
}
/*
 * Message layout filled in by msgrcv(): a long type tag followed directly by
 * the payload bytes. It must match the layout the sender uses.
 * (The original had a `typedef` with no alias name; it had no effect and has
 * been dropped.)
 */
struct msgbuf {
    long mtype;          /* message type of the received message */
    char mtext[MAXSIZE]; /* message text */
};
main()
int msqid;
key_t key;
struct msgbuf rcvbuffer;
key = 1234;
if ((msqid = msgget(key, 0666)) < 0)
die("msgget()");
//Receive an answer of message type 1.
if (msgrcv(msqid, &rcvbuffer, MAXSIZE, 1, 0) < 0)
die("msgrcv");
printf("%s\n", rcvbuffer.mtext);
exit(0);
如果你了解消息队列,那么消息队列就是用于进程间通信的。
同样对于多个进程之间的双向通信,你需要多个消息队列
【讨论】:
如果只是两个进程之间的双向通信,至少需要多少个队列? 需要两个消息队列。 @user3433848,您可以只用单个 sysv 消息队列进行双向通信,但这需要您对消息队列有相当程度的熟悉,而您目前还没有达到这个程度;即使在最好的情况下,这也是一种值得怀疑的做法,除非您确实受到环境的限制。使用两个队列更容易。【参考方案2】:这是我编写的一个消息队列,我可以将它包含在我的任何多线程程序中。在进程之间,您可以使用套接字或 mqueue.h,但在线程之间,我设计的这个轻量级消息队列库效果非常好。
这里是“messaging.h”头文件。
// messaging.h
//
// Created by Justin Jack on 11/2/17.
// Copyright © 2017 Justin Jack. All rights reserved.
// This code may be freely distributed for personal and business
// use. Additional credits may be added, but the original author
// information must stay part of this code.
//
#ifndef messaging_h
#define messaging_h
#include <stdio.h>
#include <pthread.h>
#include <memory.h>
#include <stdlib.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
/* Upper bound on the number of messages pushmessage() lets a queue hold */
#define MAX_JMESSAGES 2000
/* Number of message slots allocated when a queue is created */
#define JMSG_INIT_QUEUE_SIZE 100
/* Value stored in JQUEUE.lastfulljmessage while a queue holds no messages */
#define JMSG_EMPTY_QUEUE -1
/* Uncomment to enable the printf() diagnostics guarded by QUEUE_DEBUG */
//#define QUEUE_DEBUG
/*
 * Mutex taken by lockit()/unlockit() around every access to the queue list.
 * NOTE(review): this is a `static` object defined in a header, so every
 * translation unit that includes messaging.h gets its own private copy.
 * Only the copy inside messaging.c is the one the library actually locks.
 */
static pthread_mutex_t messagelock = PTHREAD_MUTEX_INITIALIZER;
/*
 * Message codes carried in JMESSAGE.message. The first five have fixed
 * meanings suggested by their names; JMSG_CUSTOM1..JMSG_CUSTOM10 are left for
 * the application to define (the example program maps its own THREAD_* names
 * onto them).
 */
typedef enum _jmessages {
    JMSG_QUIT = 1,              // 1
    JMSG_CHANGE,                // 2
    JMSG_FAILURE,               // 3
    JMSG_CONTINUE,              // 4
    JMSG_DRAW,                  // 5
    JMSG_CUSTOM1,               // 6
    JMSG_CUSTOM2,               // 7
    JMSG_CUSTOM3,               // 8
    JMSG_CUSTOM4,               // 9
    /* Original (misspelled) name of JMSG_CUSTOM4, kept so old code still builds */
    MSG_CUSTOM4 = JMSG_CUSTOM4, // 9
    JMSG_CUSTOM5,               // 10
    JMSG_CUSTOM6,               // 11
    JMSG_CUSTOM7,               // 12
    JMSG_CUSTOM8,               // 13
    JMSG_CUSTOM9,               // 14
    JMSG_CUSTOM10               // 15
} JMESSAGES;
/*
 * Returned by popmessage().
 * JMSG_NOMESSAGE is 0, so a popmessage() result can be tested for "got a
 * message" by comparing it against JMSG_NOMESSAGE.
 */
typedef enum jmsg_returns {
    JMSG_NOMESSAGE,    // 0 - the queue was empty, nothing was delivered
    JMSG_MOREMESSAGES, // 1 - a message was delivered and more are waiting
    JMSG_LASTMESSAGE,  // 2 - a message was delivered and the queue is now empty
    JMSG_TIMEOUT       // 3 - not produced by any function in this file
} JMSG_RETURN;
/*
 * Status codes returned by the queue management functions.
 * Success (JMSG_OKAY) is 0, so callers must compare against JMSG_OKAY
 * rather than treating a non-zero result as success.
 */
typedef enum jmsg_errors {
    JMSG_QUEUENOTFOUND = -1, // no queue exists for the thread in question
    JMSG_OKAY,               // 0 - success
    JMSG_QUEUEFULL,          // 1 - the target queue already holds MAX_JMESSAGES
    JMSG_OUTOFMEMORY         // 2 - an allocation failed
} JMSG_ERROR;
/*
 * One message exchanged between threads. The sender fills in `message` and
 * whatever payload fields it needs; pushmessage() copies the whole structure
 * and overwrites `fromthread` / `fromthreadname` with the sender's identity.
 * Zero-initialise a JMESSAGE before filling it in, because every field is
 * copied verbatim.
 */
typedef struct jmessage {
    unsigned long long message; /* message code; the example uses JMESSAGES values */
    unsigned long long value1;  /* application-defined payload */
    unsigned long long value2;  /* application-defined payload */
    /* One pointer payload, viewable as whichever pointer type is convenient */
    union _mptr {
        void *voidptr;
        char *charptr;
        unsigned int *uintptr;
        int *intptr;
        double *doubleptr;
        float *floatptr;
        unsigned long long *lluptr;
        long long *llptr;
        short *shortptr;
        unsigned short *ushortptr;
    } ptr;
    int pointerlength;          /* presumably the size of the data at `ptr` - not used by the library */
    int needtofreepointer;      /* non-zero: destroymessagequeue() free()s `ptr` for undelivered messages */
    pthread_t fromthread;       /* sender's thread id, set by pushmessage() */
    const char *fromthreadname; /* sender's queue description, set by pushmessage() */
} JMESSAGE;
/*
 * Per-thread message queue. Queues are kept in a doubly linked list headed by
 * `jqueue`; each node belongs to the thread recorded in `owningthreadid`.
 */
typedef struct jqueue {
    pthread_t owningthreadid; /* thread that created (and reads) this queue */
    char semname[20];         /* name passed to sem_open(): "SEM" + thread id */
    char threadname[100];     /* description given to createmessagequeue() */
    sem_t *messagesignal;     /* posted once per pushed message; waitmessage() blocks on it */
    int queuesize;            /* number of slots allocated in jmessagelist */
    JMESSAGE *jmessagelist;   /* message slots; index 0 holds the oldest message */
    int lastfulljmessage;     /* index of the newest message, or JMSG_EMPTY_QUEUE */
    struct jqueue *next;      /* next queue in the list, or 0 */
    struct jqueue *previous;  /* previous queue in the list, or 0 for the head */
} JQUEUE;

/*
 * Head of the list of all message queues (0 while none exist).
 * NOTE(review): like `messagelock`, this is a `static` object defined in a
 * header, so each translation unit including messaging.h gets its own copy.
 * Only messaging.c's copy is the live list.
 */
static JQUEUE *jqueue = 0;
/*
 * Creates a message queue for the current thread.
 * `description` is copied into the queue as the thread's display name and may
 * be 0.
 * Returns JMSG_OKAY (0) when the queue was created or already existed, and
 * JMSG_OUTOFMEMORY when allocation failed.
 */
int createmessagequeue( char *description );
/*
 * Destroys the message queue on this thread.
 * Returns JMSG_OKAY, or JMSG_QUEUENOTFOUND if the calling thread has no queue.
 */
int destroymessagequeue( void );
/*
 * Sends a copy of the message in JMESSAGE *jmsg to another thread's queue.
 * Returns JMSG_OKAY, JMSG_QUEUENOTFOUND (the target thread has no queue),
 * JMSG_QUEUEFULL or JMSG_OUTOFMEMORY.
 */
int pushmessage( JMESSAGE *jmsg, pthread_t to_thread_id );
/* Checks for a message waiting for this queue. If found,
 it populates the message in the JMESSAGE * parameter passed.
 The most recently pushed message is delivered first.
 Returns:
 --------
 JMSG_QUEUENOTFOUND (No call to createmessagequeue())
 JMSG_NOMESSAGE (The message queue was empty)
 JMSG_MOREMESSAGES (You should loop while you get this)
 JMSG_LASTMESSAGE (The last message on the stack)
 */
int popmessage( JMESSAGE *jmsg );
/*
 * A blocking version of popmessage().
 * Returns JMSG_OKAY once a message has been stored in *jmsg, or
 * JMSG_QUEUENOTFOUND if the calling thread has no queue.
 */
int waitmessage( JMESSAGE *jmsg );
/* Internal Functions, public use not needed */
static int lockit( void );
static int unlockit( void );
static int popmessagenosem( JMESSAGE *jmsg );
#endif /* messaging_h */
这里是“messaging.c”源文件:
//
// messaging.c
// DeedleCore
//
// Created by Justin Jack on 11/2/17.
// Copyright © 2017 Justin Jack. All rights reserved.
//
#include "messaging.h"
static int lockit()
int x= 0;
pthread_t callingthreadid = pthread_self();
x = pthread_mutex_lock(&messagelock);
#ifdef QUEUE_DEBUG
if (x != 0 )
printf("lockit(%lld) failed to lock mutex.\n", callingthreadid);
switch (x)
case EINVAL:
printf("\t\tEINVAL:The mutex was created with the protocol attribute having the value PTHREAD_PRIO_PROTECT and the calling thread's priority is higher than the mutex's current priority ceiling.\n");
break;
case EBUSY:
printf("\t\tEBUSY:The mutex could not be acquired because it was already locked.\n");
break;
case EAGAIN:
printf("\t\tEAGAIN:The mutex could not be acquired because the maximum number of recursive locks for mutex has been exceeded.\n");
break;
case EDEADLK:
printf("\t\tEDEADLK:The current thread already owns the mutex.\n");
break;
case EPERM:
printf("\t\tEPERM:The current thread does not own the mutex.\n");
break;
default:
break;
#endif
return x;
static int unlockit()
int x = 0;
pthread_t callingthreadid = pthread_self();
x = pthread_mutex_unlock(&messagelock);
#ifdef QUEUE_DEBUG
if (x)
printf("\tunlockit(&lld) failed to unlock mutex.\n", callingthreadid);
#endif
return x;
static char *getthreadname( pthread_t idnum )
static char retval[] = "";
JQUEUE *jq = jqueue;
if (!jq) return retval;
lockit();
do
if (jq->owningthreadid == idnum)
unlockit();
return jq->threadname;
jq = jq->next;
while (jq);
unlockit();
return retval;
int createmessagequeue( char *description )
int retval = JMSG_QUEUENOTFOUND;
JQUEUE *jq = jqueue;
pthread_t callingthreadid = pthread_self();
pthread_mutexattr_t ptt;
if (!jq)
#ifdef QUEUE_DEBUG
printf("messaging.c > Initializing Messaging System ********************** \n");
#endif
pthread_mutexattr_init(&ptt);
#ifdef PTHREAD_MUTEX_RECURSIVE_NP
pthread_mutexattr_settype(&ptt, PTHREAD_MUTEX_RECURSIVE_NP);
#else
pthread_mutexattr_settype(&ptt, PTHREAD_MUTEX_RECURSIVE);
#endif
pthread_mutex_init(&messagelock, &ptt);
lockit();
jqueue = calloc(1, sizeof(JQUEUE));
if (jqueue)
jqueue->jmessagelist = calloc(JMSG_INIT_QUEUE_SIZE, sizeof(JMESSAGE));
if (jqueue->jmessagelist)
jqueue->queuesize = JMSG_INIT_QUEUE_SIZE;
jqueue->owningthreadid = callingthreadid;
jqueue->lastfulljmessage = JMSG_EMPTY_QUEUE;
if (description)
if (strlen(description) < 100)
sprintf(jqueue->threadname, "%s", description);
else
sprintf(jqueue->threadname, "%.*s", 99, description);
sprintf(jqueue->semname, "SEM%d", (int)callingthreadid);
jqueue->messagesignal = sem_open(jqueue->semname, O_CREAT, 0644, 0);
#ifdef QUEUE_DEBUG
printf("messaging.c > Message queue set up for \"%s\" (%lld).\n ", jqueue->threadname, callingthreadid);
#endif
retval = JMSG_OKAY;
else
free(jqueue);
jqueue = 0;
retval = JMSG_OUTOFMEMORY;
else
retval = JMSG_OUTOFMEMORY;
else
lockit();
do
if (jq->owningthreadid == callingthreadid)
retval = JMSG_OKAY;
break;
if (jq->next == 0)
jq->next = calloc(1, sizeof(JQUEUE));
if (jq->next)
jq->next->jmessagelist = calloc(JMSG_INIT_QUEUE_SIZE, sizeof(JMESSAGE));
if (jq->next->jmessagelist)
jq->next->previous = jq;
jq->next->queuesize = JMSG_INIT_QUEUE_SIZE;
jq->next->owningthreadid = callingthreadid;
jq->next->lastfulljmessage = JMSG_EMPTY_QUEUE;
sprintf(jq->next->semname, "SEM%d", (int)callingthreadid);
if (description)
if (strlen(description) < 100)
sprintf(jq->next->threadname, "%s", description);
else
sprintf(jq->next->threadname, "%.*s", 99, description);
#ifdef QUEUE_DEBUG
printf("messaging.c > Message queue set up for \"%s\" (%lld).\n ", jq->next->threadname, callingthreadid);
#endif
jq->next->messagesignal = sem_open(jq->next->semname, O_CREAT, S_IRWXU, 0);
retval = JMSG_OKAY;
else
free(jq);
jq = 0;
retval = JMSG_OUTOFMEMORY;
break;
jq = jq->next;
while (jq);
unlockit();
return retval;
/*
 * Destroys the calling thread's message queue.
 *
 * Messages this thread has sent that are still waiting in other queues carry
 * a pointer to this queue's name. Those pointers are redirected to the
 * constant "(dead queue)" so they stay valid after the queue is freed.
 * Undelivered messages in this queue that have `needtofreepointer` set get
 * their pointer payload free()d.
 *
 * Returns JMSG_OKAY, or JMSG_QUEUENOTFOUND if the thread has no queue.
 */
int destroymessagequeue( void )
{
    int i = 0, retval = JMSG_QUEUENOTFOUND;
    pthread_t callingthreadid = pthread_self();
    JQUEUE *jq = 0;
    JQUEUE *mine = 0;

    lockit();

    /* One pass: remember our own queue and patch sender names in all others */
    for (jq = jqueue; jq; jq = jq->next) {
        if (pthread_equal(jq->owningthreadid, callingthreadid)) {
            mine = jq;
            continue;
        }
        for (i = 0; i <= jq->lastfulljmessage; i++) {
            JMESSAGE *jm = &jq->jmessagelist[i];

            if (pthread_equal(jm->fromthread, callingthreadid))
                jm->fromthreadname = "(dead queue)";
        }
    }

    if (mine) {
#ifdef QUEUE_DEBUG
        printf("messaging.c > Destroying message queue for \"%s\" (%lld)\n", mine->threadname, (long long)mine->owningthreadid);
#endif
        /*
         * Free any internal pointers that were supposed to be freed after
         * retrieval. The semaphore is not drained first: it is about to be
         * closed, and a blocking sem_wait() here could stall every thread
         * because the global lock is held.
         */
        for (i = 0; i <= mine->lastfulljmessage; i++) {
            if (mine->jmessagelist[i].needtofreepointer)
                free(mine->jmessagelist[i].ptr.voidptr);
        }

        /* Free the queue messages */
        free(mine->jmessagelist);

        /* Release our handle, then remove the name */
        if (mine->messagesignal != SEM_FAILED)
            sem_close(mine->messagesignal);
        sem_unlink(mine->semname);

        /*
         * Unlink the queue. When it is the head, the global list pointer
         * must move on too, otherwise it would dangle.
         */
        if (mine->previous)
            mine->previous->next = mine->next;
        else
            jqueue = mine->next;
        if (mine->next)
            mine->next->previous = mine->previous;

        free(mine);
        retval = JMSG_OKAY;
    }

    unlockit();
    return retval;
}
/*
 * Sends a copy of *jmsg to the queue owned by thread `to_thread_id`.
 *
 * The copy is stored after the newest message already in the queue, its
 * `fromthread` / `fromthreadname` fields are replaced with the caller's
 * identity, and the queue's semaphore is posted so a thread blocked in
 * waitmessage() wakes up.
 *
 * Returns:
 *   JMSG_OKAY          the message was queued
 *   JMSG_QUEUENOTFOUND the target thread has no queue
 *   JMSG_QUEUEFULL     the target already holds MAX_JMESSAGES messages
 *   JMSG_OUTOFMEMORY   the queue could not be enlarged
 */
int pushmessage( JMESSAGE *jmsg, pthread_t to_thread_id )
{
    enum { JMSG_GROW_STEP = 50 }; /* slots added each time the queue fills up */
    int retval = JMSG_QUEUENOTFOUND;
    pthread_t callingthreadid = pthread_self();
    JQUEUE *jq = 0;

    lockit();

    for (jq = jqueue; jq; jq = jq->next) {
        if (pthread_equal(jq->owningthreadid, to_thread_id))
            break;
    }

    if (jq) {
        /* JMSG_EMPTY_QUEUE is -1, so this is also the index of the next free slot */
        int used = jq->lastfulljmessage + 1;

        if (used >= MAX_JMESSAGES) {
            /* We have too many messages backed up */
            retval = JMSG_QUEUEFULL;
        } else {
            retval = JMSG_OKAY;

            if (used >= jq->queuesize) {
                /*
                 * We're getting backed up: enlarge the slot array, moving it
                 * towards MAX_JMESSAGES. The recorded capacity is updated with
                 * it so the write below stays inside the allocation.
                 */
                int newsize = used + JMSG_GROW_STEP;
                JMESSAGE *newjmqueue = 0;

                if (newsize > MAX_JMESSAGES)
                    newsize = MAX_JMESSAGES;
                newjmqueue = realloc(jq->jmessagelist, (size_t)newsize * sizeof *newjmqueue);
                if (newjmqueue) {
                    jq->jmessagelist = newjmqueue;
                    jq->queuesize = newsize;
                } else {
                    /* The old array is still valid and left untouched */
                    retval = JMSG_OUTOFMEMORY;
                }
            }

            if (retval == JMSG_OKAY) {
                JMESSAGE *jm = &jq->jmessagelist[used];

                memcpy(jm, jmsg, sizeof *jm);
                jm->fromthread = callingthreadid;
                /* Takes the lock again; it is recursive once a queue exists */
                jm->fromthreadname = getthreadname(callingthreadid);
                jq->lastfulljmessage = used;

                /* Go ahead and increment the semaphore count in case
                 * they're calling waitmessage()
                 */
                sem_post(jq->messagesignal);
            }
        }
    }

    unlockit();
    return retval;
}
/*
 * Non-blocking receive for the calling thread's queue.
 *
 * *jmsg is always cleared first. If a message is waiting, the most recently
 * pushed one is moved into *jmsg (the store behaves as a stack).
 *
 * Returns:
 *   JMSG_QUEUENOTFOUND the calling thread has no queue
 *   JMSG_NOMESSAGE     the queue is empty; *jmsg is all zeroes
 *   JMSG_MOREMESSAGES  a message was delivered and more are waiting
 *   JMSG_LASTMESSAGE   a message was delivered and the queue is now empty
 */
int popmessage( JMESSAGE *jmsg )
{
    int retval = JMSG_QUEUENOTFOUND;
    pthread_t callingthreadid = pthread_self();
    JQUEUE *jq = 0;

    /* Never hand back stale caller data, even when no queue exists */
    memset(jmsg, 0, sizeof *jmsg);

    lockit();

    for (jq = jqueue; jq; jq = jq->next) {
        if (pthread_equal(jq->owningthreadid, callingthreadid))
            break;
    }

    if (jq) {
        if (jq->lastfulljmessage > JMSG_EMPTY_QUEUE) {
            JMESSAGE *top = &jq->jmessagelist[jq->lastfulljmessage];

            memcpy(jmsg, top, sizeof *jmsg);
            memset(top, 0, sizeof *top);
            jq->lastfulljmessage--;
            retval = (jq->lastfulljmessage == JMSG_EMPTY_QUEUE) ? JMSG_LASTMESSAGE : JMSG_MOREMESSAGES;

            /*
             * Decrease the semaphore count because they're NOT calling
             * waitmessage() but may later. sem_trywait() is used so the
             * global lock is never held across a blocking call.
             */
            sem_trywait(jq->messagesignal);
        } else {
            retval = JMSG_NOMESSAGE;
        }
    }

    unlockit();
    return retval;
}
/*
 * This function is only called by waitmessage(). It doesn't decrease the
 * semaphore count, because we've already waited for it to be signaled
 * in waitmessage(). This just pulls the message off of the stack.
 *
 * Returns the same codes as popmessage(): JMSG_QUEUENOTFOUND,
 * JMSG_NOMESSAGE (nothing delivered, *jmsg untouched), JMSG_MOREMESSAGES, or
 * JMSG_LASTMESSAGE (delivered, queue now empty). The original returned
 * JMSG_NOMESSAGE after delivering the final message, which made a successful
 * pop indistinguishable from an empty queue.
 */
static int popmessagenosem( JMESSAGE *jmsg )
{
    int retval = JMSG_QUEUENOTFOUND;
    pthread_t callingthreadid = pthread_self();
    JQUEUE *jq = 0;

    lockit();

    for (jq = jqueue; jq; jq = jq->next) {
        if (pthread_equal(jq->owningthreadid, callingthreadid))
            break;
    }

    if (jq) {
        if (jq->lastfulljmessage > JMSG_EMPTY_QUEUE) {
            JMESSAGE *top = &jq->jmessagelist[jq->lastfulljmessage];

            memmove(jmsg, top, sizeof *jmsg);
            memset(top, 0, sizeof *top);
            jq->lastfulljmessage--;
            retval = (jq->lastfulljmessage == JMSG_EMPTY_QUEUE) ? JMSG_LASTMESSAGE : JMSG_MOREMESSAGES;
        } else {
            retval = JMSG_NOMESSAGE;
        }
    }

    unlockit();
    return retval;
}
/*
 * Blocking receive for the calling thread's queue: sleeps on the queue's
 * semaphore until pushmessage() posts it, then moves the newest message into
 * *jmsg.
 *
 * Returns JMSG_OKAY once the wait has completed. *jmsg is cleared before the
 * pop, so if no message turns out to be available it reads as all zeroes
 * rather than as the previous message.
 * Returns JMSG_QUEUENOTFOUND when the calling thread has no queue or its
 * semaphore cannot be waited on.
 */
int waitmessage( JMESSAGE *jmsg )
{
    sem_t *waitingon = 0;
    pthread_t callingthreadid = pthread_self();
    JQUEUE *jq = 0;

    /* Hold the lock only long enough to fetch the semaphore handle */
    lockit();
    for (jq = jqueue; jq; jq = jq->next) {
        if (pthread_equal(jq->owningthreadid, callingthreadid)) {
            waitingon = jq->messagesignal;
            break;
        }
    }
    unlockit();

    if (!waitingon)
        return JMSG_QUEUENOTFOUND;

    /*
     * A signal handler returning makes sem_wait() fail with EINTR without a
     * message having arrived. Retry in that case instead of treating the
     * interruption as a delivery.
     */
    while (sem_wait(waitingon) == -1) {
        if (errno != EINTR)
            return JMSG_QUEUENOTFOUND;
    }

    memset(jmsg, 0, sizeof *jmsg);
    popmessagenosem(jmsg);
    return JMSG_OKAY;
}
享受吧!我喜欢它。我在所有线程中设置了消息循环来执行不同的操作,并且可以轻松快速地传达数据!
这是一个使用此代码的工作示例: ** 一定要链接到 -lpthread
#include <signal.h>
#include "messaging.h"
#define THREAD_STARTED JMSG_CUSTOM1
#define THREAD_FAILED JMSG_CUSTOM2
#define THREAD_EXITING JMSG_CUSTOM3
#define THREAD_ACKNOWLEDGE JMSG_CUSTOM4
void sighand( int signum )
JMESSAGE msg;
printf("\nSignal Received! Exiting...\n");
msg.message = JMSG_QUIT;
pushmessage(&msg, pthread_self()); /* Put this JMSG_QUIT message on
* the main thread's message queue
* to let the main code know to quit.
*/
return;
void *mythread( void *_mainthreadid )
pthread_t mainthreadid = (pthread_t)_mainthreadid;
JMESSAGE msg;
signal(SIGINT, &sighand); /* CTRL-C */
if (!createmessagequeue("mythread"))
printf("main.c > mythread() > createmessagequeue(): Failed.\n");
return 0;
/*
* Send a message to the main thread so it can do something when it
* knows we're ready.
*/
msg.message = THREAD_ACKNOWLEDGE;
pushmessage(&msg, mainthreadid);
printf("main.c > mythread(): Launched successfully, using blocking message loop!\n");
do
waitmessage(&msg); /*
* Wait indefinitely. You can, however, use a
* signal to send a message to this queue to get
* it to move along, or signal it from another thread
* to get it to move along.
*
*/
switch (msg.message)
case THREAD_ACKNOWLEDGE:
printf("main.c > mythread(): THREAD_ACKNOWLEDGE received from thread \"%s\" (0x%x).\n", msg.fromthreadname, msg.fromthread);
fflush(stdout);
break;
default:
break;
while (msg.message != JMSG_QUIT);
printf("main.c > mythread(): Got JMSG_QUIT.\n");
msg.message = THREAD_EXITING;
pushmessage(&msg, mainthreadid);
printf("main.c > mythread(): Calling destroymessagequeue()!\n");
destroymessagequeue();
printf("main.c > mythread(): Exiting.\n");
return 0;
int main( void )
JMESSAGE msg;
pthread_t mythreadid;
int ret;
ret = createmessagequeue("Main Thread");
if (!ret)
printf("main.c > createmessagequeue(): Failed with %d.\n", ret);
return 0;
pthread_create(&mythreadid, 0, &mythread, (void *)pthread_self());
printf("main.c > main(): Press [CTRL-C] to terminate program.\n");
do
/* NON Blocking message queue */
if (popmessage(&msg))
switch (msg.message)
case JMSG_QUIT:
/* Forward the message on to any other queues */
if (pushmessage(&msg, mythreadid))
printf("main.c > main(): Received JMSG_QUIT. Forwarded message to mythreadid\n");
fflush(stdout);
pthread_join(mythreadid, 0);
break;
case THREAD_ACKNOWLEDGE:
printf("main.c > main(): Received a THREAD_ACKNOWLEDGE from thread \"%s\" (0x%x)\n", msg.fromthreadname, msg.fromthread);
/* Bounce message back for the heck of it */
pushmessage(&msg, mythreadid);
fflush(stdout);
break;
case THREAD_EXITING:
printf("main.c > main(): Received a THREAD_EXITING from thread \"%s\" (0x%x)\n", msg.fromthreadname, msg.fromthread);
fflush(stdout);
break;
default:
break;
else /* No messages do some important stuff */
usleep(20000); /* Take a breather */
while (msg.message != JMSG_QUIT);
printf("main.c > main(): Calling destroymessagequeue()!\n");
destroymessagequeue();
printf("main.c > main(): Exiting program!\n");
fflush(stdout);
return 0;
【讨论】:
以上是关于C中的消息队列:实现2路通信的主要内容,如果未能解决你的问题,请参考以下文章