如何保证 read() 实际上通过命名管道发送 write() 发送的 100% 数据

Posted

技术标签:

【中文标题】如何保证 read() 实际上通过命名管道发送 write() 发送的 100% 数据【英文标题】:How to guarantee read() actually sends 100% of data sent by write() through named pipes 【发布时间】:2009-05-24 23:34:37 【问题描述】:

我有以下两个程序,一个作为阅读器,另一个作为编写器。作者似乎只正确发送了大约 3/4 的数据以供读者阅读。有没有办法保证所有数据都被发送?我想我已经对其进行了设置,使其能够可靠地读取和写入,但它似乎仍然丢失了 1/4 的数据。

这里是作者的来源

#define pipe "/tmp/testPipe"

using namespace std;

queue<string> sproutFeed;


ssize_t r_write(int fd, char *buf, size_t size) 
   char *bufp;
   size_t bytestowrite;
   ssize_t byteswritten;
   size_t totalbytes;

   for (bufp = buf, bytestowrite = size, totalbytes = 0;
        bytestowrite > 0;
        bufp += byteswritten, bytestowrite -= byteswritten) 
      byteswritten = write(fd, bufp, bytestowrite);
            if(errno == EPIPE)
            
            signal(SIGPIPE,SIG_IGN);
            
      if ((byteswritten) == -1 && (errno != EINTR))
         return -1;
      if (byteswritten == -1)
         byteswritten = 0;
      totalbytes += byteswritten;
   
   return totalbytes;



void* sendData(void *thread_arg)


int fd, ret_val, count, numread;
string word;
char bufpipe[5];


ret_val = mkfifo(pipe, 0777); //make the sprout pipe

if (( ret_val == -1) && (errno != EEXIST)) 

    perror("Error creating named pipe");
    exit(1);
   
while(1)

    if(!sproutFeed.empty())
    
        string s;
        s.clear();
        s = sproutFeed.front();
        int sizeOfData = s.length();
        snprintf(bufpipe, 5, "%04d\0", sizeOfData); 
        char stringToSend[strlen(bufpipe) + sizeOfData +1];
        bzero(stringToSend, sizeof(stringToSend));                  
        strncpy(stringToSend,bufpipe, strlen(bufpipe));         
        strncat(stringToSend,s.c_str(),strlen(s.c_str()));
        strncat(stringToSend, "\0", strlen("\0"));                  
        int fullSize = strlen(stringToSend);            
        signal(SIGPIPE,SIG_IGN);

        fd = open(pipe,O_WRONLY);
        int numWrite = r_write(fd, stringToSend, strlen(stringToSend) );
        cout << errno << endl;
        if(errno == EPIPE)
        
        signal(SIGPIPE,SIG_IGN);
        

        if(numWrite != fullSize )
                       
            signal(SIGPIPE,SIG_IGN);
            bzero(bufpipe, strlen(bufpipe));
            bzero(stringToSend, strlen(stringToSend));
            close(fd);
        
        else
        
            signal(SIGPIPE,SIG_IGN);
            sproutFeed.pop();
            close(fd);
            bzero(bufpipe, strlen(bufpipe));
            bzero(stringToSend, strlen(stringToSend));
                           
    
    else
    
        if(usleep(.0002) == -1)
        
            perror("sleeping error\n");
        
    

int main(int argc, char *argv[])

    signal(SIGPIPE,SIG_IGN);
    int x;
    for(x = 0; x < 100; x++)
    
        sproutFeed.push("All ships in the sea sink except for that blue one over there, that one never sinks. Most likley because it\'s blue and thats the mightiest colour of ship. Interesting huh?");
    
    int rc, i , status;
    pthread_t threads[1];       
    printf("Starting Threads...\n");
    pthread_create(&threads[0], NULL, sendData, NULL);
    rc = pthread_join(threads[0], (void **) &status);


这是读者的来源

#define pipe "/tmp/testPipe"

char dataString[50000];
using namespace std;
char *getSproutItem();

void* readItem(void *thread_arg)

    while(1)
    
        x++;
        char *s = getSproutItem();
        if(s != NULL)
        
            cout << "READ IN: " << s << endl;
        
    



ssize_t r_read(int fd, char *buf, size_t size) 
   ssize_t retval;
   while (retval = read(fd, buf, size), retval == -1 && errno == EINTR) ;
   return retval;



char * getSproutItem()

    cout << "Getting item" << endl;
    char stringSize[4];
    bzero(stringSize, sizeof(stringSize));
    int fd = open(pipe,O_RDONLY);
    cout << "Reading" << endl;

    int numread = r_read(fd,stringSize, sizeof(stringSize));


    if(errno == EPIPE)
    
        signal(SIGPIPE,SIG_IGN);

    
    cout << "Read Complete" << endl;

    if(numread > 1)
    

        stringSize[numread] = '\0'; 
        int length = atoi(stringSize);
        char recievedString[length];
        bzero(recievedString, sizeof(recievedString));
        int numread1 = r_read(fd, recievedString, sizeof(recievedString));
        if(errno == EPIPE)
        


signal(SIGPIPE,SIG_IGN);
           
    if(numread1 > 1)
    
        recievedString[numread1] = '\0';
        cout << "DATA RECIEVED: " << recievedString << endl;
        bzero(dataString, sizeof(dataString));
        strncpy(dataString, recievedString, strlen(recievedString));
        strncat(dataString, "\0", strlen("\0"));
        close(fd);  
        return dataString;
    
    else
    
        return NULL;
    


else

    return NULL;


close(fd);

int main(int argc, char *argv[])

        int rc, i , status;
        pthread_t threads[1];       
        printf("Starting Threads...\n");
        pthread_create(&threads[0], NULL, readItem, NULL);
        rc = pthread_join(threads[0], (void **) &status); 


【问题讨论】:

你没有说是什么症状导致你相信你缺少字符。你的代码太复杂了,没有明显的错误(查一下)。要获得真正的帮助,您需要创建一些足够简单的东西,让错误显而易见——无论是对您还是对这里的其他人。 似乎作者来自java背景,并试图将他的知识盲目地应用于C 哎哟。这对我来说是一次学习的经历,没必要这么苛刻 【参考方案1】:

您肯定以错误的方式使用信号。这里完全不需要线程——至少在提供的代码中是这样。字符串计算很奇怪。获取this book,在阅读完之前不要触摸键盘:)

【讨论】:

我真的很沮丧,才开始添加东西。我最终从命名管道切换到消息队列。使用基于消息的 ipc 而不是基于流,我的问题更容易解决 请注意,SysV IPC 将您限制在一台机器上。我建议学习正确的套接字编程,它在一台机器和整个网络中都能很好地工作。【参考方案2】:

用于通过命名管道发送数据的一般方法是在标头上附加有效负载的长度。然后你读(fd, header_len);读取(rd,data_len);请注意,后者 read() 需要在循环中完成,直到读取 data_len 或 eof。另请注意,如果您有多个写入器到命名管道,那么写入是原子的(只要合理的大小)多个写入者不会对内核缓冲区中的部分消息进行大小写。

【讨论】:

【参考方案3】:

很难说这里发生了什么。也许您的系统调用之一返回了错误?您确定您已成功发送所有数据吗?

这里似乎还有一些无效代码:

    int length = atoi(stringSize);
    char recievedString[length];

这是一个语法错误,因为您不能使用非常量表达式在堆栈上创建数组的大小。也许您在真实版本中使用了不同的代码?

是否需要循环读取数据?有时一个函数会返回一部分可用数据,并要求您重复调用它,直到所有数据都消失。

如果系统调用被中断,Unix 中的一些系统调用也可以返回 EAGAIN - 从表面上看,你并没有处理这种情况。

【讨论】:

“这是一个语法错误”——就标准而言。但是例如 gcc 允许它(除非你是 -pedantic),因为它在 C99 中。无论编译器如何,如果它接受它,那么我希望它能够工作。 多么奇怪。我想知道这在幕后是如何工作的,比如说你是否在堆栈上分配了多个动态大小的数组 我假设使用一种通用/相似的机制来实现动态数组和alloca,尽管两者的结果有不同的范围。我想需要一些花哨的步法来引用可变长度分配“上方”和“下方”的两个变量:要么你不按范围顺序放置它们,要么你不能使用来自单个 sp 的常量偏移量。不过,从来没有调查过。 好的,让我看看我是否明白这一点。因此,正确的方法是 1)首先发送有效载荷的长度(就像正在做的那样)。然后将一个数组设置为像 char recievedString[5] 这样的常量大小,然后读取直到我读到最后或得到一个 EOF?【参考方案4】:

您可能会被阅读器主线程中的 POSIX 线程信号处理语义所困扰。 POSIX 标准允许 POSIX 线程接收信号,而不一定是您期望的线程。在不需要的地方阻止信号。 signal(SIG_PIPE,SIG_IGN) 是你的朋友。向阅读器主添加一个。

POSIX 线程处理语义,将 POS 放入 POSIX。 (但它确实更容易实现 POSIX 线程。)

用 ls 检查 /tmp 中的管道?不是空的吗?

【讨论】:

以上是关于如何保证 read() 实际上通过命名管道发送 write() 发送的 100% 数据的主要内容,如果未能解决你的问题,请参考以下文章

使用命名管道向子进程发送参数

WinAPI 命名管道位置

由命名管道的实现联想到 read 和 fread 的区别。

我通过命名管道传输了一个 std::pair 它是如何工作的?

如何将命名管道c#服务器连接到命名管道php客户端

命名管道如何识别客户端