多进程生产者消费者框架设计

Posted 神技圈子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多进程生产者消费者框架设计相关的知识,希望对你有一定的参考价值。

前言

介绍了进程如何基于面向对象的封装,本章我们基于封装好的Process类来实现一种无锁版的生产者和消费者框架,用它实现了高性能文件拷贝功能。读这篇文章之前大家可以想一下如果是你,你会怎么设计这样的框架?在这个模型中作为每个生产者,如何把读取到内容发送给消费者。而作为消费者,如何把接收到的内容写入文件,并且最终保证文件内容是一致的。

好了,废话不多说,我们来通过代码看下是怎么设计的吧?

生产者消费者模型

我们回顾一下什么是生产者消费者模型。这个模型是为了解决在整个程序过程中既要不断产生数据,又要处理数据的一种方案:生产者专注产生数据,消费者专注处理数据。
但如果模型中生产者产生数据很快,消费者处理数据很慢,那么必然会出现生产者要等待消费者处理完以后,才能继续生产。或者反之也是类似,消费者则会等到生产者生产完才去处理。这样两者就形成了依赖关系,无法做到高性能,需要在两者之间加入缓冲的区域,让它们不再有直接相互依赖的情况,如下图所示:

生产者消费者进程

结合上述对于多进程的封装以及生产者消费者的模型,该框架的设计关键点如下:

1.采用多个生产者和消费者都为进程。将Process作为基类,设计Process的继承类生产者进程类(ProducerProcess)和消费者进程类(ConsumerProcess)

2.两者之间的数据缓冲的通道则基于对无名管道的封装设计为一个缓存通道类,让生产者进程和消费者进程共用,并且生产者只往通道里放入数据,消费者只从通道里获取数据。

3.考虑到具体过程是文件处理,需要设计一种数据结构,用于生产者和消费者之间的数据传递。

发送数据结构设计

1.需要发送到管道的数据用Producer结构体来存放,并且通过内存拷贝来传输和接受数据。因为多进程之间指针是很难共享的,这里采用相对简单的方式是通过进程间通信的办法,把指针指向的内容传递过去。

2.结构体内要有一个读/写数据长度的成员变量。因为无法预设要拷贝的文件会有多大,所以不能认为只通过一两次读取文件就能拿到它的所有内容了。那么就得把读取文件的过程设计为每次不断读取固定长度的数据,直到读到文件末尾为止。

具体代码如下:


typedef  struct Producer



    size_t length;

    size_t readoffset;

    char *readbuf;

Producer;

 

typedef struct Consumer



    size_t length;

    size_t writeoffset;

    char *writebuf;

Consumer;

缓冲通道设计

1.如果单次传输数据量大于了管道的大小,就会出问题,因此对于上诉数据传递结构体中的数据长度要小于管道最大存储值(默认是4k)。

2.需要封装管道的读端写端的打开/关闭,判断是否已打开等。

具体代码如下:

class Tunnel



public:

       Tunnel();

       ~Tunnel();

 

       bool isOpen() const;

       void open();

       void closeWriteDescriptor();

       void closeReadDescriptor();

       ssize_t read(char *buf, size_t count);

       ssize_t write(char *buf, size_t count);

 

       inline int getReadDescriptor()

      

          return m_pipeFd[0];

      

 

      inline int getWriteDescriptor()

      

          return m_pipeFd[1];

      

 

private:

    int m_pipeFd[2];

;

读文件设计

这里有个问题,既然每个生产者进程的业务逻辑都是先读了源文件的一部分后再向管道发送读到的数据。那么到下一次读源文件的时候,要偏移多少呢?在这里,多个生产者进程读取文件的设计上为了减少同步机制造成的操作系统开销,采取每个进程从不同初始偏移量开始读取的方式。初始偏移量=indexm_maxlength。index为进程序列号,比如共有5个进程,index为0、1、2、3、4 ,m_maxlength为最大读取长度。到下一次再去读取源文件的时候,偏移量offset又从原有的offset增加了m_maxProducerm_maxlength。其中的m_maxProducer是类ConfigManager的成员变量,从输入参数中获取。

此外,还采用pread接口读取文件,其实际操作是lseek+read,即先将offset调整到指定值,再调用read读取数据。它同read的不同之处在于它是原子操作,即pread操作前后的offset是一致的。

读文件大小超过管道大小遇到的问题

若管道大小不是结构体Producer大小的整数倍,必然会出现生产者进程写入管道的数据大于管道大小。那么对于消费者进程来说,读取的数据大小是固定长度的,那么必然会出现它没有读完,只读取到管道大小数据就直接返回。按照Linux公平调度原则,这个消费者没办法马上读取剩余数据,只能通过read的返回值知道还有多少没有读。当管道再次未满时,未发送完数据的生产者进程从阻塞态转为运行态,把剩余数据继续发送到管道并返回。而其它生产者进程会继续把自己的Producer数据继续写管道。这样又会造成一个问题。消费者按照固定长度读取,必然会把前一个生产者写入管道的剩余数据和后面生产者写入管道的数据一同读取。从而造成了内容错乱问题。怎么办呢?设计中我们把Producer的大小定义为管道大小的1/2。

源码链接

以上是关于多进程生产者消费者框架设计的主要内容,如果未能解决你的问题,请参考以下文章

多进程生产者消费者框架设计

ACE框架 基于共享内存的进程间通讯

在linux下用c语言实现用多进程同步方法演示“生产者-消费者”问题

关于python多进程使用(Queue、生产者和消费者)

百万年薪python之路 -- 并发编程之 多线程 一

python 复习—并发编程实战——多线程和多进程的生产者消费者模型线程进程再总结