IPC - 消息队列(Message Queue)- 使用
Posted Redamanc
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了IPC - 消息队列(Message Queue)- 使用相关的知识,希望对你有一定的参考价值。
Message Queue
XSI IPC
XSI(Unix System Interface and Headers):
代表了一种Unix
系统的标准。
XSI IPC,依托标识符
和键
来实现的,如同管道靠文件描述符来实现一样。
有三种IPC我们称作XSI IPC,即消息队列、信号量、共享存储器(共享内存)。
XSI IPC源自
系统V
(System V:Unix操作系统的一个版本)的IPC功能,二者密切相关。
IPC标识
内核为每个进程间通信维护一个结构体形式的IPC对象。该对象可以通过一个非负整数的IPC标识来引用。
注意:
与文件描述符不同,文件描述符
总是找当前系统中可用
的最小
的数,而IPC标识是持续加一的。
IPC键值
IPC标识是IPC对象在内核空间中的唯一性标识ID;
IPC键值
是IPC对象在用户空间
中的唯一性标识ID。
1)无论何时,只要创建IPC对象,就必须指定一个键值;
2)键值的数据类型在sys/types.h
头文件中被定义为key_t
,其原始类型就是长整型long
。
IPC对象的访问
- IPC对象是全局对象
-
- 可用
ipcs
、ipcrm
等命令查看
和删除
- 可用
- 每个IPC对象都由
get
函数创建 -
msgget
、shmget
、semget
-
- 调用get函数的时候必须指定关键字key
IPC对象的权限和所有者结构体
XSI IPC为每一个IPC结构设置了一个ipc_perm
结构。
该结构规定了权限和所有者,至少包括下列成员:
#include <sys/ipc.h>
struct ipc_perm
{
uid_t uid; /*owner`s effective user id*/
gid_t gid; /*owner`s effective group id*/
uid_t cuid; /*creator`s effective user id*/
gid_t cgid; /*creator`s effective group id*/
mode_t mode; /*access modes*/
...
};
每种实现在其ipc_perm
结构中会包括另外一些成员。
如果想要查看完整定义,参见<sys/ipc.h>
消息队列
消息队列是一种标准的IPC。
百度定义:“消息队列”是在消息的传输过程中保存消息的容器。
对于IPC来说,消息队列就是内核中一个消息的链表。
可以把消息看作是一个记录,具有特定的格式
以及特定的优先级
,对消息队列有写权限的进程可以向消息队列中按照一定的规则添加消息;对消息队列有读权限的进程则可以从消息队列中读走消息。
目前主要有两种类型的消息队列:POSIX消息队列以及系统V(System V)消息队列。系统V消息队列目前仍大量被使用,考虑到程序的可移植性,新开发的应用程序应尽量使用POSIX消息队列
。
消息
在进程间通信中,用户进程将数据传输到内核后,内核重新添加一些比如用户id
、组id
、读写进程的id
和优先级
等相关信息后打包成一个数据称为消息。
特性
- 允许一个躲着多个进程往消息队列中写消息和读消息,但一个消息只能被一个进程读取,读取完毕后就会自动删除;
- 消息队列具有一定的FIFO(
First In First Out
:先入先出)的特性,消息可以按照顺序发送到队列中,也可以按不同的方式从队列中读取。每一个消息队列在内核中用一个唯一的IPC标识ID表示。
消息队列属性结构体
每一个消息队列都有一个msqid_ds
结构体:
#include <sys/msg.h>
struct msqid_ds
{
struct ipc_perm msg_perm;
msgqnum_t msg_qnum; /* # of message on queue*/
msglen_t msg_qbytes; /* max # of bytes on queue*/
pid_t msg_lspid; /* pid of last msgsnd*/
pid_t msg_lrpid; /* pid of last msgrcv*/
time_t msg_stime; /* last-msgsnd() time*/
time_t msg_rtime; /* last-msgrcv() time*/
time_t msg_ctime; /* last-change time*/
...
};
打开或创建消息队列
#include <sys/msg.h>
int msgget(key_t key, int flag);
return:成功返回内核中消息队列的标识ID,出错返回-1
参数:
key
:用户指定的消息队列键值;
flag
:消息队列的权限:IPC_CREAT
、IPC_EXCL
等的组合。
若创建消息队列,key
可以指定键值,也可以将之设置为IPC_PRIVATE
(值为0)。
若对消息队列进行查询,则key
不能为0,否则查不到。
控制消息队列
#include <sys/msg.h>
int msgctl(int msgid, int cmd, struct msqid_ds *buf);
return:成功返回0,出错返回-1
参数:
msgid
:消息队列ID;
cmd
:消息队列控制命令:
- IPC_STAT:获取消息队列的属性,取此队列的
msqid_ds
结构,并将其存放在buf
指向的结构中; - IPC_SET:设置属性,按由
buf
指向的结构中的值,设置与此队列相关的结构中的字段; - IPC_RMID:删除队列,从系统中删除消息队列上的所有数据。
buf
:消息队列属性指针。
发送消息队列
#include <sys/msg.h>
int msgsnd(int msgqid, const void *ptr, size_t nbytes, int flag);
return:成功返回0,出错返回-1
参数:
msgqid
:消息队列ID;
ptr
:通用指针,指向了想要发送的消息类型指针,其中的类型需要用户自己定义:
struct mymessage
{
long mtype; /*positive message type*/
char mtext[512]; /*message data,of length nbytes*/
};
nbytes
:指定消息的大小,不包括mtype的大小,即只包含数据大小而不包含数据类型大小;
flag
:标志参数,可以指定为阻塞和非阻塞。
0
:阻塞;IPC_NOWAIT
:非阻塞标志。
当消息队列已满,或者是队列中的消息总数等于系统限制值,或队列中的字节总数等于系统限制值时:
若指定IPC_NOWAIT的状态下:
msgsnd()
函数会立即出错返回EAGAIN
。若指定0的状态下:
- 进程阻塞直到有空间可以容纳要发送的消息;
- 或从系统中删除了此队列;
- 或捕捉到一个信号,并从信号处理程序返回。
接收消息队列
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *ptr, size_t nbytes, long type, int flag);
return:成功返回消息的数据部分长度,出错返回-1
参数:
msqid
:消息队列ID;
ptr
:指向存放消息的缓存;
nbytes
:消息缓存的大小,不包含mtype的大小。
- 计算方式:
nbytes
=sizeof (struct mymessage)
-
sizeof(long)
type
:消息类型;
- type
==
0:获得消息队列中的第一个消息; - type
>
0:获得消息队列中类型为type
的第一个消息; - type
<
0:获得消息队列中小于或等于type
绝对值(即最小的)消息;
flag
:0 或者 IPC_NOWAIT。
发送消息队列实例
代码
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/msg.h>
typedef struct
{
long type; //消息类型
int start; //消息数据本身(包括了start和end)
int end;
} MSG;
/*
* 往消息队列中发送消息
*/
int main(int argc, char *argv[])
{
if (argc < 2)
{
printf("Lack arguments %s\\n", argv[0]);
exit(1);
}
key_t key = atoi(argv[1]);
printf("key: %d\\n", key);
// 创建消息队列
int msq_id;
if ((msq_id = msgget(key, IPC_CREAT | IPC_EXCL | 0777)) < 0)
{
perror("msgget error");
}
printf("msq id: %d\\n", msq_id);
// 定义要发送的消息
MSG m[5] = {{4, 4, 400}, {2, 2, 200}, {1, 1, 100}, {7, 4, 40}, {7, 40, 400}};
// 发送消息到消息队列
for (int i = 0; i < 5; ++i)
{
if (msgsnd(msq_id, &m[i], sizeof(MSG) - sizeof(long), IPC_NOWAIT) < 0)
{
perror("msgsnd error");
}
}
// 发送后去获得消息队列中消息的总数
struct msqid_ds ds;
if (msgctl(msq_id, IPC_STAT, &ds) < 0)
{
perror("msgctl error");
}
/* msg_qnum:number of messages currently on queue */
printf("total message: %ld\\n", ds.msg_qnum);
exit(0);
}
运行
首先我们直接不加任何参数的执行程序:
可以看到提示
了:缺少参数。
接着我们加上所需的参数-消息队列键值:
可以看到正常运行,并且键值为自己指定的7
;消息队列标识是1
。
如果我们再次运行:
会发现,出错了,提示说File exists
文件已存在,所以,在用户空间中的唯一标识不能重复。
通过命令ipcs -q
查看消息队列对象:
可以看到msqid
的值为之前的1
。
接收消息队列实例
代码
#include <sys/msg.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
typedef struct
{
long type; //消息类型
int start; //消息数据本身(包括了start和end)
int end;
} MSG;
int main(int argc, char *argv[])
{
if (argc < 3)
{
printf("Lack arguments: %s\\n", argv[0]);
exit(1);
}
key_t key = atoi(argv[1]);
long type = atoi(argv[2]);
// 获得指定的消息队列
int msq_id;
if ((msq_id = msgget(key, 0777)) < 0)
{
perror("msgget error");
exit(1);
}
printf("msg id: %d\\n", msq_id);
MSG m;
if ((msgrcv(msq_id, &m, sizeof(MSG) - sizeof(long), type, IPC_NOWAIT)) < 0)
{
perror("msgrcv error");
exit(1);
}
else
{
printf("type: %ld start: %d end: %d\\n", m.type, m.start, m.end);
}
exit(0);
}
运行
在运行前,我们得首先确保内核中有这个消息队列:
确保有消息队列之后,就可以读取消息了:
同样的,需要传入参数:
运行的时候需要传入两个参数:key
和type
;
第一个ket
就是我们自己定义的消息队列键值
,
第二个type
就是我们想要读取的消息类型
;
当读取一个消息后,我们会发现,消息队列中的消息数从5
变为了4
;
也就是说,消息读取完毕后自动删除;
同样的,我们可以验证type
为0
的情况:
读取到的是4
号类型消息。
我们发送的时候,第一个消息就是4
号类型:
还有type
小于0
的情况:
在第一个消息{4, 4, 400}
已经读取过自动删除的情况下,目前小于-5
的绝对值也就是5
的最小值就是2
:
当我们读取完所有的消息后:
我们会发现,即使所有的消息都被删除了,但是消息队列对象还存在!
这也是XSI IPC和管道的一个区别:
IPC对象
的生命周期
是由用户管理的;
管道
是由内核管理释放的。
必须得用户自己删除IPC对象:
参考资料
【1】XSI IPC机制. meetings. CSDN. 2015-07-29
【2】LuckY_chh. bilibili. Linux系统程序设计–IPC. 2018-11-27
以上是关于IPC - 消息队列(Message Queue)- 使用的主要内容,如果未能解决你的问题,请参考以下文章