C++ 中的高效循环缓冲区,将传递给 C 风格的数组函数参数

Posted

技术标签:

【中文标题】C++ 中的高效循环缓冲区,将传递给 C 风格的数组函数参数【英文标题】:Efficient circular buffer in C++ which will be passed to C-style array function parameter 【发布时间】:2020-11-03 15:30:02 【问题描述】:

我正在寻求有关我解决以下问题的方法的建议。我有一个恒定的数据输入,需要添加到我的缓冲区中,并且在每次迭代时,我需要将缓冲的数据传递给通过指针接受 C 样式数组的函数。

我担心效率,所以我思考如何在某种循环缓冲区中存储和管理数据,同时将其作为顺序原始数据传递给所述函数。

我目前的做法可以总结为以下示例:

#include <iostream>
#include <array>
#include <algorithm>

void foo(double* arr, int size)

  for (uint k = 0; k < size; k++)
    std::cout << arr[k] << ", ";

  std::cout << std::endl;


int main()

  const int size = 20;
  std::array<double, size> buffer;

  for (double data = 0.0; data < 50.0; data += 1.0)
  
      std::move(std::next(std::begin(buffer)), std::end(buffer), std::begin(buffer));
      buffer.back() = data;

      foo(buffer.data(), size);
  

在实际用例中,缓冲区还需要在开始时填充为“const”大小的数据(我在这里使用引号是因为在编译时大小可能或可能不知道,但一旦它是知道,它永远不会改变)。

我将数据存储在std::array(或std::vector,如果在编译时不知道大小),因为数据在内存中是连续的。当我需要插入新数据时,我使用 forward std::move 来移动所有内容,然后我手动替换最后一项。最后,我只是将std::array::data() 及其大小传递给函数。

虽然乍一看这应该很有效,但原因告诉我,因为数据是顺序存储的,整个缓冲区仍然会用std::move复制,每次插入都是O(n)

实际缓冲区大小可能只有数百个,并且数据最大达到 100Hz,但问题是我需要尽快调用函数的结果,所以我不想在缓冲区管理上浪费时间(即使我们说的很少,甚至少于毫秒)。我对此有很多疑问,但他们的候选名单如下​​:

我的做法是不是太天真了? 我对 O(n) 的推理是否正确? 这种方法还有其他缺陷吗? 您对我应该研究的其他方法有什么建议吗?

【问题讨论】:

【参考方案1】:

感谢您的回答维尔纳。当我在 Repl.it 上运行这个解决方案时,我得到:

it took an average of 21us and a max of 57382us

为了比较,我原来的想法在相同的缓冲区大小下,结果如下:

it took an average of 19us and a max of 54129us

这意味着我最初的做法确实很幼稚:)

与此同时,在等待答案的同时,我想出了以下解决方案:

#include <iostream>
#include <array>
#include <algorithm>
#include <chrono>

void foo(double* arr, int size)

  for (uint k = 0; k < size; k++)
    std::cout << arr[k] << ", ";

  std::cout << std::endl;


int main()

  const int buffer_size = 20;
  std::array<double, buffer_size*2> buffer;
  int buffer_idx = buffer_size;

  for (double data = 0.0; data < 100.0; data += 1.0)
  
    buffer.at(buffer_idx - buffer_size) = data;
    buffer.at(buffer_idx++) = data;

    foo(buffer.data() + buffer_idx - buffer_size, buffer_size);

    buffer_idx -= buffer_size * (buffer_idx == buffer_size * 2);
  

由于缓冲区的大小不是问题,我分配了两倍所需的内存并在两个位置插入数据,偏移缓冲区大小。当我到达终点时,我就像打字机一样回去。这个想法是我通过存储一个数据副本来伪造循环缓冲区,这样它就可以读取数据,就好像它穿过了一个完整的圆圈。

对于 50000 的缓冲区大小,这给了我以下结果,这正是我想要的:

it took an average of 0us and a max of 23us

【讨论】:

如果您真的想要性能,您应该将 te .at 替换为 [ ] 运算符。见cplusplus.com/reference/vector/vector/at。在我对您的代码的测量中,它提供了 20% 到 40% 的加速。 这听起来像下面提到的虚拟 ringbuffer G. Sliepen ;) 很好,我学到了一些东西【参考方案2】:

您总是需要复制您的数据,因为不存在“连续”环形缓冲区(也许在某些花哨的芯片中存在)。

您也不能初始化运行时定义大小的数组模板。

您可以使用向量来实现:

#include <iostream>
#include <chrono>
#include <deque>
#include <vector>

int main() 

    std::vector<double> v;

    // pre fill it a little
    for(double data = 0.0; data > -50000.0; data -= 1.0) 
        v.push_back(data);
    

    size_t cnt = 0;
    int duration = 0;
    int max = 0;

    for(double data = 0.0; data < 50000.0; data += 1.0, ++cnt) 

        auto t1 = std::chrono::high_resolution_clock::now();

        v.push_back(data);
        v.erase(v.begin());

        // foo(v.data(), v.size());

        auto t2 = std::chrono::high_resolution_clock::now();
        auto delta = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
        duration += delta;

        if(max == 0 || max < delta) 
            max = delta;
        

    

    std::cout << "it took an average of " << duration / cnt << "us and a max of " << max << " us" << std::endl;

    return 0;

输出:

it took an average of 11us and a max of 245 us

【讨论】:

事实证明,只要缓冲区的大小是页面大小的倍数,大多数计算机确实具有实现连续环形缓冲区所需的高级芯片。例如:vrb.sourceforge.net 我最终实现的确切方法【参考方案3】:

除了the answer by stribor14,我还有另外两个建议。这些仅基于性能,因此此处不会真正找到可读或可维护的代码。

我在阅读问题时的第一个想法也是分配两倍的存储量,但只写入一次。写完所有位置后,后半部分将被复制到前半部分。我的第一直觉说这可能是一个更好的表现。我的理由是,将发生相同数量的总写入,但所有写入都是顺序的(而不是每隔一秒写入一次跳转到数组中的另一个位置)。

#include <cstddef>
#include <cstring>
#include <array>

const size_t buffer_size = 50'000;

int main()

    std::array<double, 2 * buffer_size> buffer;
    double *index = buffer.data();
    double *mid = index + buffer_size;

    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    
        if (index == mid)
        
            index = buffer.data();
            std::memcpy(index, mid, buffer_size * sizeof(double));
        

        *(index++ + buffer_size) = data;

        foo(index, buffer_size);
    

另外,我认为可以优化 OP 自己的答案以删除数组访问。这个想法是 buffer[buffer_idx - buffer_size] 需要 2 个加法来计算该值的位置,即:*(buffer + buffer_idx - buffer_size)。如果buffer_idx 包含一个指针,则只需要添加一个。这给出了以下代码:

#include <cstddef>
#include <array>

const size_t buffer_size = 50'000;

int main()

    std::array<double, buffer_size * 2> buffer;
    double *index = buffer.data();
    double *mid = buffer.data() + buffer_size;

    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    
        *index = data;
        *(index + buffer_size) = data;
        ++index;

        index -= buffer_size * (index == mid);

        foo(index, buffer_size);
    

现在我注意到我正在走 C++ 优化的兔子洞。所以我们不能止步于此。为了选择使用哪个实现,我想运行一个基准测试。 Werner Pirkl 给了一个good starting point。但是在我们优化的代码上运行它是没有意义的,因为测量的时间是 0μs。所以让我们稍微改变一下,我在基准测试中写了一个循环来给它一些运行时间并想出了:

const int repeats = 1000;
volatile double *ptr;
int duration = 0;
const size_t buffer_size = 50'000;

// ... Set up of the buffers and indices

for (int i = 0; i < repeats; ++i)

    auto t1 = std::chrono::high_resolution_clock::now();

    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    
        // ... add data to circular buffer

        ptr = // ... the start of the array
    

    auto t2 = std::chrono::high_resolution_clock::now();
    duration += std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count();

(注意使用volatile double * 以确保不会优化指向连续数组的原始指针。)

在运行这些测试时,我注意到它们非常依赖于编译器标志 (-O2 -O3 -march=native ...)。我将给出一些结果,但就像所有 C++ 基准一样,对它持保留态度,并在真实世界的工作负载下运行你自己的。 (报告的时间是每次插入的平均 ns)

                     with `memcpy`   stribor14   `operator[]`   with pointers 
                   |---------------|-----------|--------------|---------------|
               -O2 |         1.38  |     1.57  |        1.41  |         1.15  |
               -O3 |         1.37  |     1.63  |        1.36  |         1.09  |
 -O3 -march=native |         1.35  |     1.61  |        1.34  |         1.09  |

不用说:我对我认为应该表现最好的东西感到非常失望。但如前所述,此基准绝不代表任何实际性能。

【讨论】:

我使用-O3 -march=native 编译,因为这是其中一部分的其余代码使用矢量化。尽管如此,还是感谢您的回答。另外,我后来测量的平均时间大约是130ns 我的答案

以上是关于C++ 中的高效循环缓冲区,将传递给 C 风格的数组函数参数的主要内容,如果未能解决你的问题,请参考以下文章

将可写 StringBuilder 数组从 C# 传递给 C++

将c ++引用的地址传递给指针

将 c++ 双缓冲区传递给 Node Js(直接)[Node JS : V8 c++ : Nan]

为啥不允许将数组按值传递给 C 和 C++ 中的函数?

为 DSP 目的在 C++ 中处理音频数据

c ++分段错误将指针传递给函数