消息队列

Posted fellow_jing

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列相关的知识,希望对你有一定的参考价值。

消息队列是比较常用的进程间通信方式。在公司的代码中应用也比较广泛。

消息队列一般用在需要异步执行,并且消息内容比较短的情况。一般用来发控制消息,和反馈的消息。

比如说,一个进程需要另外一个进程做某些操作,并且希望得到操作完成的反馈信息。

首先将消息队列的函数进行封装,并编译成so.

基本的结构体和封装函数如msg.h

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <errno.h>
typedef enum{
  OPEN,
  CLOSE,
  PLAY,
  PAUSE,
  PREV,
  NEXT,
}CtrlCmd;//控制命令
typedef enum{
  OPEN_DONE,
  CLOSE_DONE,
  PLAY_DONE,
  PAUSE_DONE,
  PREV_DONE,
  NEXT_DONE,
}CtrlFeedback;//反馈结果
typedef struct{
  union {
  CtrlCmd _ctlCmd;
  CtrlFeedback _ctlFeedback;
  }u;
  int param;
}CtrlInfo;
typedef enum {
  CTRL_CMD = 1,//控制消息
  CTRL_FEEDBACK = 2,//反馈信息
}MsgType;
typedef struct{
  long int _msgType;
  CtrlInfo _ctlInfo;
}FellowMsg;
int fellow_create_msg_queue();

int fellow_remove_msg_queue(int msg_q_id);

int fellow_send_msg(int msg_q_id, FellowMsg *pFellowMsg);

int fellow_rcv_msg(int msg_q_id, FellowMsg *pFellowMsg);

相关封装函数在msg.c

#include "msg.h"
int fellow_create_msg_queue(const char *path, int proj_id)
{
  key_t key;
  int msg_q_id = -1;
  if (-1 == (key = ftok(path, proj_id)))
  {
    printf("ftok fail, errno:%d\\n", errno);
    return -1;
  }
  printf("key:0x%x\\n", (int)key);
  if (-1 == (msg_q_id = msgget(key, IPC_CREAT | 0666)))
  {
    printf("msgget fail, errno:%d", errno);
    return -1;
  }
  return msg_q_id;

}

int fellow_remove_msg_queue(int msg_q_id)
{
  if (-1 == msgctl(msg_q_id, IPC_RMID, NULL))
  {
    printf("msgctl fail, errno:%d", errno);
    return -1;
  }
  return 0;
}

int fellow_send_msg(int msg_q_id, FellowMsg *pFellowMsg)
{
  if (NULL == pFellowMsg)
  {
    return -1;
  }
  size_t msgLen = sizeof(FellowMsg) - sizeof(long int);
  if (-1 == msgsnd(msg_q_id, (void *)pFellowMsg, msgLen, 0))
  {
    printf("msgsnd fail, errno:%d", errno);
    return -1;
  }
  return 0;
}

int fellow_rcv_msg(int msg_q_id, FellowMsg *pFellowMsg)
{
  if (NULL == pFellowMsg)
  {
  return -1;
  }
  size_t msgLen = sizeof(FellowMsg) - sizeof(long int);
  if (-1 == msgrcv(msg_q_id, (void *)pFellowMsg, msgLen, 0, 0))
  {
    printf("msgrcv fail, errno:%d", errno);
    return -1;
  }
  return 0;
}

将msg.c编译成libmsg.so,gcc --share -I. msg.c -o libmsg.so

发送控制消息,并监听反馈消息的进程msgsnd.c. (gcc msgsnd.c -o msgsnd -L. -lmsg -lpthread)

#include <sys/prctl.h>
#include <string.h>
#include <stdio.h>
#include "msg.h"
#define MSG_CREAT_PATH "/mnt/hgfs/share/test/list"
#define MSG_RCV_ID 4//和msgrcv.c正好相反
#define MSG_SND_ID 3

void *fellow_listenning_msg(void *arg)//监听反馈信息
{
  if(0 != prctl(PR_SET_NAME, (unsigned long)"fellow_process_msg"))
  {
    printf("prctl fail, errno:%d", errno);
  }
  int msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_RCV_ID);
  FellowMsg _fellowMsg;
  while (1)
  {
    memset(&_fellowMsg, 0, sizeof(FellowMsg));
    fellow_rcv_msg(msg_q_id, &_fellowMsg);
    if (CTRL_FEEDBACK ==_fellowMsg._msgType)
    {
      switch (_fellowMsg._ctlInfo.u._ctlFeedback)
      {
        case OPEN_DONE:
          printf("rcv OPEN_DONE:%d\\n", _fellowMsg._ctlInfo.param);
        break;
        case CLOSE:
          printf("rcv CLOSE_DONE:%d\\n", _fellowMsg._ctlInfo.param);
        break;
        case PLAY:
          printf("rcv PLAY_DONE:%d\\n", _fellowMsg._ctlInfo.param);
        break;
        default:
        break;
      }
    }
  }
}
void main(void)
{
  pthread_t thread_id;
  int snd_msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_SND_ID);
  printf("msgid:%d\\n",snd_msg_q_id);
  FellowMsg _fellowMsg;
  _fellowMsg._msgType = CTRL_CMD;
  _fellowMsg._ctlInfo.u._ctlCmd = OPEN;
  _fellowMsg._ctlInfo.param = 1;
  fellow_send_msg(snd_msg_q_id, &_fellowMsg);//发送OPEN命令
  pthread_create(&thread_id, NULL, fellow_listenning_msg, NULL);
  while (1)
  {
  }
}

接收控制消息,做完相关处理后,返回反馈消息。msgrcv.c(gcc msgsnd.c -o msgsnd -L. -lmsg -lpthread)

#include <sys/prctl.h>
#include <string.h>
#include <stdio.h>
#include "msg.h"
#define MSG_CREAT_PATH "/mnt/hgfs/share/test/list"
#define MSG_RCV_ID 3
#define MSG_SND_ID 4
void *fellow_process_msg(void *arg)
{
  printf("thread start\\n");
  int msg_q_id = *(int *)arg;
  int snd_msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_SND_ID);
  if(0 != prctl(PR_SET_NAME, (unsigned long)"fellow_process_msg"))
  {
    printf("prctl fail, errno:%d", errno);
  }
  FellowMsg _fellowRcvMsg;
  FellowMsg _fellowSndMsg;
  while (1)
  {
    memset(&_fellowRcvMsg, 0, sizeof(FellowMsg));
    if( -1 == fellow_rcv_msg(msg_q_id, &_fellowRcvMsg))
      break;
    if (CTRL_CMD ==_fellowRcvMsg._msgType)
    {
      switch (_fellowRcvMsg._ctlInfo.u._ctlCmd)
      {
        case OPEN:
        printf("rcv OPEN:%d\\n", _fellowRcvMsg._ctlInfo.param);

                    /*do something when rcv open*/
        _fellowSndMsg._msgType = CTRL_FEEDBACK;
        _fellowSndMsg._ctlInfo.u._ctlFeedback = OPEN_DONE;
        _fellowSndMsg._ctlInfo.param = 0;
        fellow_send_msg(snd_msg_q_id, &_fellowSndMsg);
        break;
        case CLOSE:
          printf("rcv CLOSE:%d\\n", _fellowRcvMsg._ctlInfo.param);
        break;
        case PLAY:
          printf("rcv PLAY:%d\\n", _fellowRcvMsg._ctlInfo.param);
        break;
        default:
        break;

      }
    }

  }
}
void main(void)
{
  pthread_t thread_id;
  int msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_RCV_ID);
  printf("msgid:%d\\n", msg_q_id);
  pthread_create(&thread_id, NULL, fellow_process_msg, (void *)&msg_q_id);
  while (1)
  {
    /*do some thing in main thread*/
  }
}

首先运行msgrcv进程,监听控制消息,然后运行msgsnd进程,发送控制消息,然后接收msgrcv的反馈。

运行时如果不将libmsg.so的路径加到LD_LIBRARY_PATH,执行时会报错,找不到libmsg.so.(export LD_LIBRARY_PATH=/libmsg.so路径:$LD_LIBRARY_PATH)

结果如下:

进程msgrcv,

进程msgsnd,

 

以上是关于消息队列的主要内容,如果未能解决你的问题,请参考以下文章

消息队列属性及常见消息队列介绍

RabbitMQ 消息队列

Linux进程间通信 --- 消息队列

Redis(五)-特性-消息队列

redis消息队列有没有

到底啥是消息队列?Java中如何实现消息队列