21消息队列

Posted gd-luojialin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了21消息队列相关的知识,希望对你有一定的参考价值。

消息与消息队列

IPC  (Inter process communication)

广义:所有可以用于进程间通信的对象和方法

狭义:特指消息队列,信号量,共享内存

 

消息队列    应用于进程间少量数据的顺序共享

信号量        应用于进程间互斥

共享内存    应用与进程间大量数据的随机共享访问

 

命令行查询IPC对象

ipcs [...]

-q       查询  消息队列

-s       查询  信号量

-m      查询  共享内存

-a       查询所有

无       查询所有

 

命令行删除 IPC对象

ipcrm [...]

-q   msqid      删除消息队列

-m  shmid      删除共享内存

-s   semid      删除信号量

 

 

创建消息队列的步骤:

消息队列使用(函数):

1: ftok        使用某个文件做关键字创建 key  

2: msgget  使用key创建消息队列 msqid

3: msgsnd  往消息队列中写入消息

4: msgrcv   从消息队列中读取消息

5: msgctl     删除消息队列

 

 

 

创建 IPC

<sys/types>

<sys/ipc.h>

key_t  ftok(const char *pathname, int id)

注意:由文件名 和 id 唯一决定一个key,文件名和 id都一样的话,获得的key必然一样,否则,key必然不一样,通过创建一个key,实现 两个进程通信的桥梁关键字。

该文件也要必须存在,可空白。

 

创建消息队列

<sys/types.h>

<sys/ipc.h>

<sys/msg.h>

int  msgget(key_t key, int msgflg)  

     返回 msqid

 

创建消息队列

IPC_PRIVATE     创建key = 0的消息队列,可以同时存在多个

IPC_EXCL | IPC_CREAT   确保这个消息队列是新创建的,不与已有的消息队列冲突

技术分享图片

技术分享图片

 

 

 

往消息队列发送消息

int msgsnd(int msqid,  const void *msg, size_t szLen, int msgflg);

参数解析:

msg 指向 msgbuf 结构(自定义)

 

struct msgbuf {

    long type;            //大于0的数

    char text[N];         //文本大小由 szLen 指定

}

 

msgflag:

     0                       队列满则阻塞

    IPC_NOWAIT    队列满则返回错误 (EAGAIN)

 

 

从消息队列中读取消息

size_t msgrcv(int msqid, void *msg, size_t sz, long msgtype,  int msgflg)

 

参数:

msg 指向 msg_info 数据结构, 同msgsnd

 

msgflg 取值 ( MSG_XXX 需要定义宏: _GNU_SOURCE)

    0                     :默认值,没有消息则阻塞

    IPC_NOWAIT: 没有这个类型的消息,立即返回 (ENOMSG)

    MSG_EXCEPT: 返回队列中第一个不是某类型的消息

注意:使用MSG_EXCEPT必须加上宏:#define _GNU_SOURCE,并且该宏必须在<sys/msg.h>前面定义。

MSG_NOERROR:   消息内容大于请求长度,丢弃多余部分

 

往消息队列发送消息

msgsnd 函数调用成功会修改 msqid_ds

msg_lspid           设置为对应进程PID

msg_qnum          加 1

msg_stime          设置为当前时间

 

从队列中读取消息

msgrcv 调用成功后,修改 msqid_ds 结构体

msg_lrpid:          设置为进程PID

msg_qnum:        减1

msg_rtime:         设置为当前是时间

 

例子:

typedef struct tagMsg

{

      long type;

      char szBuf[1024];

}MSG_S;

 

void testMsgRS()

{

      int ch;

      fprintf(stderr,r“select  r/w:”);

      scanf(“%c”,&ch);

      if(ch!=’w’&&ch!=’r’)

      {

           return ;

}

//创建key

      key_t key=ftok("123",321);

      if(key==-1)

      {

           perror("fail ftok");

           return ;

      }

      printf("key=%d ",key);

      //创建消息队列

      int msqid=msgget(key,IPC_CREAT|0666);

      if(msqid==-1)

      {

           perror("fail msgget!");

           return ;

      }

      printf("msqid=%d ",msqid);

     

      MSG_S msg;

      //read message

      if(rs==‘r‘)

      {

           while(1)

           {

                 memset(&msg,0,sizeof(msg.szBuf));

                 fprintf(stderr,"type:");

                 scanf("%ld",&(msg.type));

                 //read(0,&(msg.type),sizeof(long));

                 int nRet=msgrcv(msqid,&msg,sizeof(msg.szBuf),msg.type,  IPC_NOWAIT);

                 if(nRet<0)

                 {

                      perror("fail msgsnd");

                      break;

                 }

                 printf("receive:%s ",msg.szBuf);

           }

      }

      //send message

      else if(rs==‘s‘)

      {

           fprintf(stderr,"example:1 ABCD! ");

           while(1)

           {

                 memset(&msg,0,sizeof(msg.szBuf));

                 fprintf(stderr,"send:");

                 scanf("%ld%s",&(msg.type),msg.szBuf);

                 //read(0,&(msg.type),msg.szBuf,sizeof(MSG_S));

                 int nRet=msgsnd(msqid,&msg,strlen(msg.szBuf),0);

                 if(nRet<0)

                 {

                      perror("fail msgsnd");

                      break;

                 }

                

           }

      }

 

}

 

 

消息队列设置

int  msgctl(int msqid,   int cmd,  struct msqid_ds *buf)

 

参数:

cmd 参数

IPC_STAT      读取内核中 msqid_ds 数据到 buf

IPC_SET        设置 buf 中数据到 msqid_ds

IPC_RMID      移除消息队列,读写消息队列进程返回 EIDRM

IPC_INFO       返回系统级别的消息队列限制,保存到buf,这里 buf 指向 msginfo 结构数据

MSG_INFO     返回 msginfo 消息,同时获取资源消耗情况:

                        msgpool    系统中存在的消息队列数

                        msgmap    系统中所有消息队列的消息数

                        msgtql        系统中所有消息队列占用的字节数

MSG_STAT     返回 msginfo 消息

                        msqid 参数使用内核的消息队列信息

参数三结构体:

struct msqid_ds {

    struct ipc_perm msg_perm;      //消息队列权限等信息

    time_t                msg_stime;      //msgsnd 最后调用时间     

    time_t                msg_rtime;      //msgrcv 最后调用时间

    time_t                msg_ctime;      //消息队列最后改变的时间

    unsigned long   _msg_cbytes;  //消息队列当前消息总长度

    msgqnum_t        msg_qnum;     //消息队列中消息个数

    msglen_t            msg_qbytes;   //队列能存消息的最大长度

    pid_t                   msg_lspid;       //最后调用 msgsnd 的进程号

    pid_t                  msg_lrpid;        //最后调用 msgrcv 的进程

}

 

struct ipc_perm结构体

struct ipc_perm{

    key_t  __key;           //msgget 的key参数

    uid_t   uid                //消息队列所有者 euid

    gid_t   gid;               //消息队列所有者 egid

    uid_t   cuid;             //消息队列创建者 euid

    gid_t   cgid;             //消息队列创建者 egid

    unsigned short mode;  //访问权限

    unsigned short __seq;   //序列号

}

 

MSG_INFO     返回 msginfo 消息,结构体:

struct msginfo {

    int msgpool;       //系统中存在的消息队列数

    int msgmap;       //系统中所有消息队列的消息数

    int msgmax;       //单条消息中最大长度

    int msgmnb;       //消息队列最大可写入字节数

    int msgmni;        //消息队列可容纳消息条数

    int msgssz;        //消息段大小,未使用

    int msgtql;         //系统中所有消息队列占用的字节数

}

 

 

内核中维护的消息队列数据格式(链表)

struct msg {

    struct msg *msg_next;          //下一个消息节点

    long            msg_type;          //消息类型

    ushort         msg_ts;              //消息长度

    short           msg_spot;          //消息内容(指针)

}

 

 

例子:

#define _GNU_SOURCE 

#include <stdio.h>

#include <sys/msg.h>

#include <string.h>

#include <stdlib.h>

#include <sys/ipc.h>

#include <unistd.h>

 

typedef struct tagMsg

{

      long type;

      char szBuf[1024];

}MSG_S;

 

void testMsgRS(char rs)

{

      key_t key=ftok("123",321);

      if(key==-1)

      {

           perror("fail ftok");

           return ;

      }

      printf("key=%d ",key);

 

      int msqid=msgget(key,IPC_CREAT|0666);

      if(msqid==-1)

      {

           perror("fail msgget!");

           return ;

      }

      printf("msqid=%d ",msqid);

     

      MSG_S msg;

      //read message

      if(rs==‘r‘)

      {

           while(1)

           {

                 memset(&msg,0,sizeof(msg.szBuf));

                 fprintf(stderr,"type:");

                 scanf("%ld",&(msg.type));

                 //read(0,&(msg.type),sizeof(long));

                 int nRet=msgrcv(msqid,&msg,sizeof(msg.szBuf),msg.type,  IPC_NOWAIT);

                 if(nRet<0)

                 {

                      perror("fail msgsnd");

                      break;

                 }

                 printf("receive:%s ",msg.szBuf);

           }

      }

      //send message

      else if(rs==‘s‘)

      {

           fprintf(stderr,"example:1 ABCD! ");

           while(1)

           {

                 memset(&msg,0,sizeof(msg.szBuf));

                 fprintf(stderr,"send:");

                 scanf("%ld%s",&(msg.type),msg.szBuf);

                 //read(0,&(msg.type),msg.szBuf,sizeof(MSG_S));

                 int nRet=msgsnd(msqid,&msg,strlen(msg.szBuf),0);

                 if(nRet<0)

                 {

                      perror("fail msgsnd");

                      break;

                 }

           }

      }

}

//列举msqid_ds的信息

void printfInfo(struct msqid_ds *pstMsg)

{

      printf("--------msg_perm------ ");

      printf("msgget key:%#o ",pstMsg->msg_perm.__key);

      printf("all user euid:%d ",pstMsg->msg_perm.uid);

      printf("all user egid:%d ",pstMsg->msg_perm.gid);

      printf("creater euid:%d ",pstMsg->msg_perm.cuid);

      printf("creater egid:%d ",pstMsg->msg_perm.cgid);

      printf("R+W mode:%#o ",pstMsg->msg_perm.mode);  

      printf("queue numer:%d ",pstMsg->msg_perm.__seq);

      printf(" ");

      printf("msgsnd time:%d ",(int)pstMsg->msg_stime);

      printf("msgrcv time:%d ",(int)pstMsg->msg_rtime);

      printf("change time:%d ",(int)pstMsg->msg_ctime);

      printf("message len:%ld ",pstMsg->msg_cbytes);

      printf("message number:%d ",(int)pstMsg->msg_qnum);

      printf("queue max len:%d ",(int)pstMsg->msg_qbytes);

      printf("msgsnd pid:%d ",(int)pstMsg->msg_lspid);

      printf("msgrcv pid:%d ",(int)pstMsg->msg_lrpid);

      printf(" ");

}

 

 

//提示

void ListSel()

{

      printf(" stat:output message queue infomation ");

      printf(" set:set message queue mode ");

      printf(" exit:delete message queue ");

      printf(" ");

}

//消息队列设置

void testMsgctl()

{

      key_t key=ftok("123",321);

      if(key==-1)

      {

           perror("fail ftok");

           return ;

      }

      printf("key=%d ",key);

 

      int msqid=msgget(key,IPC_CREAT|0666);

      if(msqid==-1)

      {

           perror("fail msgget!");

           return ;

      }

      printf("msqid=%d ",msqid);

 

      ListSel();

      struct msqid_ds stMsg;

     

      char szCmd[128];

      //对消息队列进行操作。显示,修改

      while(1)

      {

           fprintf(stderr,"-->");

           scanf("%s",szCmd);

           //显示

           if(!strcmp(szCmd,"stat"))

           {

                 msgctl(msqid, IPC_STAT,  &stMsg);

                 printfInfo(&stMsg);

           }

           //修改权限

           else if(!strcmp(szCmd,"set"))

           {

                 msgctl(msqid, IPC_STAT,  &stMsg);

                 int mode;              

                 printf("now mode:%d ",stMsg.msg_perm.mode);

                 fprintf(stderr,"input new mode:");

                 scanf("%o",&mode);

                

if(mode<0||mode>0777)

                 {

                      fprintf(stderr,"mode is invaid ");

                      continue;

                 }

                 //修改操作

                 stMsg.msg_perm.mode=mode;

                 //修改后更新到消息队列

int nRet=msgctl(msqid, IPC_SET, &stMsg);

                 if(nRet)

                 {

                      perror("failed IPC_SET");

                      continue;

                 }

                 else

                 {

                      printf("set mode sucess ");

                 }

           }

           //退出

           else if(!strcmp(szCmd,"exit"))

           {

                 break;

           }

      }

     

      //删除消息队列

      fprintf(stderr,"sure to delete this message[y/n]:");

      scanf("%s",szCmd);

     

      if(!strcmp(szCmd,"y"))

      {

           //删除操作

           int nRet=msgctl(msqid, IPC_RMID, &stMsg);

           if(nRet)

           {

                 perror("failed IPC_RMID");

           }

           else

           {

                 printf("delete message queue[%d]sucess! ",msqid);

           }

      }

}

     

 

int main(int argc,char** argv)

{

      /*//默认参数传入

      if((argc!=2)||(strcmp(argv[1],"s")&&strcmp(argv[1],"r")))

      {

           printf("select [s/r/c]:%s ",argv[1]);

           printf(" s:send/receive message ");

           printf(" r:receive message ");

           printf(" c:control message queue ");    

           return 0;    

      }

     

      if(argv[1][0]==‘s‘||argv[1][0]==‘r‘)

      {

           testMsgRS(argv[1][0]);

      }

      else

      {

           testMsgctl();

      }*/

     

      printf(" s:send/receive message ");

      printf(" r:receive message ");

      printf(" c:control message queue ");    

      printf("select your choice [s/r/c]:");

      char ch;

      scanf("%c",&ch);

      //读、写队列消息

      if(ch==‘s‘||ch==‘r‘)

      {

           testMsgRS(ch);

      }

      //修改信息队列权限

      else

      {

           testMsgctl();

      }

      return 0;

}

 

以上是关于21消息队列的主要内容,如果未能解决你的问题,请参考以下文章

检查一条消息是不是在 POSIX 消息队列中而没有从队列中删除它?

为啥从队列中并行读取消息很慢?

(21)Blender源码分析之鼠标按下消息添加到队列的过程

(21)Blender源码分析之鼠标按下消息添加到队列的过程

消息队列 ENOMEM

21 利用分布式消息队列降低系统耦合性