父进程和外部子进程之间真正的非阻塞双向通信

Posted

技术标签:

【中文标题】父进程和外部子进程之间真正的非阻塞双向通信【英文标题】:True non-blocking two-way communication between parent and external child process 【发布时间】:2011-07-04 15:44:59 【问题描述】:

我已经阅读了大约 50 篇关于这个主题的帖子和教程,我已经复制、编写和测试了大约 20 种替代方案,并完成了我能想到的所有可能的研究。不过,我还没有看到以下问题的有效解决方案:

父进程A想要将数据传递给外部进程B,让进程B修改数据并将其传递回父进程A,然后继续父进程A。进程B是我拥有的外部程序套件的一部分没有影响,通常在 UNIX 命令行上这样运行:

< input_data program_B1 | program_B2 | program_B3 > output_data

...在哪里

input_data, output_data:在程序B1-B3中处理的一些数据

program_B1,B2,B3:从 stdin (fread) 读取数据并输出到 stdout (fwrite) 并对数据进行一些处理的程序。

所以,按顺序:

(1)父进程A向子进程B传递数据

(2)子进程B读取数据并修改

(3)子进程B将数据传回父进程A

(4) 父进程 A 读取数据并继续(例如将其进一步传递给进程 B2..)。

(5)父进程A将另一个数据集传递给子进程B等

问题是,无论我做什么,程序几乎总是挂在对管道的读/读(或写/写?)上

需要注意的重要一点是,父进程在将数据传递给子进程后不能简单地关闭管道,因为它在循环中工作,并且希望在完成处理后将另一组数据传递给子进程第一组。

这是一组父/子程序(使用 g++ pipe_parent.cc -o pipe_parent、g++ pipe_child.cc -o pipe_child 编译)说明未命名管道的问题。我也尝试过命名管道,但没有那么广泛。每次执行的结果可能略有不同。如果 sleep 语句在父级中省略,或 fflush() 语句在子级中省略,管道几乎肯定会阻塞。如果要传递的数据量增加,它总是会阻塞,独立于 sleep 或 fflush。

父程序 A:

#include <cstring>
#include <cstdio>
#include <cstdlib>

extern "C" 
  #include <unistd.h>
  #include <fcntl.h>
 

using namespace std;

/*
 * Parent-child inter-communication
 * Child is external process
 */

int main() 
  int fd[2];
  if( pipe(fd) == -1 ) 
    fprintf(stderr,"Unable to create pipe\n");
  
  int fd_parentWrite = fd[1];
  int fd_childRead   = fd[0];
  if( pipe(fd) == -1 ) 
    fprintf(stderr,"Unable to create pipe\n");
    exit(-1);
  
  int fd_childWrite = fd[1];
  int fd_parentRead = fd[0];

  pid_t pid = fork();
  if( pid == -1 ) 
    fprintf(stderr,"Unable to fork new process\n");
    exit(-1);
  

  if( pid == 0 )  // Child process
    dup2( fd_childRead,  fileno(stdin)  );  // Redirect standard input(0) to child 'read pipe'
        dup2( fd_childWrite, fileno(stdout) );  // Redirect standard output(1) to child 'write pipe'

    close(fd_parentRead);
    close(fd_parentWrite);
    close(fd_childRead);
    close(fd_childWrite);
    // execl replaces child process with an external one
    int ret = execl("/disk/sources/pipe_test/pipe_child","pipe_child",NULL);
    fprintf(stderr,"External process failed, return code: %d...\n", ret);
    exit(-1);
    // Child process is done. Will not continue from here on
  
  else  // Parent process
    // Nothing to set up
  

  // ...more code...

  if( pid > 0 )  // Parent process (redundant if statement)
    int numElements = 10000;
    int totalSize = numElements * sizeof(float);
    float* buffer = new float[numElements];
    for( int i = 0; i < numElements; i++ ) 
      buffer[i] = (float)i;
    

    for( int iter = 0; iter < 5; iter++ ) 
      fprintf(stderr,"--------- Iteration #%d -----------\n", iter);
      int sizeWrite = (int)write( fd_parentWrite, buffer, totalSize );
      if( sizeWrite == -1 ) 
        fprintf(stderr,"Parent process write error\n");
        exit(-1);
      
      fprintf(stderr,"Parent #%d: Wrote %d elements. Total size: %d\n", iter, sizeWrite, totalSize);
      sleep(1);   // <--- CHANGE!
      int sizeRead = (int)read( fd_parentRead, buffer, totalSize );
      if( sizeRead <= 0 ) 
        fprintf(stderr,"Parent process read error\n");
      
      while( sizeRead < totalSize ) 
        fprintf(stderr,"Parent #%d: Read %d elements, continue reading...\n", iter, sizeRead);
        int sizeNew = (int)read( fd_parentRead, &buffer[sizeRead], totalSize-sizeRead );
        fprintf(stderr," ...newly read %d elements\n", sizeNew);
        if( sizeNew < 0 ) 
          exit(-1);
        
        sizeRead += sizeNew;
      
      fprintf(stderr,"Parent #%d: Read %d elements. Total size: %d\n", iter, sizeRead, totalSize);
      fprintf(stderr,"Examples :  %f  %f  %f\n", buffer[0], buffer[10], buffer[100]);
    

    delete [] buffer;
  

  close(fd_parentRead);
  close(fd_parentWrite);
  close(fd_childRead);
  close(fd_childWrite);

  return 0;

儿童计划 B:

#include <cstdio>

using namespace std;

int main() 

  int numElements = 10000;
  int totalSize = numElements * sizeof(float);
  float* buffer = new float[numElements];

  int counter = 0;
  int sizeRead = 0;
  do 
    sizeRead = fread( buffer, 1, totalSize, stdin);
    fprintf(stderr,"Child  #%d: Read %d elements, buffer100: %f\n", counter, sizeRead, buffer[100]);
    if( sizeRead > 0 ) 
      for( int i = 0; i < numElements; i++ ) 
        buffer[i] += numElements;
      
      int sizeWrite = fwrite( buffer, 1, totalSize, stdout);
      fflush(stdout);  // <--- CHANGE!

      fprintf(stderr,"Child  #%d: Wrote %d elements\n", counter, sizeWrite);
      counter += 1;
    
   while( sizeRead > 0 );

  return 0;

有什么方法可以检查管道何时有足够的数据可供读取?或者有没有其他方法可以解决上述问题,有或没有管道?

请帮忙!

【问题讨论】:

您确定process B(您无法控制的那个)支持这种操作模式吗?许多程序的编写假设它应该在标准输入关闭之前读取,同时缓冲输出(通过写入缓冲的 FILE* 而不是 fflush 来显式或隐式地写入)。这很容易导致死锁,例如您看到的死锁,因为在程序终止/标准输入关闭之前,程序不会输出其最后的数据块。 @nos,我相信这对我来说应该不是问题,除非 fwrite 到 stdout 被系统缓冲。我可以看到program B 的源代码,它所做的只是使用来自标准输入的 fread 和 fwrite 到标准输出。但它不会刷新,我无法对代码进行此类更新。 fwrite 到 stdout 通常是缓冲的,在大多数系统上,您必须明确关闭缓冲(或确保在适当的时间调用 fflush()) 【参考方案1】:

阅读时最好的解决方案可能是检查select是否可以从管道中读取。您甚至可以通过超时。另一种方法可能是使用fcntl 在文件描述符0 (stdin) 上设置O_NONBLOCK 标志,尽管我认为select 方式更好。

与确保非阻塞写入一样:这有点困难,因为您不知道在管道阻塞之前可以写多少。一种方法(我觉得非常难看)是只写 1 个字节的块,然后再次检查 select 是否可以写。但这会影响性能,因此请仅在通信性能不成问题时使用。

【讨论】:

非常感谢。我没有听说过(或忽略了)select 函数,所以我尝试了你的建议。 ..添加到我之前的评论:我在fread 之前拨打了select 电话,并按照link 的示例再次拨打了FD_ISSET。有可能让它工作,但我已经看到当select 返回 0 时,我只是将问题推迟到下一次迭代的写入操作。我给孩子的示例代码可能不是应该的 100%,但很接近。我不确定我是否能够保证即使使用复杂的逻辑也不会挂在 fwrite/fread 上。也许我需要制作一个中间磁盘文件而不是使用管道..【参考方案2】:

第一个答案(使用select 确定管道是否已准备好从中读取)很好,但并没有真正解决我的问题,另请参阅我以前的 cmets。迟早我总是以“竞争条件”结束,程序一直挂在readwrite上。

解决方案(可能不是唯一的?)是在不同的线程中运行子到父数据传输。我还回去将管道实现为命名管道。它可能也适用于未命名的管道,但我没有检查。

最终代码如下。请注意,不需要显式刷新;父子到子和子到父的数据传输现在是分离的。欢迎任何 cmets 如何改进这一点!我可以看到的一个遗留问题是管道可能会填满,具体取决于孩子需要多长时间来处理数据。我不确定这种情况发生的可能性有多大。顺便说一句,这在我的外部程序中运行良好,而不仅仅是提供的子程序。

父程序 A:

#include <cstring>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <iostream>

extern "C" 
  #include <unistd.h>
  #include <fcntl.h>
  #include <sys/stat.h>
  #include <sys/types.h>
  #include <errno.h>
  #include <signal.h>
  #include <sys/wait.h>
  #include <pthread.h>


using namespace std;

static int const READING  = -1;
static int const BUFFER_READY = 1;
static int const FINISHED = 0;

/*
 * Parent-child inter-communication
 * Child is external process
 */

struct threadStruct 
  FILE*  file_c2p;
  int    sizeBuffer;
  float* buffer;
  int    io_flag;
;

// Custom sleep function
void mini_sleep( int millisec ) 
  struct timespec req=0,rem=0;
  time_t sec = (int)(millisec/1000);
  millisec    = (int)(millisec-(sec*1000));
  req.tv_sec  = sec;
  req.tv_nsec = millisec*1000000L;
  nanosleep(&req,&rem);


// Function to be executed within separate thread: Reads in data from file pointer
// Hand-shaking with main thread is done via the flag 'io_flag'
void *threadFunction( void *arg ) 
  threadStruct* ptr = (threadStruct*)arg;

  ptr->io_flag = READING;
  while( ptr->io_flag != FINISHED ) 
    if( ptr->io_flag == READING ) 
      int sizeRead = fread( ptr->buffer, 1, ptr->sizeBuffer, ptr->file_c2p );
      if( sizeRead <= 0 ) 
        ptr->io_flag = FINISHED;
        return NULL;
      
      ptr->io_flag = BUFFER_READY;
    
    else 
      mini_sleep(10);
    
  
  return NULL;


//--------------------------------------------------
int main() 
  std::string filename_p2c("/tmp/fifo11_p2c");
  std::string filename_c2p("/tmp/fifo11_c2p");

  fprintf(stderr,"..started\n");

  int status = mknod(filename_p2c.c_str(), S_IRUSR | S_IWUSR | S_IFIFO, 0);
  if( (status == -1) && (errno != EEXIST) ) 
    fprintf(stderr,"Error creating named pipe: %s\n", strerror(errno));
    exit(-1);
  
  status = mknod(filename_c2p.c_str(), S_IRUSR | S_IWUSR | S_IFIFO, 0);
  if( (status == -1) && (errno != EEXIST) ) 
    fprintf(stderr,"Error creating named pipe: %s\n", strerror(errno));
    exit(-1);
  

  FILE* file_dump = fopen("parent_dump","w");

  int fd_p2c;
  int fd_c2p;
  FILE* file_c2p = NULL;

  //--------------------------------------------------
  // Set up parent/child processes
  //
  pid_t pid = fork();
  if( pid == -1 ) 
    fprintf(stderr,"Unable to fork new process\n");
  

  if( pid == 0 )  // Child process
    fd_p2c = open( filename_p2c.c_str(), O_RDONLY );
    if( fd_p2c < 0 ) 
      fprintf(stderr,"Child: Error opening the named pipe: %d %d '%s'\n", fd_p2c, errno, strerror(errno));
      exit(-1);
    
    fd_c2p = open( filename_c2p.c_str(), O_WRONLY );
    if( fd_c2p < 0 ) 
      fprintf(stderr,"Child: Error opening the named pipe: %d %d '%s'\n", fd_c2p, errno, strerror(errno));
      exit(-1);
    

    dup2(fd_p2c,fileno(stdin));    // Redirect standard input(0) to child 'read pipe'
    dup2(fd_c2p,fileno(stdout));  // Redirect standard output(1) to child 'write pipe'
    close(fd_p2c);
    close(fd_c2p);

    int ret = execl("/disk/sources/pipe_test/pipe_child","pipe_child",NULL);
    fprintf(stderr,"External process failed, return code: %d...\n", ret);
    kill( getppid(), 9 );  // Kill parent process
    exit(-1);
  
  else  // Parent process
    fd_p2c = open( filename_p2c.c_str(), O_WRONLY );
    if( fd_p2c < 0 ) 
      fprintf(stderr,"Parent: Error opening the named pipe: %d %d '%s'\n", fd_p2c, errno, strerror(errno));
      exit(-1);
    
    file_c2p = fopen( filename_c2p.c_str(), "r");
    fd_c2p = fileno( file_c2p );
    if( fd_c2p < 0 ) 
      fprintf(stderr,"Parent: Error opening the named pipe: %d %d '%s'\n", fd_c2p, errno, strerror(errno));
      exit(-1);
    
  

  int numElements = 10000;
  int sizeBuffer = numElements * sizeof(float);
  float* bufferIn  = new float[numElements];
  float* bufferOut = new float[numElements];
  for( int i = 0; i < numElements; i++ ) 
    bufferIn[i]  = 0.0;
  
  int numIterations = 5;
  int numBytesAll = numElements * sizeof(float) * numIterations;

  pthread_t thread;
  threadStruct* threadParam = new threadStruct();
  threadParam->file_c2p   = file_c2p;
  threadParam->sizeBuffer = sizeBuffer;
  threadParam->buffer     = bufferIn;
  threadParam->io_flag    = READING;

  int thread_stat = pthread_create( &thread, NULL, threadFunction, threadParam );
  if( thread_stat < 0 ) 
    fprintf(stderr,"Error when creating thread\n");
    exit(-1);
  

  int readCounter  = 0;
  int numBytesWrite = 0;
  int numBytesRead  = 0;
  for( int iter = 0; iter < numIterations; iter++ ) 
    for( int i = 0; i < numElements; i++ ) 
      bufferOut[i] = (float)i + iter*numElements*10;
    

    int sizeWrite = (int)write( fd_p2c, bufferOut, sizeBuffer );
    if( sizeWrite == -1 ) 
      fprintf(stderr,"Parent process write error\n");
      exit(-1);
    
    numBytesWrite += sizeWrite;
    fprintf(file_dump,"Parent #%d: Wrote %d/%d bytes.\n", iter, numBytesWrite, numBytesAll);

    if( iter == numIterations-1 ) close(fd_p2c);  // Closing output pipe makes sure child receives EOF

    if( threadParam->io_flag != READING ) 
          numBytesRead += sizeBuffer;
      fprintf(file_dump,"Parent #%d: Read  %d/%d bytes. Examples: %f %f\n",
              readCounter, numBytesRead, numBytesAll, bufferIn[1], bufferIn[numElements-1] );
      readCounter += 1;
      if( threadParam->io_flag != FINISHED ) threadParam->io_flag = READING;
    
  
  //********************************************************************************
  //

  fprintf(file_dump,"------------------------------\n");

  while( threadParam->io_flag != FINISHED ) 
    if( threadParam->io_flag == BUFFER_READY ) 
      numBytesRead += sizeBuffer;
      fprintf(file_dump,"Parent #%d: Read  %d/%d bytes. Examples: %f %f\n",
              readCounter, numBytesRead, numBytesAll, bufferIn[1], bufferIn[numElements-1] );
      readCounter += 1;
      if( threadParam->io_flag != FINISHED ) threadParam->io_flag = READING;
    
    else 
      mini_sleep(10);
    
  

  // wait for thread to finish before continuing
  pthread_join( thread, NULL );


  fclose(file_dump);
  fclose(file_c2p);
  waitpid(pid, &status, 0); // clean up any children
  fprintf(stderr,"..finished\n");

  delete [] bufferIn;
  delete [] bufferOut;

  return 0;

儿童计划 B:

#include <cstdio>

using namespace std;

int main() 

  int numElements = 10000;
  int totalSize = numElements * sizeof(float);
  float* buffer = new float[numElements];

  FILE* file_dump = fopen("child_dump","w");

  int counter = 0;
  int sizeRead = 0;
  do 
    sizeRead = fread( buffer, 1, totalSize, stdin);
    if( sizeRead > 0 ) 
      fprintf(file_dump,"Child  #%d: Read  %d bytes, examples:  %f  %f\n", counter, sizeRead, buffer[1], buffer[numElements-1]);
      for( int i = 0; i < numElements; i++ ) 
        buffer[i] += numElements;
      
      int sizeWrite = fwrite( buffer, 1, totalSize, stdout);
      fprintf(file_dump,"Child  #%d: Wrote %d bytes, examples:  %f  %f\n", counter, sizeRead, buffer[1], buffer[numElements-1]);
      counter += 1;
    
   while( sizeRead > 0 );
  fprintf(file_dump,"Child is finished\n");
  fclose(file_dump);
  fclose(stdout);

  return 0;

【讨论】:

以上是关于父进程和外部子进程之间真正的非阻塞双向通信的主要内容,如果未能解决你的问题,请参考以下文章

进程间通信

在 C 中使用管道在父子之间创建双向通信

使用 socketpair 进行双向通信:挂起子进程的读取输出

C linux中子进程与父进程之间的通信:父进程不阻塞

如何在不同进程中建立Activity和Service之间的双向通信?

Qt使用QProcess进程间双向通信(linux和win系统)