21消息队列
Posted gd-luojialin
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了21消息队列相关的知识,希望对你有一定的参考价值。
消息与消息队列
IPC (Inter process communication)
广义:所有可以用于进程间通信的对象和方法
狭义:特指消息队列,信号量,共享内存
消息队列 应用于进程间少量数据的顺序共享
信号量 应用于进程间互斥
共享内存 应用与进程间大量数据的随机共享访问
命令行查询IPC对象
ipcs [...]
-q 查询 消息队列
-s 查询 信号量
-m 查询 共享内存
-a 查询所有
无 查询所有
命令行删除 IPC对象
ipcrm [...]
-q msqid 删除消息队列
-m shmid 删除共享内存
-s semid 删除信号量
创建消息队列的步骤:
消息队列使用(函数):
1: ftok 使用某个文件做关键字创建 key
2: msgget 使用key创建消息队列 msqid
3: msgsnd 往消息队列中写入消息
4: msgrcv 从消息队列中读取消息
5: msgctl 删除消息队列
创建 IPC
<sys/types>
<sys/ipc.h>
key_t ftok(const char *pathname, int id)
注意:由文件名 和 id 唯一决定一个key,文件名和 id都一样的话,获得的key必然一样,否则,key必然不一样,通过创建一个key,实现 两个进程通信的桥梁关键字。
该文件也要必须存在,可空白。
创建消息队列
<sys/types.h>
<sys/ipc.h>
<sys/msg.h>
int msgget(key_t key, int msgflg)
返回 msqid
创建消息队列
IPC_PRIVATE 创建key = 0的消息队列,可以同时存在多个
IPC_EXCL | IPC_CREAT 确保这个消息队列是新创建的,不与已有的消息队列冲突
往消息队列发送消息
int msgsnd(int msqid, const void *msg, size_t szLen, int msgflg);
参数解析:
msg 指向 msgbuf 结构(自定义)
struct msgbuf {
long type; //大于0的数
char text[N]; //文本大小由 szLen 指定
}
msgflag:
0 队列满则阻塞
IPC_NOWAIT 队列满则返回错误 (EAGAIN)
从消息队列中读取消息
size_t msgrcv(int msqid, void *msg, size_t sz, long msgtype, int msgflg)
参数:
msg 指向 msg_info 数据结构, 同msgsnd
msgflg 取值 ( MSG_XXX 需要定义宏: _GNU_SOURCE)
0 :默认值,没有消息则阻塞
IPC_NOWAIT: 没有这个类型的消息,立即返回 (ENOMSG)
MSG_EXCEPT: 返回队列中第一个不是某类型的消息
注意:使用MSG_EXCEPT必须加上宏:#define _GNU_SOURCE,并且该宏必须在<sys/msg.h>前面定义。
MSG_NOERROR: 消息内容大于请求长度,丢弃多余部分
往消息队列发送消息
msgsnd 函数调用成功会修改 msqid_ds
msg_lspid 设置为对应进程PID
msg_qnum 加 1
msg_stime 设置为当前时间
从队列中读取消息
msgrcv 调用成功后,修改 msqid_ds 结构体
msg_lrpid: 设置为进程PID
msg_qnum: 减1
msg_rtime: 设置为当前是时间
例子:
typedef struct tagMsg
{
long type;
char szBuf[1024];
}MSG_S;
void testMsgRS()
{
int ch;
fprintf(stderr,r“select r/w:”);
scanf(“%c”,&ch);
if(ch!=’w’&&ch!=’r’)
{
return ;
}
//创建key
key_t key=ftok("123",321);
if(key==-1)
{
perror("fail ftok");
return ;
}
printf("key=%d ",key);
//创建消息队列
int msqid=msgget(key,IPC_CREAT|0666);
if(msqid==-1)
{
perror("fail msgget!");
return ;
}
printf("msqid=%d ",msqid);
MSG_S msg;
//read message
if(rs==‘r‘)
{
while(1)
{
memset(&msg,0,sizeof(msg.szBuf));
fprintf(stderr,"type:");
scanf("%ld",&(msg.type));
//read(0,&(msg.type),sizeof(long));
int nRet=msgrcv(msqid,&msg,sizeof(msg.szBuf),msg.type, IPC_NOWAIT);
if(nRet<0)
{
perror("fail msgsnd");
break;
}
printf("receive:%s ",msg.szBuf);
}
}
//send message
else if(rs==‘s‘)
{
fprintf(stderr,"example:1 ABCD! ");
while(1)
{
memset(&msg,0,sizeof(msg.szBuf));
fprintf(stderr,"send:");
scanf("%ld%s",&(msg.type),msg.szBuf);
//read(0,&(msg.type),msg.szBuf,sizeof(MSG_S));
int nRet=msgsnd(msqid,&msg,strlen(msg.szBuf),0);
if(nRet<0)
{
perror("fail msgsnd");
break;
}
}
}
}
消息队列设置
int msgctl(int msqid, int cmd, struct msqid_ds *buf)
参数:
cmd 参数
IPC_STAT 读取内核中 msqid_ds 数据到 buf
IPC_SET 设置 buf 中数据到 msqid_ds
IPC_RMID 移除消息队列,读写消息队列进程返回 EIDRM
IPC_INFO 返回系统级别的消息队列限制,保存到buf,这里 buf 指向 msginfo 结构数据
MSG_INFO 返回 msginfo 消息,同时获取资源消耗情况:
msgpool 系统中存在的消息队列数
msgmap 系统中所有消息队列的消息数
msgtql 系统中所有消息队列占用的字节数
MSG_STAT 返回 msginfo 消息
msqid 参数使用内核的消息队列信息
参数三结构体:
struct msqid_ds {
struct ipc_perm msg_perm; //消息队列权限等信息
time_t msg_stime; //msgsnd 最后调用时间
time_t msg_rtime; //msgrcv 最后调用时间
time_t msg_ctime; //消息队列最后改变的时间
unsigned long _msg_cbytes; //消息队列当前消息总长度
msgqnum_t msg_qnum; //消息队列中消息个数
msglen_t msg_qbytes; //队列能存消息的最大长度
pid_t msg_lspid; //最后调用 msgsnd 的进程号
pid_t msg_lrpid; //最后调用 msgrcv 的进程
}
struct ipc_perm结构体
struct ipc_perm{
key_t __key; //msgget 的key参数
uid_t uid //消息队列所有者 euid
gid_t gid; //消息队列所有者 egid
uid_t cuid; //消息队列创建者 euid
gid_t cgid; //消息队列创建者 egid
unsigned short mode; //访问权限
unsigned short __seq; //序列号
}
MSG_INFO 返回 msginfo 消息,结构体:
struct msginfo {
int msgpool; //系统中存在的消息队列数
int msgmap; //系统中所有消息队列的消息数
int msgmax; //单条消息中最大长度
int msgmnb; //消息队列最大可写入字节数
int msgmni; //消息队列可容纳消息条数
int msgssz; //消息段大小,未使用
int msgtql; //系统中所有消息队列占用的字节数
}
内核中维护的消息队列数据格式(链表)
struct msg {
struct msg *msg_next; //下一个消息节点
long msg_type; //消息类型
ushort msg_ts; //消息长度
short msg_spot; //消息内容(指针)
}
例子:
#define _GNU_SOURCE
#include <stdio.h>
#include <sys/msg.h>
#include <string.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <unistd.h>
typedef struct tagMsg
{
long type;
char szBuf[1024];
}MSG_S;
void testMsgRS(char rs)
{
key_t key=ftok("123",321);
if(key==-1)
{
perror("fail ftok");
return ;
}
printf("key=%d ",key);
int msqid=msgget(key,IPC_CREAT|0666);
if(msqid==-1)
{
perror("fail msgget!");
return ;
}
printf("msqid=%d ",msqid);
MSG_S msg;
//read message
if(rs==‘r‘)
{
while(1)
{
memset(&msg,0,sizeof(msg.szBuf));
fprintf(stderr,"type:");
scanf("%ld",&(msg.type));
//read(0,&(msg.type),sizeof(long));
int nRet=msgrcv(msqid,&msg,sizeof(msg.szBuf),msg.type, IPC_NOWAIT);
if(nRet<0)
{
perror("fail msgsnd");
break;
}
printf("receive:%s ",msg.szBuf);
}
}
//send message
else if(rs==‘s‘)
{
fprintf(stderr,"example:1 ABCD! ");
while(1)
{
memset(&msg,0,sizeof(msg.szBuf));
fprintf(stderr,"send:");
scanf("%ld%s",&(msg.type),msg.szBuf);
//read(0,&(msg.type),msg.szBuf,sizeof(MSG_S));
int nRet=msgsnd(msqid,&msg,strlen(msg.szBuf),0);
if(nRet<0)
{
perror("fail msgsnd");
break;
}
}
}
}
//列举msqid_ds的信息
void printfInfo(struct msqid_ds *pstMsg)
{
printf("--------msg_perm------ ");
printf("msgget key:%#o ",pstMsg->msg_perm.__key);
printf("all user euid:%d ",pstMsg->msg_perm.uid);
printf("all user egid:%d ",pstMsg->msg_perm.gid);
printf("creater euid:%d ",pstMsg->msg_perm.cuid);
printf("creater egid:%d ",pstMsg->msg_perm.cgid);
printf("R+W mode:%#o ",pstMsg->msg_perm.mode);
printf("queue numer:%d ",pstMsg->msg_perm.__seq);
printf(" ");
printf("msgsnd time:%d ",(int)pstMsg->msg_stime);
printf("msgrcv time:%d ",(int)pstMsg->msg_rtime);
printf("change time:%d ",(int)pstMsg->msg_ctime);
printf("message len:%ld ",pstMsg->msg_cbytes);
printf("message number:%d ",(int)pstMsg->msg_qnum);
printf("queue max len:%d ",(int)pstMsg->msg_qbytes);
printf("msgsnd pid:%d ",(int)pstMsg->msg_lspid);
printf("msgrcv pid:%d ",(int)pstMsg->msg_lrpid);
printf(" ");
}
//提示
void ListSel()
{
printf(" stat:output message queue infomation ");
printf(" set:set message queue mode ");
printf(" exit:delete message queue ");
printf(" ");
}
//消息队列设置
void testMsgctl()
{
key_t key=ftok("123",321);
if(key==-1)
{
perror("fail ftok");
return ;
}
printf("key=%d ",key);
int msqid=msgget(key,IPC_CREAT|0666);
if(msqid==-1)
{
perror("fail msgget!");
return ;
}
printf("msqid=%d ",msqid);
ListSel();
struct msqid_ds stMsg;
char szCmd[128];
//对消息队列进行操作。显示,修改
while(1)
{
fprintf(stderr,"-->");
scanf("%s",szCmd);
//显示
if(!strcmp(szCmd,"stat"))
{
msgctl(msqid, IPC_STAT, &stMsg);
printfInfo(&stMsg);
}
//修改权限
else if(!strcmp(szCmd,"set"))
{
msgctl(msqid, IPC_STAT, &stMsg);
int mode;
printf("now mode:%d ",stMsg.msg_perm.mode);
fprintf(stderr,"input new mode:");
scanf("%o",&mode);
if(mode<0||mode>0777)
{
fprintf(stderr,"mode is invaid ");
continue;
}
//修改操作
stMsg.msg_perm.mode=mode;
//修改后更新到消息队列
int nRet=msgctl(msqid, IPC_SET, &stMsg);
if(nRet)
{
perror("failed IPC_SET");
continue;
}
else
{
printf("set mode sucess ");
}
}
//退出
else if(!strcmp(szCmd,"exit"))
{
break;
}
}
//删除消息队列
fprintf(stderr,"sure to delete this message[y/n]:");
scanf("%s",szCmd);
if(!strcmp(szCmd,"y"))
{
//删除操作
int nRet=msgctl(msqid, IPC_RMID, &stMsg);
if(nRet)
{
perror("failed IPC_RMID");
}
else
{
printf("delete message queue[%d]sucess! ",msqid);
}
}
}
int main(int argc,char** argv)
{
/*//默认参数传入
if((argc!=2)||(strcmp(argv[1],"s")&&strcmp(argv[1],"r")))
{
printf("select [s/r/c]:%s ",argv[1]);
printf(" s:send/receive message ");
printf(" r:receive message ");
printf(" c:control message queue ");
return 0;
}
if(argv[1][0]==‘s‘||argv[1][0]==‘r‘)
{
testMsgRS(argv[1][0]);
}
else
{
testMsgctl();
}*/
printf(" s:send/receive message ");
printf(" r:receive message ");
printf(" c:control message queue ");
printf("select your choice [s/r/c]:");
char ch;
scanf("%c",&ch);
//读、写队列消息
if(ch==‘s‘||ch==‘r‘)
{
testMsgRS(ch);
}
//修改信息队列权限
else
{
testMsgctl();
}
return 0;
}
以上是关于21消息队列的主要内容,如果未能解决你的问题,请参考以下文章
检查一条消息是不是在 POSIX 消息队列中而没有从队列中删除它?
(21)Blender源码分析之鼠标按下消息添加到队列的过程