openmp - 用于文本文件读取和使用管道的 while 循环

Posted

技术标签:

【中文标题】openmp - 用于文本文件读取和使用管道的 while 循环【英文标题】:openmp - while loop for text file reading and using a pipeline 【发布时间】:2013-05-29 11:09:25 【问题描述】:

我发现 openmp 不支持 while 循环(或者至少不太喜欢它们)。 而且也不喜欢 ' != ' 运算符。

我有这段代码。

int count = 1;
#pragma omp parallel for
    while ( fgets(buff, BUFF_SIZE, f) != NULL )
    
        len = strlen(buff);
        int sequence_counter = segment_read(buff,len,count);
        if (sequence_counter == 1)
        
            count_of_reads++;
            printf("\n Total No. of reads: %d \n",count_of_reads);
        
    count++;
    

关于如何管理这个的任何线索?我在某处(包括关于 *** 的另一篇文章)读到我可以使用管道。那是什么 ?以及如何实施?

【问题讨论】:

请提供您阅读该帖子的链接。 @Shahbaz,我想他可能指的是这个 SO 帖子***.com/questions/8121077/… 其实...这个 ..***.com/questions/7532067/… 但这也很重要! 【参考方案1】:

太糟糕了,人们这么快就选择了最佳答案。这是我的答案。 首先,您应该使用 fread 之类的东西将文件读入缓冲区。这非常快。如何做到这一点的一个例子可以在这里找到http://www.cplusplus.com/reference/cstdio/fread/

然后您可以与 OpenMP 并行操作缓冲区。我已经为你实现了大部分。下面是代码。您没有提供 segment_read 函数,所以我创建了一个虚拟函数。我使用了 C++ 中的一些函数,例如 std::vector 和 std::sort ,但只要多做一点工作,你也可以在纯 C 中做到这一点。

编辑: 我编辑了这段代码,并且能够删除排序和关键部分。

我用g++ foo.cpp -o foo -fopenmp -O3编译

#include <stdio.h>
#include <omp.h>
#include <vector>

using namespace std;

int segment_read(char *buff, const int len, const int count) 
  return 1;  


void foo(char* buffer, size_t size) 
    int count_of_reads = 0;
    int count = 1;
    std::vector<int> *posa;
    int nthreads;

    #pragma omp parallel 
    
        nthreads = omp_get_num_threads();
        const int ithread = omp_get_thread_num();
        #pragma omp single 
        
            posa = new vector<int>[nthreads];
            posa[0].push_back(0);
        

        //get the number of lines and end of line position
        #pragma omp for reduction(+: count)
        for(int i=0; i<size; i++) 
            if(buffer[i] == '\n')  //should add EOF as well to be safe
                count++;
                posa[ithread].push_back(i);
            
        

        #pragma omp for     
        for(int i=1; i<count ;i++)     
            const int len = posa[ithread][i] - posa[ithread][i-1];
            char* buff = &buffer[posa[ithread][i-1]];
            const int sequence_counter = segment_read(buff,len,i);
            if (sequence_counter == 1) 
                #pragma omp atomic
                count_of_reads++;
                printf("\n Total No. of reads: %d \n",count_of_reads);
            

        
    
    delete[] posa;


int main () 
  FILE * pFile;
  long lSize;
  char * buffer;
  size_t result;

  pFile = fopen ( "myfile.txt" , "rb" );
  if (pFile==NULL) fputs ("File error",stderr); exit (1);

  // obtain file size:
  fseek (pFile , 0 , SEEK_END);
  lSize = ftell (pFile);
  rewind (pFile);

  // allocate memory to contain the whole file:
  buffer = (char*) malloc (sizeof(char)*lSize);
  if (buffer == NULL) fputs ("Memory error",stderr); exit (2);

  // copy the file into the buffer:
  result = fread (buffer,1,lSize,pFile);
  if (result != lSize) fputs ("Reading error",stderr); exit (3);

  /* the whole file is now loaded in the memory buffer. */
  foo(buffer, result);
  // terminate


  fclose (pFile);
  free (buffer);
  return 0;

【讨论】:

我非常喜欢这个答案。很抱歉快速的“最佳答案”。大多数问题似乎都能尽快解决。 我(再次)编辑了代码,线程上的内部循环是错误的。我一直在努力解决的主要问题是线程随机进入,因此 segment_read 可能不会按顺序调用。这可能不是问题。好消息是有位置的变量是按顺序填充的。换句话说,posa[0] 是最低位置的向量,posa[7](使用 8 个线程)是最高位置的向量。因此,如果您需要按顺序获得这些职位。最初,我使用 sort() 和临界区来执行此操作,但最新的代码不需要。 这主要是我的问题,我希望按顺序读取这些行,但是 segment_read 中的某些子句可能会提前终止该线程。这会有什么影响吗?我的整个想法是整个 segment_read 函数并行运行。我有一台 8 核机器,因此您可以假设 8 个 segment_reads 在文件的 8 个不同行中运行。 行在一个线程中按顺序读取。但是,线程是随机运行的。但只要你保存每个线程的结果(就像我在 posa 中所做的那样),你就可以在最后循环它们并按顺序获取结果。提前终止的线程不会有任何区别,除非您可以进行进一步的优化(例如尝试 schedule(dynamic))。 更换这部分的任何线索? posa = 新向量[nthreads]; posa[0].push_back(0);【参考方案2】:

在 OpenMP 中实现“并行 while”的一种方法是使用创建任务的 while 循环。这是一个大致的草图:

void foo() 
    while( Foo* f = get_next_thing() ) 
#pragma omp task firstprivate(f)
        bar(f);
    
#pragma omp taskwait

对于循环 fgets 的特定情况,请注意 fgets 具有固有的顺序语义(它获取“下”行),因此需要在启动任务之前调用它。每个任务对 fgets 返回的数据的自己的副本进行操作也很重要,这样对 fgets 的调用不会覆盖前一个任务正在操作的缓冲区。

【讨论】:

这些任务可以调用函数吗?即调用函数的任务,在char数组上执行函数,并返回一个值? 是的,任务可以调用函数。您必须小心的是确保没有两个并发任务相互干扰。例如,如果我在示例中忘记写“firstprivate(f)”,那么当 bar(f) 开始实际启动时,f 的值可能已经消失或被下一次迭代覆盖。 f的声明方式,默认为firstprivate,task构造中的firstprivate子句是多余的。【参考方案3】:

首先,尽管它非常接近,但 openmp 并不能神奇地使您的代码并行。它适用于for,因为for 具有它可以理解的下限和上限。 Openmp 使用这些界限在不同线程之间分配工作。

while 循环不可能有这样的事情。

其次,您希望您的任务如何并行化?您正在从文件中读取,其中顺序访问可能会比并行访问提供更好的性能。您可以并行化 segment_read(基于其实现)。

或者,您可能希望将文件读取与处理重叠。为此,您需要使用更底层的函数,例如 Unix 的 openread 函数。然后,进行异步读取,这意味着您发送读取请求,处理最后一个读取块,然后等待读取请求完成。例如,搜索“linux asynchronous io”以了解更多信息。

使用管道实际上可能对您没有多大帮助。这将取决于我不太熟悉的管道的许多内部结构。但是,如果您有足够大的内存,您可能还需要考虑先加载整个数据,然后再进行处理。这样,加载数据会尽可能快(按顺序)完成,然后您可以并行处理。

【讨论】:

以上是关于openmp - 用于文本文件读取和使用管道的 while 循环的主要内容,如果未能解决你的问题,请参考以下文章

读取文件时无法使用 OpenMP 获得加速

管道 gsutil 输出到文件

操作系统之进程通信——管道

DELPHI CreatePipe 方法读取一个DOS窗口返回的数值。

04-Linux文本处理-sed

用于更新文件和签入 TFS 的 Azure DevOps 管道任务