多进程编程之进程间通信-管道和消息队列

Posted JasonLiu1919

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多进程编程之进程间通信-管道和消息队列相关的知识,希望对你有一定的参考价值。

1.进程间通信

Linux作为一种新兴的操作系统,几乎支持所有的Unix下常用的进程间通信方法:管道、消息队列、共享内存、信号量、套接口等等。

2.2.1 管道

管道是进程间通信中最古老的方式,它包括无名管道和有名管道两种,前者用于父进程和子进程间的通信,后者用于运行于同一台机器上的任意两个进程间的通信。

无名管道pipe

无名管道由pipe()函数创建:

 #include <unistd.h> 
 int pipe(int filedis[2]); 

参数filedis返回两个文件描述符:filedes[0]为读而打开,filedes[1]为写而打开。filedes[1]的输出是filedes[0]的输入。下面的例子示范了如何在父进程和子进程间实现通信。
跨越fork调用的管道:
例子代码:

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

int main()
{
    int data_processed;
    int file_pipes[2];
    const char some_data[] = "123";
    char buffer[BUFSIZ + 1];
    pid_t fork_result;

    memset(buffer, ‘\0‘, sizeof(buffer));

    if (pipe(file_pipes) == 0)
    {
        fork_result = fork();
        if (fork_result == -1) {
            fprintf(stderr, "Fork failure");
            exit(EXIT_FAILURE);
        }

// We‘ve made sure the fork worked, so if fork_result equals zero, we‘re in the child process.

        if (fork_result == 0)
        {
            // sleep(1);//如果父进程在子进程之前退出,则可以在两个输出内容之间看到shell的提示符
            data_processed = read(file_pipes[0], buffer, BUFSIZ);
            printf("Read %d bytes: %s\n", data_processed, buffer);
            exit(EXIT_SUCCESS);
        }

// Otherwise, we must be the parent process.

        else
        {

            data_processed = write(file_pipes[1], some_data,
                                   strlen(some_data));
            printf("Wrote %d bytes\n", data_processed);//不管sleep(1)还是sleep(10)多少,都是先执行父进程???

        }
    }
    exit(EXIT_SUCCESS);
}

运行结果:
技术分享

基于管道的进程间通信:

int main()
{
    int mycount=0;
    ifstream m_InStream("subdata.txt");//读取的任务列表
    int data_processed;
    int file_pipes[3][2];
    string some_data;

    pid_t fork_result[3];
    for (int i = 0; i < 3; ++i)
    {
        if (pipe(file_pipes[i]) == 0)
        {
            fork_result[i] = fork();    
            if (fork_result[i] == 0) 
            {
                close(file_pipes[i][1]);
                char buf[1024] = {0};
                while(true)
                {
                    int readsize = read(file_pipes[i][0], buf, sizeof(buf)+1);
                    buf[readsize]=‘\0‘;//注意这里,可以存在字符之后有其他字符,可以尝试下,注释掉该语句的效果!!
                    // printf("%X,%X,%X\n",buf[readsize],buf[readsize+1],buf[readsize+2]);
                    printf("Read %d byte,data=%s\n",readsize, buf);
                    std::cout << flush;// fflush(stdout);
                    if (strncmp(buf, "exit", 4) == 0)
                    {
                        break;
                    }
                }
                exit(0);        
            }
            else if(fork_result[i] == -1)
            {
                fprintf(stderr, "Fork failure");
                exit(EXIT_FAILURE);
            }
        }
    }

    string oneline;
    int count=0;
    while(getline(m_InStream, oneline,‘\n‘))
    {
        some_data = oneline;
        char bufs[128] = {0};
        strncpy(bufs,some_data.c_str(),some_data.size());
        data_processed = write(file_pipes[count][1], bufs, strlen(bufs));
        printf("Wrote %d bytes,data=%s\n", data_processed,bufs);
        count++;
        if(count>=3)
        {
            count = count -3;           
        }
        sleep(1);
    }
    for(int i=0; i<3; ++i)
    {
        char bufs[128] = {0};
        snprintf(bufs, sizeof(bufs), "exit");
        write(file_pipes[i][1], bufs, strlen(bufs));
        sleep(1);
    }
    int status =0;
    int mpid =0;
    for(int i=0;i<3;i++)
    {
        mpid = wait(&status);
        printf("pid[%d] is exit with status[%d]\n",mpid,status);
    }
    return 0;
}

运行结果如下:
技术分享

popen和pclose

有名管道(也叫做命名管道)

基于有名管道,我们可以实现不相关进程之间的数据交换,而不必有一个共同的祖先进程。有名管道是基于FIFO文件来完成的,有名管道是一种特殊类型的文件,它在系统文件中以文件名的形式存在,行为和上述介绍的无名管道pipe类似。
有名管道可由两种方式创建:命令行方式mknod系统调用和函数mkfifo。下面的两种途径都在当前目录下生成了一个名为myfifo的有名管道:
方式一:mkfifo(“my_fifo”,”rw”);
方式二:mknod my_fifo p ,my_fifo 是指文件名
在程序中可以通过以下两个不同的函数进行调用:

#include <sys/types.h>  
#include <sys/stat.h>  
int mkfifo(const char *pathname, mode_t mode);  
int mknod(const char *pathname, mode_t mode | S_FIFO, (dev_t)0);  

生成了有名管道后,就可以使用一般的文件I/O函数如open、close、read、write等来对它进行操作。
有名管道创建示例代码:

/*创建有名管道*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>

int main()
{
    int res = mkfifo("tmp/my_fifo", 0777);
    if (res == 0)
        printf("FIFO created\n");
    exit(EXIT_SUCCESS);
}

与通过pipe调用创建管道不同,FIFO是以命名文件的形式存在,而不是打开的文件描述符,所以在对它进行读写操作之前必须先打开它。FIFO也用open和close函数进行打开和关闭。传递给open函数的是FIFO的路径名,而不是一个一般下的文件。
下面即是一个简单的例子,假设我们已经创建了一个名为my_fifo的有名管道。
【待补充 】

2.2.2 消息队列

消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级。一个有写权限的进程按照一定的规则对消息队列进行信息添加,对消息队列有读权限的进程则可以从消息队列中读走消息,从而实现进程间的通信。。
代码版本1:
涉及函数:
1)创建新消息队列或者取得已经存在的消息队列
int msgget(key_t key, int msgflg);
参数:
key:可以认为是一个端口号,是用来命名某个特定的消息队列,进行消息队列标识。
msgflg:
IPC_CREAT值,若没有该队列,则创建一个并返回新标识符;若已存在,则返回原标识符。
IPC_EXCL值,若没有该队列,则返回-1;若已存在,则返回0。
2)向队列读/写消息
原型:
int msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数:
msqid:消息队列的标识码
msgp:指向消息缓冲区的指针,此位置用来暂时存储发送和接收的消息,是一个用户可定义的通用结构,形态如下:

         struct msgstru 
         { long mtype; /* 消息类型,必须 > 0 */ 
           char mtext[1024]; /* 消息文本 */ 
         };

msgsz:消息的大小。
msgtyp:从消息队列内读取的消息形态。如果值为零,则表示消息队列中的所有消息都会被读取。
msgflg:用以控制当队列中没有对应类型的消息可以接收时的处理逻辑。对于msgsnd函数,msgflg控制着当前消息队列满或消息队列到达系统范围的限制时将要发生的事情。如果msgflg设置为IPC_NOWAIT,则在msgsnd()执行时若是消息队列已满,则msgsnd()将不会阻塞,不会发送信息,立即返回-1。如果执行的是msgrcv(),则在消息队列呈空时,不做等待马上返回-1,并设定错误码为ENOMSG。当msgflg为0时,msgsnd()及msgrcv()在队列呈满或呈空的情形时,采取阻塞等待的处理模式。此时对于发送进程而已发送进程将挂起以等待队列中腾出可用的空间;对于接收进程而言,该进程将会挂起以等待一条相应类型的信息到达。
3)设置消息队列属性
原型:int msgctl ( int msgqid, int cmd, struct msqid_ds *buf );
参数:msgctl 系统调用对 msgqid 标识的消息队列执行 cmd 操作,系统定义了 3 种 cmd 操作: IPC_STAT , IPC_SET , IPC_RMID
IPC_STAT : 该命令用来获取消息队列对应的 msqid_ds 数据结构,并将其保存到 buf 指定的地址空间。
IPC_SET : 该命令用来设置消息队列的属性,要设置的属性存储在buf中。
IPC_RMID : 从内核中删除 msqid 标识的消息队列。
receive代码:

/*receive.cpp */
#include<iostream>
#include <fstream>
#include <string>
#include <memory.h>
#include<stdio.h>
#include<map>
#include<vector>
#include<algorithm>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h> 
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>

using namespace std;
#define MSGKEY 1024
#define ChildNum 5
struct msgstru
{
    long msgtype;
    char msgtext[2048];
};

/*子进程,监听消息队列*/
void childproc()
{
    struct msgstru msgs;
    int msgid,ret_value;
    char str[512];
    /* First, we set up the message queue. */
    // msgid = msgget((key_t)MSGKEY, 0666 | IPC_CREAT);//该键值则唯一对应一个消息队列
    while(1)
    {
        msgid = msgget(MSGKEY,IPC_EXCL );/*检查消息队列是否存在 */
        if(msgid < 0){
            printf("msq not existed! errno=%d [%s]\n",errno,strerror(errno));
            exit(EXIT_FAILURE);
        }
        /*接收消息队列*/
        ret_value = msgrcv(msgid,&msgs,sizeof(struct msgstru),0,0);
        int len = strlen(msgs.msgtext);
        // std::cout<<len<<std::endl;
        msgs.msgtext[len] = ‘\0‘;
        // std::cout<<"pid="<<getpid()<<","<<msgs.msgtext<<std::endl;
        if(ret_value == -1)
        {
            fprintf(stderr, "msgrcv failed with error: %d\n", errno);//消息队列中的信息被取完??
            exit(EXIT_FAILURE);//消息队列为空的时候,就跳出。也可以设计成,消息队列为空时,不跳出,而是等待。
        }
        else
        {
            printf("pid=%d,data=%s\n",getpid(),msgs.msgtext);           
        }
        if (strncmp(msgs.msgtext, "end", 3) == 0)
        {
            exit(EXIT_SUCCESS);//换成break的效果呢???是不一样的啊
        }
        //因为在send的时候,只send了一个end,当该标志信息被读取之后,其他的进程自然是读取不到信息的,

    }
    return;
}

int main()
{
    int i,cpid;

    /* create 5 child process */
    for (i=0;i<ChildNum;i++){
        cpid = fork();
        if (cpid < 0)
            printf("fork failed\n");
        else if (cpid ==0) /*child process*/
            childproc();
    }
    int status =0;
    int mpid =0;
    // std::cout<<"father pid="<<getpid()<<std::endl;
    for(int i=0;i<ChildNum;i++)
    {
        mpid = wait(&status);
        printf("pid[%d] is exit with status[%d]\n",mpid,status);
    }
    return 1;
}

send代码:

/*send.c*/
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>

using namespace std;
#define MSGKEY 1024

struct msgstru
{
   long msgtype;
   char msgtext[2048]; 
};

main()
{
    struct msgstru msgs;
    int msg_type;
    char str[256];
    int ret_value;
    int msqid;

    msqid=msgget(MSGKEY,IPC_EXCL );  /*检查消息队列是否存在*/
    if(msqid < 0){
        msqid = msgget(MSGKEY,IPC_CREAT|0666);/*创建消息队列*/
        if(msqid <0){
            printf("failed to create msq | errno=%d [%s]\n",errno,strerror(errno));
            exit(-1);
        }
    } 
    ifstream m_InStream("subdata.txt");
    string oneline;
    int running=1;
    // while(running)//getline(m_InStream, oneline,‘\n‘) 
    int linenun=0;
    while (running)
    {
        if(!getline(m_InStream, oneline,‘\n‘))
        {
            oneline = "end";
            running = 0;
        }
        linenun++;
        // printf("input message type[0=end process]:");
        // scanf("%d",&msg_type);
        // if (msg_type == 0)
            // break;
        // printf("input message to be sent:");
        // scanf ("%s",str);
        strncpy(str,oneline.c_str(),256);
        msgs.msgtype = linenun;//这里只是顺便记录下,该字段的值,可以写
        strcpy(msgs.msgtext, str);
        /* 发送消息队列 */
        std::cout<<"pid="<<getpid()<<","<<msgs.msgtext<<std::endl;
        ret_value = msgsnd(msqid,&msgs,sizeof(struct msgstru),0);//消息队列标识符,准备发现信息的指针,信息的长度,控制标志位
        sleep(1);
        if ( ret_value < 0 ) {
            printf("msgsnd() write msg failed,errno=%d[%s]\n",errno,strerror(errno));
            exit(-1);
        }
    }
    msgctl(msqid,IPC_RMID,0); //删除消息队列
    //如果这行代码注释掉的话,则receive进程不会在接收一次send发出的任务之后,就全部子进程全部退出,而是send发送一次(即启动一次send程序)退出一个进程,这里启动了5个进程,所以,需要启动5次send程序,receive程序才会退出(每次退出一个进程)。
}

运行结果如下:
send的结果:
技术分享
receive的结果:
技术分享
对于receive之后的结果,由于在getline的时候,在最后一行之后,send程序中往消息队列中写入的是一个end字符,而在receive方,只有其中一个进程获取到end字符,然后,正常退出该进程,而其他的进程,则是由于消息队列中信息为空,获取消息队列失败而退出的。所以,在结果图中可以看到有4个进程的消息队列获取结果失败,43提示符,从而退出进程,256的提示符。
查看消息队列的命令行:
查看全部ipc对象信息:
#ipcs -a
技术分享
查看消息队列信息
#ipcs -q
查看共享内存信息
#ipcs -m
查看信号量信息
#ipcs -s
删除IPC对象的ipcrm
ipcrm -[smq] ID 或者ipcrm -[SMQ] Key
-q -Q删除消息队列信息 例如ipcrm -q 98307
-m -M删除共享内存信息
-s -S删除信号量信息
如果把两个程序合并到一起,写成一个程序呢?
至于共享内存,信号量和套接字在下文再继续介绍。



























































以上是关于多进程编程之进程间通信-管道和消息队列的主要内容,如果未能解决你的问题,请参考以下文章

Python 3 并发编程多进程之队列(推荐使用)

Python3 异步编程之进程与线程-1

Linux进程间通信--进程,信号,管道,消息队列,信号量,共享内存

进程间通信之消息队列通信

Linux进程间通信——使用消息队列

linux进程间通信之消息队列