C++ - 如何分块文件以进行同时/异步处理?

Posted

技术标签:

【中文标题】C++ - 如何分块文件以进行同时/异步处理?【英文标题】:C++ - How to chunk a file for simultaneous/async processing? 【发布时间】:2012-11-20 23:46:45 【问题描述】:

如何根据行数读取和拆分/分块文件?

我想将一个文件分成单独的缓冲区,同时确保一行不会在两个或多个缓冲区之间分割。我计划将这些缓冲区传递到它们自己的 pthread 中,以便它们可以执行某种类型的同时/异步处理。

我已阅读reading and writing in chunks on linux using c 下面的答案,但我认为它并不能完全回答有关确保一行不分成两个或更多缓冲区的问题。

【问题讨论】:

这类方案永远不会按预期进行。你有一个多核处理器,这是一种让多线程高效的好方法。但是你仍然只有一个磁盘,线程只是在等待轮到从中读取。 @HansPassant:如果 OP 知道该任务受 CPU 限制,那么最终可能并非如此。但是,是的,您很可能是正确的。尽管pbzipxz 都在块级别上使用了这种技术,效果很好。 其实我并没有考虑到这一点。从那以后,我想到了一种更简洁的方法来解决我的宏观问题,这种方法不涉及使用我提出的问题。不过我还是很好奇答案是什么! 即使一个磁盘只有一个磁盘(例如相对于 raid-0 阵列),一个线程的 IO 也可以与另一个线程的 CPU 处理交错。在过去 50 年左右的时间里,这类计划一直运作良好。即使在单个 CPU 上。 想跟进这个问题。我正在重新探索将文件分块并将其传递给 N 个线程进行异步处理的可能性。如果我要逐行读取文件并将每一行传递给一个线程,它会高效吗?我怀疑这也是文件 IO 绑定的。 【参考方案1】:

文件是如何编码的?如果它每个字节代表一个字符,我会执行以下操作:

    内存映射文件使用mmap()。 根据适当的块大小计算作业的大致起点和终点。 通过查找下一个'\n',让每个作业找到其实际开始和结束。 同时处理各个块。 请注意,第一个块需要特殊处理,因为它的开始不是近似的而是精确的。

【讨论】:

【参考方案2】:

我会选择一个以字节为单位的块大小。然后我会寻找文件中的适当位置并一次读取一些少量字节,直到我得到一个换行符。

第一个块的最后一个字符是换行符。第二个块的第一个字符是换行符之后的字符。

始终寻找 pagesize() 边界并一次读取 pagesize() 字节以搜索换行符。这将倾向于确保您只从磁盘中提取所需的最小值以找到您的边界。您可以尝试一次读取 128 个字节或其他内容。但是你会冒着进行更多系统调用的风险。

我编写了一个示例程序来计算字母频率。当然,这在很大程度上没有意义,因为它几乎肯定是 IO 绑定的。而且换行符的位置也无关紧要,因为它不是面向行的。但是,这只是一个例子。此外,它在很大程度上依赖于您拥有相当完整的 C++11 实现。

threaded_file_split.cpp on lisp.paste.org

他们的关键功能是这样的:

// Find the offset of the next newline given a particular desired offset.
off_t next_linestart(int fd, off_t start)

   using ::std::size_t;
   using ::ssize_t;
   using ::pread;

   const size_t bufsize = 4096;
   char buf[bufsize];

   for (bool found = false; !found;) 
      const ssize_t result = pread(fd, buf, bufsize, start);
      if (result < 0) 
         throw ::std::system_error(errno, ::std::system_category(),
                                   "Read failure trying to find newline.");
       else if (result == 0) 
         // End of file
         found = true;
       else 
         const char * const nl_loc = ::std::find(buf, buf + result, '\n');
         if (nl_loc != (buf + result)) 
            start += ((nl_loc - buf) + 1);
            found = true;
          else 
            start += result;
         
      
   
   return start;

还要注意我使用了pread。当您有多个线程从文件的不同部分读取时,这是绝对必要的。

文件描述符是线程之间的共享资源。当一个线程使用普通函数从文件中读取数据时,它会更改有关此共享资源的详细信息,即文件指针。文件指针是下一次读取在文件中的位置。

在每次阅读之前简单地使用lseek 并没有帮助,因为它会在lseekread 之间引入竞争条件。

pread 函数允许您从文件中的特定位置读取一堆字节。它也根本不会改变文件指针。除了它不会改变文件指针这一事实之外,它就像在同一个调用中组合了 lseekread

【讨论】:

你怎么知道你什么时候换行了? @Ken:您扫描读取的缓冲区以查找换行符。记下缓冲区开头的文件偏移量(您搜索到的位置)和换行符缓冲区内的偏移量。将两者相加,您将获得接近您的块大小的换行符的文件偏移量。 @Ken - 我主要使用我的技术为您编写了一个示例程序。我不会去尝试确保文件访问是页面对齐的。如果您真的想让事情尽可能快,您将确保所有文件访问尽可能页面对齐,并且缓冲区也是页面对齐的。这允许操作系统通过简单地将缓冲区映射到进程的地址空间而不是复制来优化读取。【参考方案3】:

为缓冲区定义一个类。给每个人一个很大的缓冲区空间,它是页面大小和开始/结束索引的倍数,一个从传入的流中读取缓冲区的方法和一个将另一个 *buffer 实例作为参数的“lineParse”方法。

制作一些 *buffers 并将它们存储在生产者-消费者池队列中。打开文件,从池中获取一个缓冲区并从头到尾读入缓冲区空间,(返回一个布尔值表示错误/EOF)。从池中获取另一个 *buffer 并将其传递到前一个的 lineparse() 中。在那里,从数据的末尾向后搜索,寻找 newLine。找到后,重新加载结束索引并 memcpy 最后一行的片段(如果有的话 - 你可能偶尔会很幸运:),进入新的,传递的 *buffer 并设置它的起始索引。第一个缓冲区现在有整行,可以排队到将处理这些行的线程。第二个缓冲区具有从第一个缓冲区复制的行片段,并且可以将更多数据从磁盘读取到其起始索引处的缓冲区空间中。

行处理线程可以将“已使用”*缓冲区回收回池中。

继续直到 EOF,(或错误:)。

如果可以的话,向缓冲区类添加一个方法来处理缓冲区。

使用大型缓冲区类并从末尾解析比不断读取小位,从头开始寻找换行符更有效。线程间通信很慢,可以传递的缓冲区越大越好。

使用缓冲区池消除了持续的新建/删除并提供流控制 - 如果磁盘读取线程比处理速度快,则池将清空并且磁盘读取线程将阻塞它,直到一些使用的缓冲区被回收。这可以防止内存失控。

请注意,如果您使用多个处理线程,则缓冲区可能会“乱序”处理 - 这可能很重要,也可能不重要。

在这种情况下,您只能通过确保与磁盘读取延迟并行处理的行的优势大于线程间通信的开销 - 在线程之间通信小缓冲区很可能会适得其反.

整体速度快但延迟较大的网络磁盘将体验到最大的加速。

【讨论】:

以上是关于C++ - 如何分块文件以进行同时/异步处理?的主要内容,如果未能解决你的问题,请参考以下文章

Clojurescript:使用核心/异步通道分块处理请求

halcon 异步采集如何结束

c++线程异步同时运行

C++ asio 提供线程的异步执行

如何在同一线程上调度异步以进行串行处理

在 C++ 中处理复合异步函数