POSIX消息队列

Posted tianzeng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了POSIX消息队列相关的知识,希望对你有一定的参考价值。

消息队列是Linux IPC中很常用的一种通信方式,它通常用来在不同进程间发送特定格式的消息数据。

消息队列和管道和FIFO区别

     1.一个进程向消息队列写入消息之前,并不需要某个进程在该队列上等待该消息的到达

  管道和FIFO是相反的,进程向其中写消息时,管道和FIFO必需已经打开来读,那么内核会产生SIGPIPE信号(感谢shanshan_fangfang的指正)。

    2.一个进程在往某消息队列写入消息后, 终止进程. 另一个进程某时刻读出该消息;消息队列是随内核的持续性,即一个进程向消息队列写入消息后,然后终止,另外一个进程可以在以后某个时刻打开该队列读取消息。只要内核没有重新自举,消息队列没     有被删除。然而对于管带或FIFO而言, 当管道或FIFO的最后一次关闭发生时,仍在管道或FIFO中的数据将被抛弃(管道和FIFO是随进程的持续性)

Posix消息队列和System V 消息队列区别
    对Posix消息队列的读取总是返回最高优先级的最早消息; 对System V消息队列的读则可以返回任意指定优先级的消息
    当往一个空队列放置一个消息时, Posix消息队列允许产生一个信号或者启动一个线程; System V则不提供类似机制

消息队列中的每条消息通常具有以下属性:
    一个表示优先级的整数;
    消息的数据部分的长度;
     消息数据本身;

技术分享图片

1.POSIX消息队列的创建和关闭

#include <mqueue.h> 
mqd_t mq_open(const char *name, int oflag, /* mode_t mode, struct mq_attr *attr */); //成功返回消息队列描述符,失败返回-1 


mqd_t mq_close(mqd_t mqdes); 

mqd_t mq_unlink(const char *name); //成功返回0,失败返回-1
#include <bits/mqueue.h>
typedef int mqd_t;

mq_open用于打开或创建一个消息队列。

name:表示消息队列的名字,它符合POSIX IPC的名字规则。POSIX的名字只能以一个’/’开头,名字中不能包含其他的’/’;所创建的POSIX消息队列不会在文件系统中创建真正的路径名

oflag:表示打开的方式,和open函数的类似。有必须的选项:O_RDONLY,O_WRONLY,O_RDWR,还有可选的选项:O_NONBLOCK,O_CREAT,O_EXCL。

mode:是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时,才需要提供该参数。表示默认访问权限。可以参考open。

attr:也是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时才需要。该参数用于给新队列设定某些属性,如果是空指针,那么就采用默认属性。


mq_close用于关闭一个消息队列,和文件的close类型,关闭后,消息队列并不从系统中删除。一个进程结束,会自动调用关闭打开着的消息队列。

mq_unlink用于删除一个消息队列。消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。每个消息队列都有一个保存当前打开着描述符数的引用计数器,当消息队列的引用计数大于0时,将其name删除,队列的析构到对后一个mq_close调用时才发生

2.posix消息队列的属性

#include <mqueue.h> 
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr); 
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr); //成功返回0,失败返回-1

#include <bits/mqueue.h> 
struct mq_attr
 {
     long int mq_flags; /* Message queue flags. */消息队列的标志:0或O_NONBLOCK,用来表示是否阻塞 0 O_NONBLOCK
     long int mq_maxmsg; /* Maximum number of messages. */消息队列的最大消息数
     long int mq_msgsize; /* Maximum message size. */消息队列中每个消息的最大字节数
     long int mq_curmsgs; /* Number of messages currently queued. */ 消息队列中当前的消息数目
   long int __pad[4]; };

mq_getattr用于获取当前消息队列的属性,mq_setattr用于设置当前消息队列的属性。其中mq_setattr中的oldattr用于保存修改前的消息队列的属性,可以为空。

mq_setattr可以设置的属性只有mq_flags,用来设置或清除消息队列的非阻塞标志。newattr结构的其他属性被忽略。

mq_maxmsg和mq_msgsize属性只能在创建消息队列时通过mq_open来设置。mq_open只会设置该两个属性,忽略另外两个属性。mq_curmsgs属性只能被获取而不能被设置。 

3.posix消息队列的使用

#include <mqueue.h>
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); //成功返回0,出错返回-1 
mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); //成功返回接收到消息的字节数,出错返回-1


#ifdef __USE_XOPEN2K mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout); mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abs_timeout); #endif

mq_send向消息队列中写入一条消息,mq_receive从消息队列中读取一条消息。

mqdes:消息队列描述符;

msg_ptr:指向消息体缓冲区的指针;

msg_len:消息体的长度,其中mq_receive的该参数不能小于能写入队列中消息的最大大小,即一定要大于等于该队列的mq_attr结构中mq_msgsize的大小。如果mq_receive中的msg_len小于该值,就会返回EMSGSIZE错误。POXIS消息队列发送的消息长度可以为0。

msg_prio:消息的优先级;它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息如果消息不需要设定优先级,那么可以在mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。

还有两个XSI定义的扩展接口限时发送和接收消息的函数:mq_timedsend和mq_timedreceive函数。默认情况下mq_send和mq_receive是阻塞进行调用,可以通过mq_setattr来设置为O_NONBLOCK。

//mqsend,c
#include "unpipc.h"
#include <mqueue.h>

int main()
{
    printf("请输入消息队列的名字:");
    char name[MAXLINE];
    fgets(name,MAXLINE,stdin);
    int len=strlen(name);
    if(name[len-1]==
)
        --len;

    int flags=O_RDWR|O_CREAT|O_EXCL;
    mqd_t mqd=mq_open(name,flags,FILE_MODE,NULL);
    if(mqd<0)
        if(errno==EEXIST)
        {
            mq_unlink(name);
            mqd=mq_open(name,O_RDWR|O_CREAT,FILE_MODE,NULL);
        }
        else
        {
            printf("open message queue error...
");
            return -1;
        }

    struct mq_attr attr;
    mq_getattr(mqd,&attr);
    char msg[MAXLINE];

    int i;
    for(i=0;i<6;++i)
    {
        printf("请输入第 %d 个消息:",i+1);
        scanf("%s",msg);
        printf("请输入第 %d 个消息的优先级:",i+1);
        int prio;
        scanf("%d",&prio);

        if(mq_send(mqd,msg,sizeof(msg),prio)<0)
            printf("send message: %d faild.error info: %s
",i,strerror(errno));
        
        printf("send message: %d success.
",i+1);

    }
    exit(0);
}
//mqreceive.c
#include "unpipc.h"
#include <mqueue.h>

int main()
{
    char name[MAXLINE];
    printf("请输入接受消息的消息队列的名字:");
    fgets(name,MAXLINE,stdin);
    int len=strlen(name);
    if(name[len-1]==
)
        --len;

    mqd_t mqd=mq_open(name,O_RDONLY);

    struct mq_attr attr;
    mq_getattr(mqd,&attr);
    char *buffer=(char *)malloc(sizeof(char)*attr.mq_msgsize);

    int i;
    for(int i=0;i<6;++i)
    {
        int prio;
        if(mq_receive(mqd,buffer,attr.mq_msgsize,&prio)<0)
        {
            printf("receive message failed. error info: %s
",strerror(errno));
            continue;
        }
        printf("receive message %d: %s, prio is: %d
",i+1,buffer,prio);
    }
    mq_close(mqd);
    exit(0);
}

 

4.POSIX消息队列的限制

POSIX消息队列本身的限制就是mq_attr中的mq_maxmsg和mq_msgsize,分别用于限定消息队列中的最大消息数和每个消息的最大字节数。在前面已经说过了,这两个参数可以在调用mq_open创建一个消息队列的时候设定。当这个设定是受到系统内核限制的。

下面是在Linux 2.6.18下shell对启动进程的POSIX消息队列大小的限制:

# ulimit -a |grep message
POSIX message queues     (bytes, -q) 819200

限制大小为800KB,该大小是整个消息队列的大小,不仅仅是最大消息数*消息的最大大小;还包括消息队列的额外开销。前面我们知道Linux 2.6.18下POSIX消息队列默认的最大消息数和消息的最大大小分别为:

mq_maxmsg = 10
mq_msgsize = 8192
MQ_OPEN_MAX 一个进程同时能够拥有的打开的消息队列的最大数目 posix要求至少位为8

MQ_PRIO_MAX 一个进程的最大优先值加1 posix要求他至少为32

在<unistd.h>中定义

5.mq_notify函数

Posix消息队列允许异步事件通知, 以告知何时有一个消息放置到了某个空消息队列中, 以下两种方式可选:
    1 产生一个信号
    2 创建一个线程来执行一个指定函数

mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);

struct sigevent
{
    int sigev_notify; //notification type SIGEV_{NONE,SIGNAL,THREAD}
    int sigev_signo; //signal number if SIGEV_SIGNAL
    union sigval   sigev_value; //signal value
    void (*sigev_notify_function)(union sigval);//passed to signal handeler or thread following two if SIGEV_THREAD
    pthread_attr_t *sigev_notify_attributes;
}

union sigval
{
    int sival_int; //integer value
    void *sival_ptr; //pointer value
}

sigev_notify选项意义

SIGEV_SIGNAL:发送由evp->sigev_sino指定的信号到调用进程,evp->sigev_value的值将被作为siginfo_t结构体中si_value的值。

SIGEV_NONE:什么都不做,只提供通过timer_gettime和timer_getoverrun查询超时信息。

SIGEV_THREAD:以evp->sigev_notification_attributes为线程属性创建一个线程,在新建的线程内部以evp->sigev_value为参数调用evp->sigev_notification_function。
把sigev_notify设置成SIGEV_THREAD,这会创建一个新的线程,该线程调用由sigev_notify_function指定的函数,调用的参数由sigev_value指定,新线程属性由sigev_notifyattributes指定,要是默认属性合适的话,他可以是一个空指针。

SIGEV_THREAD_ID:和SIGEV_SIGNAL类似,不过它只将信号发送到线程号为evp->sigev_notify_thread_id的线程,注意:这里的线程号不一定是POSIX线程号,而是线程调用gettid返回的实际线程号,并且这个线程必须实际存在且属于当前的调用进程。

//文件:/usr/include/asm-generic/siginfo.h 
typedef struct sigevent 
{
    sigval_t sigev_value;//notification type
    int sigev_signo;//signal number
    int sigev_notify;
  
    union 
    {
        int _pad[SIGEV_PAD_SIZE];
        int _tid;

        struct 
        {
            void (*_function)(sigval_t);
            void *_attribute;    /* really pthread_attr_t */
        } _sigev_thread;
    } _sigev_un;
} sigevent_t;

如果mq_notify函数的notification非空, 那么当前进程希望在有一个消息到达所指定的先前为空的队列时得到通知. 即"该进程被注册为接收该队列的通知"
如果notification为空, 而且当前进程目前被注册为接收所指定队列的通知, 那么已存在的注册将被撤销
任意时刻只能有一个进程可以被注册为接收某个给定队列的通知
当有一个消息到达先前为空的消息队列,而且已有一个进程被注册为接收该队列的通知, 只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下, 通知才会发出. 即在mq_receive中的阻塞比任何通知的注册都要优先
当通知被发送给注册进程时, 其注册就被撤销, 如果想的话, 需要重新注册(所以一般情况下, 都是在信号处理函数中的一开始就再次调用mq_notify进行重新注册)





























































以上是关于POSIX消息队列的主要内容,如果未能解决你的问题,请参考以下文章

Posix消息队列

linux进程间通信之Posix消息队列

POSIX消息队列

Posix消息队列

检查一条消息是不是在 POSIX 消息队列中而没有从队列中删除它?

尝试使用 POSIX 消息队列创建消息队列时权限被拒绝