消息队列的简单实现

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列的简单实现相关的知识,希望对你有一定的参考价值。

消息队列:一个进程向另一个进程发送数据块
消息队列基于消息,管道基于字节流

消息队列是用链表实现

1.创建:int megget(key_t key, int msgflag)
    key:函数ftok()的返回值
    msgflag:IPC_CREAT是创建新的消息队列;IPC_EXCL与IPC_CREAT一起使用,即如果要创建的消息队列已存在,则返回错误
    成功:返回队列标识符
    失败:-1
2.获取:ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg)
    msqid:消息队列的识别码
    msgp:指向消息缓冲区的指针,用来暂时存储发送和接收的消息,是用户自己定义的一个通用结构
        struct msgbuf
        {
            long mtype;  //消息类型,必须> 0
            char mtext[__SIZE__];  //消息文本
        }
    msgsz:消息大小
    msgtyp:消息类型(>0,返回其类型为mtype的第一个消息)
    msgflg:控制函数行为(0表示忽略)

    成功:返回拷贝到mtext数组的实际字节数
    失败:-1
3.发送:int msgsnd(int msqid, void *msgp, size_t msgsz, int msgflg)
    成功:返回0
    失败:-1
4.设置属性:int msgctl(int msqid, int cmd, struct msqid_ds *buf)
    对msqid标识的消息队列执行cmd操作
    cmd操作:IPC_STAT(获取对应的msqid_ds的数据结构,保存在buf指定的地址空间)
         IPC_SET(设置消息队列的属性,保存在buf中)
         IPC_RMID(从内核删除msqid标识的消息队列)
    msqid_ds:描述队列当前状态
    成功:0
    失败:-1


comm.h
#ifndef __COMM__
#define __COMM__

#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <string.h>

#define __SIZE__ 1024
#define FILEPATH "/tmp/.msg1"
#define ID 0


const int ser_send_type=1;
const int cli_send_type=2;

typedef struct msg_info
{
    char mtext[__SIZE__];
    long mtype;
}msginfo;


#endif

client.h
#ifndef __CLIENT__
#define __CLIENT__

#include "comm.h"

int cli_start();
int cli_end(int);

#endif
client.c
#include "client.h"

int msg_id= -1;

int cli_start()
{

    //create the client‘s buf which storage the msg
    msginfo cli_info;

    //create the key
    key_t key= ftok(FILEPATH, ID);
    if(key< 0)
    {
        perror("ftok");
        exit(1);
    }
    //get the ID of msg_queue
    msg_id= msgget(key, 0);
    if(msg_id< 0)
    {
        perror("cli get key id failed");
        exit(2);
    }

    while(1)
    {
        //client said
        printf("client:>");
        fgets(cli_info.mtext, __SIZE__, stdin);  //write in the cli_info
        //when it will exit?
        if(strncasecmp(cli_info.mtext, "quit", 4)==0)
        {
            printf("client bye!\n");
            break;
        }
        cli_info.mtype= cli_send_type;
        //send the msg(send judge)
        if(msgsnd(msg_id, &cli_info, sizeof(cli_info.mtext),0)== -1) //send fail
        {
            perror("client send msg fail");
            exit(3);
        }

        //receive the msg
        memset(cli_info.mtext, ‘\0‘, sizeof(cli_info.mtext));  //reset first
        if(msgrcv(msg_id, &cli_info, sizeof(cli_info.mtext), ser_send_type, 0)==-1)  //receive fail
        {
            perror("client msgrcv fail");
            exit(4);
        }
        printf("server:>%s\n", cli_info.mtext);  //the msg receive from server have storaged in the cli_info.mtext
        fflush(stdout);
    }
    return 0;
}
int cli_end(int id)
{
    if(msgctl(id, IPC_RMID, NULL)==-1)  //fail
    {
        perror("delete th msg_queue from kernel fail");
        exit(5);
    }
    else
    {
        printf("delete the msg_queue from kernel success\n");
        return 0;
    }
}
static void delete_msg(void)
{
    if(msg_id!= -1)  //the msg_queue exist
    {
        cli_end(msg_id);
    }
    printf("delete the msg queue end\n");
}
int main(int argc,char *argv[])
{
    atexit(delete_msg);
    if(cli_start()==0)
    {
        printf("cli start success\n");
    }
    else
    {
        printf("cli start failed\n");
    }
    return 0;
}

server.h
#ifndef __SERVER__
#define __SERVER__

#include "comm.h"

int ser_start();
int ser_end(int);

#endif
server.c
#include "server.h"

int msg_id= -1;
int ser_start()
{

    //create the server‘s buf which storage the msg
    msginfo ser_info;

    //create the key
    key_t key= ftok(FILEPATH, ID);
    if(key< 0)
    {
        perror("ftok");
        exit(1);
    }
    //get the ID of msg_queue
    msg_id= msgget(key, IPC_CREAT|IPC_CREAT|0666);
    if(msg_id< 0)
    {
        perror("ser get key id failed");
        exit(2);
    }

    while(1)
    {
        //reveive the msg
        if(msgrcv(msg_id, &ser_info, sizeof(ser_info.mtext), cli_send_type, 0)==-1) //fail
        {
            perror("server msgrcv fail");
            exit(3);
        }
        printf("client:>%s\n",ser_info.mtext);  //first,output the client‘s msg

        printf("server:>");  //then,server will said
        memset(ser_info.mtext, ‘\0‘, sizeof(ser_info.mtext));  //reset first
        fgets(ser_info.mtext, __SIZE__, stdin);  //write in the ser_info
        //when it will exit?
        if(strncasecmp(ser_info.mtext, "quit", 4)==0)
        {
            printf("server bye!\n");
            break;
        }
        ser_info.mtype= ser_send_type;
        //send the msg
        if(msgsnd(msg_id, &ser_info, __SIZE__, 0)==-1)  //fail
        {
            perror("server msgsnd fail");
            exit(4);
        }
        fflush(stdout);
    }
    return 0;
}
int ser_end(int id)
{
    if(msgctl(id, IPC_RMID, NULL)== -1)  //fail
    {
        perror("delete the msg_queue from kernel fail");
        exit(5);
    }
    else
    {
        printf("delete the msg_queue from kernel success\n ");
        return 0;
    }
}
static void delete_msg(void)
{
    if(msg_id!= -1)  //the msg_queue exist
    {
        ser_end(msg_id);
    }
    printf("delete the msg_queue end\n");
}
int main(int argc, char *argv[])
{
    atexit(delete_msg);
    if(ser_start()==0)  //success
    {
        printf("ser start success\n");
    }
    else
    {
        printf("ser start failed\n");
    }
    return 0;
}

Makefile
.PHONY:all
all:client server
client:client.c
    gcc -o [email protected] $^
server:server.c
    gcc -o [email protected] $^
.PHONY:clean
clean:
    rm -f client server


以上是关于消息队列的简单实现的主要内容,如果未能解决你的问题,请参考以下文章

vue-简单的消息队列

到底啥是消息队列?Java中如何实现消息队列

消息队列

消息队列 + 线程实现简单异步

Redis的基本使用(二) 消息队列

使用java实现阿里云消息队列简单封装