消息队列
Posted fellow_jing
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列相关的知识,希望对你有一定的参考价值。
消息队列是比较常用的进程间通信方式。在公司的代码中应用也比较广泛。
消息队列一般用在需要异步执行、并且消息内容比较短的场景,通常用来发送控制消息和反馈消息。
比如说,一个进程需要另外一个进程做某些操作,并且希望得到操作完成的反馈信息。
首先将消息队列的函数进行封装,并编译成so.
基本的结构体和封装函数如msg.h
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <errno.h>
/* Control commands that one process can ask another process to carry out. */
typedef enum {
    OPEN  = 0,
    CLOSE = 1,
    PLAY  = 2,
    PAUSE = 3,
    PREV  = 4,
    NEXT  = 5
} CtrlCmd;
/* Feedback codes reporting that the corresponding control command finished. */
typedef enum {
    OPEN_DONE  = 0,
    CLOSE_DONE = 1,
    PLAY_DONE  = 2,
    PAUSE_DONE = 3,
    PREV_DONE  = 4,
    NEXT_DONE  = 5
} CtrlFeedback;
/*
 * Payload of a message: either a control command or a feedback code, plus
 * one integer parameter.
 *
 * The two union members share storage, so only one of them is meaningful
 * for a given message. The code in this file picks the member to read from
 * the message type (_ctlCmd for CTRL_CMD, _ctlFeedback for CTRL_FEEDBACK).
 */
typedef struct{
union {
CtrlCmd _ctlCmd;
CtrlFeedback _ctlFeedback;
}u;
/* Integer value sent along with the command/feedback; its meaning is not
 * defined here and is up to sender and receiver. */
int param;
}CtrlInfo;
/*
 * Kinds of message exchanged over the queue. The value is stored in
 * FellowMsg._msgType, i.e. it is used as the System V message type, which
 * is why both values are positive.
 */
typedef enum {
CTRL_CMD = 1,// control message
CTRL_FEEDBACK = 2,// feedback message
}MsgType;
/*
 * Buffer handed to msgsnd()/msgrcv() by the wrappers in this file.
 *
 * _msgType is the first member and has type long because the buffer is
 * passed directly as the message buffer of msgsnd()/msgrcv(); the wrappers
 * compute the payload size as sizeof(FellowMsg) - sizeof(long int), so
 * everything after _msgType is the message body.
 */
typedef struct{
long int _msgType;
CtrlInfo _ctlInfo;
}FellowMsg;
/*
 * Create the message queue identified by ftok(path, proj_id), or open it if
 * it already exists.
 * Returns the queue id on success, -1 on failure.
 */
int fellow_create_msg_queue(const char *path, int proj_id);

/*
 * Remove the message queue msg_q_id from the system.
 * Returns 0 on success, -1 on failure.
 */
int fellow_remove_msg_queue(int msg_q_id);

/*
 * Send *pFellowMsg on queue msg_q_id.
 * Returns 0 on success, -1 if pFellowMsg is NULL or sending fails.
 */
int fellow_send_msg(int msg_q_id, FellowMsg *pFellowMsg);

/*
 * Receive the next message from queue msg_q_id into *pFellowMsg.
 * Returns 0 on success, -1 if pFellowMsg is NULL or receiving fails.
 */
int fellow_rcv_msg(int msg_q_id, FellowMsg *pFellowMsg);
相关封装函数在msg.c
#include "msg.h"
/*
 * Create the System V message queue identified by (path, proj_id), or open
 * it if it already exists.
 *
 * path    - file name handed to ftok() to derive the IPC key
 * proj_id - project id handed to ftok()
 *
 * Returns the queue id on success. Returns -1 if ftok() or msgget() fails;
 * a diagnostic containing errno is printed to stdout in that case.
 */
int fellow_create_msg_queue(const char *path, int proj_id)
{
    key_t key = ftok(path, proj_id);
    if (key == (key_t)-1) {
        printf("ftok fail, errno:%d\n", errno);
        return -1;
    }
    /* %x expects an unsigned argument. */
    printf("key:0x%x\n", (unsigned int)key);

    /* IPC_CREAT without IPC_EXCL: whichever process calls first creates the
     * queue, later callers with the same key get the existing one. */
    int msg_q_id = msgget(key, IPC_CREAT | 0666);
    if (msg_q_id == -1) {
        printf("msgget fail, errno:%d\n", errno);
        return -1;
    }
    return msg_q_id;
}
/*
 * Remove the message queue msg_q_id from the system.
 *
 * Returns 0 on success. Returns -1 if msgctl() fails; a diagnostic
 * containing errno is printed to stdout in that case.
 */
int fellow_remove_msg_queue(int msg_q_id)
{
    int rc = msgctl(msg_q_id, IPC_RMID, NULL);
    if (rc == -1) {
        /* Terminate the line so the message is flushed on a line-buffered
         * stdout and does not run into the next diagnostic. */
        printf("msgctl fail, errno:%d\n", errno);
        return -1;
    }
    return 0;
}
/*
 * Send *pFellowMsg on queue msg_q_id.
 *
 * The call is made with flags 0 (no IPC_NOWAIT).
 *
 * Returns 0 on success. Returns -1 if pFellowMsg is NULL (silently) or if
 * msgsnd() fails (a diagnostic containing errno is printed to stdout).
 */
int fellow_send_msg(int msg_q_id, FellowMsg *pFellowMsg)
{
    if (pFellowMsg == NULL) {
        return -1;
    }

    /* msgsnd() takes the payload size, i.e. everything after the leading
     * long message-type member. */
    size_t payload_len = sizeof *pFellowMsg - sizeof pFellowMsg->_msgType;

    if (msgsnd(msg_q_id, pFellowMsg, payload_len, 0) == -1) {
        printf("msgsnd fail, errno:%d\n", errno);
        return -1;
    }
    return 0;
}
/*
 * Receive one message from queue msg_q_id into *pFellowMsg.
 *
 * msgrcv() is called with message type 0 and flags 0: any message type is
 * accepted, and IPC_NOWAIT is not set.
 *
 * Returns 0 on success. Returns -1 if pFellowMsg is NULL (silently) or if
 * msgrcv() fails (a diagnostic containing errno is printed to stdout).
 */
int fellow_rcv_msg(int msg_q_id, FellowMsg *pFellowMsg)
{
    if (pFellowMsg == NULL) {
        return -1;
    }

    /* Maximum payload size: everything after the leading long message-type
     * member. */
    size_t payload_len = sizeof *pFellowMsg - sizeof pFellowMsg->_msgType;

    if (msgrcv(msg_q_id, pFellowMsg, payload_len, 0, 0) == -1) {
        printf("msgrcv fail, errno:%d\n", errno);
        return -1;
    }
    return 0;
}
将msg.c编译成libmsg.so:gcc -shared -fPIC -I. msg.c -o libmsg.so
发送控制消息,并监听反馈消息的进程msgsnd.c. (gcc msgsnd.c -o msgsnd -L. -lmsg -lpthread)
#include <sys/prctl.h>
#include <pthread.h>   /* pthread_t, pthread_create used by main() */
#include <string.h>
#include <stdio.h>
#include "msg.h"
/* File whose identity ftok() uses to derive the queue keys. */
#define MSG_CREAT_PATH "/mnt/hgfs/share/test/list"
/* ftok() project ids of the two queues; exactly the reverse of msgrcv.c,
 * so the queue this program receives on is the one msgrcv.c sends on. */
#define MSG_RCV_ID 4
#define MSG_SND_ID 3
void *fellow_listenning_msg(void *arg)//监听反馈信息
{
if(0 != prctl(PR_SET_NAME, (unsigned long)"fellow_process_msg"))
{
printf("prctl fail, errno:%d", errno);
}
int msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_RCV_ID);
FellowMsg _fellowMsg;
while (1)
{
memset(&_fellowMsg, 0, sizeof(FellowMsg));
fellow_rcv_msg(msg_q_id, &_fellowMsg);
if (CTRL_FEEDBACK ==_fellowMsg._msgType)
{
switch (_fellowMsg._ctlInfo.u._ctlFeedback)
{
case OPEN_DONE:
printf("rcv OPEN_DONE:%d\\n", _fellowMsg._ctlInfo.param);
break;
case CLOSE:
printf("rcv CLOSE_DONE:%d\\n", _fellowMsg._ctlInfo.param);
break;
case PLAY:
printf("rcv PLAY_DONE:%d\\n", _fellowMsg._ctlInfo.param);
break;
default:
break;
}
}
}
}
/*
 * msgsnd: send an OPEN command on the command queue, then start the
 * listener thread that prints the feedback messages.
 *
 * Returns 0 after the listener thread has ended, 1 if the queue could not
 * be opened, the command could not be sent, or the thread could not be
 * started.
 */
int main(void)
{
    int snd_msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_SND_ID);
    printf("msgid:%d\n", snd_msg_q_id);
    if (snd_msg_q_id == -1) {
        return 1;
    }

    /* Send the OPEN command. */
    FellowMsg cmd;
    memset(&cmd, 0, sizeof cmd);
    cmd._msgType = CTRL_CMD;
    cmd._ctlInfo.u._ctlCmd = OPEN;
    cmd._ctlInfo.param = 1;
    if (fellow_send_msg(snd_msg_q_id, &cmd) == -1) {
        return 1;
    }

    pthread_t thread_id;
    int rc = pthread_create(&thread_id, NULL, fellow_listenning_msg, NULL);
    if (rc != 0) {
        /* pthread_create reports the error as its return value. */
        printf("pthread_create fail, error:%d\n", rc);
        return 1;
    }

    /* Block until the listener ends instead of spinning in an empty loop,
     * which would keep one CPU core fully busy. */
    pthread_join(thread_id, NULL);
    return 0;
}
接收控制消息,做完相关处理后,返回反馈消息的进程msgrcv.c. (gcc msgrcv.c -o msgrcv -L. -lmsg -lpthread)
#include <sys/prctl.h>
#include <pthread.h>   /* pthread_t, pthread_create used by main() */
#include <string.h>
#include <stdio.h>
#include "msg.h"
/* File whose identity ftok() uses to derive the queue keys. */
#define MSG_CREAT_PATH "/mnt/hgfs/share/test/list"
/* ftok() project ids of the two queues: commands are received on the queue
 * for id 3, feedback is sent on the queue for id 4. */
#define MSG_RCV_ID 3
#define MSG_SND_ID 4
/*
 * Thread entry point: receive control commands and answer with feedback.
 *
 * arg must point to an int holding the id of the queue to receive commands
 * from. Feedback is sent on the queue identified by
 * (MSG_CREAT_PATH, MSG_SND_ID).
 *
 * Only OPEN is answered (with OPEN_DONE, param 0); CLOSE and PLAY are just
 * printed and other commands are ignored, as are non-CTRL_CMD messages.
 *
 * Returns NULL when arg is NULL, the feedback queue cannot be opened, or
 * receiving fails.
 */
void *fellow_process_msg(void *arg)
{
    printf("thread start\n");
    if (arg == NULL) {
        return NULL;
    }
    int msg_q_id = *(int *)arg;

    int snd_msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_SND_ID);
    if (0 != prctl(PR_SET_NAME, (unsigned long)"fellow_process_msg")) {
        printf("prctl fail, errno:%d\n", errno);
    }
    if (snd_msg_q_id == -1) {
        /* No feedback queue: nothing useful can be reported back. */
        return NULL;
    }

    FellowMsg request;
    FellowMsg reply;
    for (;;) {
        memset(&request, 0, sizeof request);
        if (fellow_rcv_msg(msg_q_id, &request) == -1) {
            break;
        }
        if (request._msgType != CTRL_CMD) {
            continue;
        }

        switch (request._ctlInfo.u._ctlCmd) {
        case OPEN:
            printf("rcv OPEN:%d\n", request._ctlInfo.param);
            /* do something when rcv open */
            memset(&reply, 0, sizeof reply);
            reply._msgType = CTRL_FEEDBACK;
            reply._ctlInfo.u._ctlFeedback = OPEN_DONE;
            reply._ctlInfo.param = 0;
            /* A failed send is already reported by fellow_send_msg();
             * keep serving further commands. */
            fellow_send_msg(snd_msg_q_id, &reply);
            break;
        case CLOSE:
            printf("rcv CLOSE:%d\n", request._ctlInfo.param);
            break;
        case PLAY:
            printf("rcv PLAY:%d\n", request._ctlInfo.param);
            break;
        default:
            break;
        }
    }

    /* The loop exits on a receive error; a void * function must still
     * return a value. */
    return NULL;
}
/*
 * msgrcv: open the command queue and start the worker thread that
 * processes incoming control commands.
 *
 * Returns 0 after the worker thread has ended, 1 if the queue could not be
 * opened or the thread could not be started.
 */
int main(void)
{
    int msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_RCV_ID);
    printf("msgid:%d\n", msg_q_id);
    if (msg_q_id == -1) {
        return 1;
    }

    /* The worker reads msg_q_id through this pointer; the variable stays
     * alive because main() waits in pthread_join() below. */
    pthread_t thread_id;
    int rc = pthread_create(&thread_id, NULL, fellow_process_msg, (void *)&msg_q_id);
    if (rc != 0) {
        /* pthread_create reports the error as its return value. */
        printf("pthread_create fail, error:%d\n", rc);
        return 1;
    }

    /* Block until the worker ends instead of spinning in an empty loop,
     * which would keep one CPU core fully busy. */
    pthread_join(thread_id, NULL);
    return 0;
}
首先运行msgrcv进程,监听控制消息,然后运行msgsnd进程,发送控制消息,然后接收msgrcv的反馈。
运行时如果不将libmsg.so所在的目录加到LD_LIBRARY_PATH,执行时会报错,找不到libmsg.so.(export LD_LIBRARY_PATH=libmsg.so所在目录:$LD_LIBRARY_PATH)
结果如下:
进程msgrcv,
进程msgsnd,
以上是关于消息队列的主要内容,如果未能解决你的问题,请参考以下文章