在 C++ 中计算滚动/移动平均线

Posted

技术标签:

【中文标题】在 C++ 中计算滚动/移动平均线【英文标题】:Calculate rolling / moving average in C++ 【发布时间】:2012-06-14 23:29:20 【问题描述】:

我知道这是可以通过 boost 实现的:

Using boost::accumulators, how can I reset a rolling window size, does it keep extra history?

但我真的很想避免使用 boost。我用谷歌搜索,没有找到任何合适或可读的例子。

基本上,我想使用最近的 1000 个数字作为数据样本来跟踪正在进行的浮点数流的移动平均值。

实现这一目标的最简单方法是什么?


我尝试使用圆形数组、指数移动平均线和更简单的移动平均线,发现圆形数组的结果最适合我的需要。

【问题讨论】:

为什么要避免使用 Boost?它是一套完善、广泛使用且得到良好支持的 C++ 库。没有理由重新发明***。 您还停留在哪一部分?从数学角度你知道你想要哪种移动平均算法吗? 滚动平均值适用于整数,但对于浮点,由于舍入和大小差异,您可能会遇到奇怪的行为...... 诀窍是防止 Buffer-to-AveragingBuffer 复制。这里的一些人希望您为以前的样本创建一个单独的缓冲区。这可能不是必需的,因为样本可能来自缓冲区。 @templatetypedef,由于上面链接问题的 cmets 中的问题,goji 正试图避免 Boost。那里唯一的解决方案(截至目前)需要重新积累数据。 “逆”建议使用循环缓冲区或双端队列进行滚动平均。 【参考方案1】:

如果您的需求很简单,您可以尝试使用指数移动平均线。

http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average

简单地说,您创建了一个累加器变量,当您的代码查看每个样本时,代码会使用新值更新累加器。您选择一个介于 0 和 1 之间的常量“alpha”,然后计算:

accumulator = (alpha * new_value) + (1.0 - alpha) * accumulator

您只需要找到一个“alpha”值,其中给定样本的效果仅持续大约 1000 个样本。

嗯,我不确定这是否适合你,现在我已经把它放在这里了。问题是 1000 是指数移动平均线的一个相当长的窗口。我不确定是否有一个 alpha 可以将平均值分布在最后 1000 个数字上,而不会在浮点计算中出现下溢。但是,如果您想要一个较小的平均值,例如 30 个左右的数字,这是一种非常简单快捷的方法。

【讨论】:

这可能是矫枉过正。每次添加一个新数字时不需要重新计算整个系列吗? 不,它只需要两个乘法和每个新数字的加法。如果您没有预先计算(1.0 - alpha),则加一减法。 (1.0 - alpha) 越接近 1.0,先前数字的影响持续的时间越长,每个新数字的影响就越小。 alpha 越接近 1.0,移动平均线更新速度越快以响应新值。 您的帖子+1。指数移动平均线可以让alpha 可变。因此,这允许它用于计算时基平均值(例如,每秒字节数)。如果自上次累加器更新以来的时间超过 1 秒,则让 alpha1.0。否则,您可以让alpha 成为(自上次更新以来的使用秒数/1000000)。 我发现指数移动平均线有时非常有用。曾经我使用 EMA 计算 Internet 连接的可靠性指标;对于每个成功的连接,我平均为 1.0 值,对于每个失败,我平均为 0.0 值。它工作得很好。如果连接可靠,我希望它达到 100.0%,所以如果连接连续 10 次良好,我添加“奖励”分数,如果连接连续失败 10 次,则减去惩罚。 @user315052 表示,如果将 alpha 设置为 1.0/1000,它将接近 1000 个样本的平均值。它不能与 1000 个样本的实际平均值相同,但我确实认为它的效果对于许多目的来说足够相似。我建议您尝试一下:使用 alpha 设置为 1.0/1000 的指数移动平均线,看看您是否喜欢这样得到的平均值。【参考方案2】:

您只需要一个包含 1000 个元素的循环数组(循环缓冲区),在其中将元素添加到前一个元素并存储它。

它变成了一个递增的总和,你总是可以得到任意两对元素之间的总和,然后除以它们之间的元素数量,得到平均值。

【讨论】:

这比我的回答要好。没有技巧,只需存储 1000 个数字并求平均值。 我希望避免将所有数字存储在一个数组中并保持它们“长期”。看来这可能是唯一合适的方式。 请注意,对于“循环数组”,boost::circular_buffer 是一个(非常好的)候选实现。 警告:注意浮点累积问题!参见***上的 Kahan Summation 或其他。你不希望在没有减少错误的情况下将非常小的浮点数添加到大的浮点数中,认真的!【参考方案3】:

基本上,我想使用最近的 1000 个数字作为数据样本来跟踪正在进行的浮点数流的移动平均值。

请注意,下面将 total_ 更新为添加/替换的元素,避免了昂贵的 O(N) 遍历来按需计算求和所需的平均值。

template <typename T, typename Total, size_t N>
class Moving_Average

  public:
    Moving_Average& operator()(T sample)
    
        total_ += sample;
        if (num_samples_ < N)
            samples_[num_samples_++] = sample;
        else
        
            T& oldest = samples_[num_samples_++ % N];
            total_ -= oldest;
            oldest = sample;
        
        return *this;
    

    operator double() const  return total_ / std::min(num_samples_, N); 

  private:
    T samples_[N];
    size_t num_samples_0;
    Total total_0;
;

例子:

// average of last 3 (from 4) samples...
std::cout << Moving_Average<double, double, 3>(4)(7)(2)(6) << '\n';
    // "5\n"

// average of last 3 squares...
Moving_Average<double, double, 3> ma;
for (int i = 0; i < 10; ++i)
    std::cout << (i * i) << ':' << ma(i * i) << ' ';
std::cout << '\n';
    // 0:0 1:0.5 4:1.66667 9:4.66667 16:9.66667 25:16.6667 36:25.6667 49:36.6667 64:49.6667 81:64.6667

TotalT 不同,以支持例如在总计 1000 个 longs 时使用 long long,在 chars 时使用 int,或者在总计 floats 时使用 double

问题

这有点缺陷,因为num_samples_ 在概念上可以回绕到 0,但很难想象任何人有 2^64 个样本:如果担心,请使用额外的 bool 数据成员来记录容器第一次出现的时间在阵列周围循环 num_samples_ 时填充(最好然后重命名一些无害的东西,如“pos”)。

另一个问题是浮点精度固有的,可以用T=doubleN=2 的简单场景来说明:我们从total_ = 0 开始,然后注入样本1E17, 1, 2...

1E17,我们执行total_ += 1E17,所以total_ == 1E17,然后注入

1,我们执行total += 1,但total_ == 1E17仍然是,因为“1”太微不足道而无法更改像1E17这样大的数字的64位double表示,然后我们注入

2,我们执行total += 2 - 1E17,其中2 - 1E17首先被评估并产生-1E17,因为2失去了不精确/无意义,所以我们在1E17的总数中添加-1E17,total_变成0 ,尽管我们希望 total_ 的当前样本为 1 和 2 为 3。我们的移动平均值将计算 0 而不是 1.5。当我们添加另一个样本时,我们将从total_ 中减去“最旧的”1,尽管它从未被正确地合并到其中;我们的total_ 和移动平均线可能仍然是错误的。

您可以添加存储最近的total_ 的代码,如果当前total_ 太小(模板参数可以提供乘法阈值),您可以从所有样本中重新计算total_samples_ 数组中(并将highest_recent_total_ 设置为新的total_),但我会把它留给足够关心的读者。

【讨论】:

一个假设“void operator(T sample)”实际上是“void operator @oPless 啊...很好发现...实际上我的意思是它是void operator()(T sample) 但当然你可以使用任何你喜欢的符号。会解决的,谢谢。 是的!我今天早些时候发现有人可以使用“void operator()(T sample)”,并正在考虑尝试修改我的评论以反映这一点:-) 你可以用这样的东西(在 else 部分)避免翻转,这将同样有效:num_samples_ = N + (++num_samples_ % N); T&amp; oldest = samples_[num_samples_]; @DEKKER:一个例子:Moving_Average&lt;double, double, 20&gt; ma; ma(10); ma(15.2); ma(19); std::cout &lt;&lt; ma &lt;&lt; '\n';【参考方案4】:

您可以通过对输入流应用加权平均值来近似滚动平均值。

template <unsigned N>
double approxRollingAverage (double avg, double input) 
    avg -= avg/N;
    avg += input/N;
    return avg;

这样,您无需维护 1000 个存储桶。但是,它是一个近似值,因此它的值不会与真正的滚动平均值完全匹配。

编辑:刚刚注意到@steveha 的帖子。这相当于指数移动平均线,alpha 为 1/N(在这种情况下,我将 N 取为 1000 以模拟 1000 个桶)。

【讨论】:

这似乎与实际的移动平均线不太吻合(至少对于随机流而言),尽管我确信这也不是一个糟糕的衡量标准(一些代码:gist.github.com/Aktau/6102979) 错误可以通过这种方法快速累积,特别是对于具有高变异性的数据集。考虑一个相对不常见的高振幅尖峰信号。当他们进入窗口时,他们会提高平均值,但当他们离开后门时,平均值只会降低 avg/N,而不是 peakAmp/N。 @JSalazar:我使用了一个固定的 alpha,假设将定期进行测量。但是,如果测量之间的间隔是可变的,您应该使用时间加权平均值,而不是使用可变加权 alpha 而不是我的答案中的固定 1/N。 @bunkerdive 错误不会累积和发散。这就是我所说的收敛。 51.76 反对 35.8。 @bunkerdive:解决您的反对意见的一种方法是,如果输入相对接近先前的输入,则给予更高的权重。权重越接近 1.0,公差就必须越紧以满足相对接近的要求。任何时候未达到容差,权重都会再次下降到1/N。我实现了一个简单的概念证明,请参阅下一条评论。【参考方案5】:

计算滚动平均值和滚动标准偏差的简单类:

#define _stdev(cnt, sum, ssq) sqrt((((double)(cnt))*ssq-pow((double)(sum),2)) / ((double)(cnt)*((double)(cnt)-1)))

class moving_average 
private:
    boost::circular_buffer<int> *q;
    double sum;
    double ssq;
public:
    moving_average(int n)  
        sum=0;
        ssq=0;
        q = new boost::circular_buffer<int>(n);
    
    ~moving_average() 
        delete q;
    
    void push(double v) 
        if (q->size() == q->capacity()) 
            double t=q->front();
            sum-=t;
            ssq-=t*t;
            q->pop_front();
        
        q->push_back(v);
        sum+=v;
        ssq+=v*v;
    
    double size() 
        return q->size();
    
    double mean() 
        return sum/size();
    
    double stdev() 
        return _stdev(size(), sum, ssq);
    

;

【讨论】:

大概,如果n足够大,你会开始遇到精度问题? 还有,为什么要动态分配?它不仅看起来没有必要,而且在复制或移动时会使您的类不安全(由于缺少用户定义的构造函数和赋值运算符) 然后是宏的问题。更喜欢一个不错的内联函数。您只能使用一次!【参考方案6】:

一种方法是循环存储缓冲区数组中的值。并以这种方式计算平均值。

int j = (int) (counter % size);
buffer[j] = mostrecentvalue;
avg = (avg * size - buffer[j - 1 == -1 ? size - 1 : j - 1] + buffer[j]) / size;

counter++;

// buffer[j - 1 == -1 ? size - 1 : j - 1] is the oldest value stored

整个过程在一个循环中运行,其中最近的值是动态的。

【讨论】:

【参考方案7】:

我经常在具有相当疯狂的更新率(50kilosamples/sec)的硬实时系统中使用它,因此我通常预先计算标量。

计算 N 个样本的移动平均值: 标量1 = 1/N; 标量2 = 1 - 标量1; // 或 (1 - 1/N) 那么:

平均 = currentSample*scalar1 + Average*scalar2;

示例:10 个元素的滑动平均值

double scalar1 = 1.0/10.0;  // 0.1
double scalar2 = 1.0 - scalar1; // 0.9
bool first_sample = true;
double average=0.0;
while(someCondition)

   double newSample = getSample();
   if(first_sample)
   
    // everybody forgets the initial condition *sigh*
      average = newSample;
      first_sample = false;
   
   else
   
      average = (sample*scalar1) + (average*scalar2);
   
 

注意:这只是上述 steveha 给出的答案的实际实现。 有时更容易理解一个具体的例子。

【讨论】:

【参考方案8】:

您可以实现ring buffer。制作一个包含 1000 个元素的数组,以及一些用于存储开始和结束索引以及总大小的字段。然后只需将最后 1000 个元素存储在环形缓冲区中,并根据需要重新计算平均值。

【讨论】:

Karthik 的算法和你的有很大的不同。 最好发布一个实现,比如 Tony D。【参考方案9】:

增加@Nilesh 的回答(归功于他),您可以:

跟踪求和,不用每次除法再乘,产生误差 避免使用 % 运算符的 if 条件

这是UNTESTED示例代码来展示这个想法,它也可以被包装到一个类中:

const unsigned int size=10; // ten elements buffer

unsigned int counterPosition=0;
unsigned int counterNum=0;

int buffer[size];
long sum=0;

void reset() 
    for(int i=0;i<size;i++) 
        buffer[i]=0;
    


float addValue(int value) 
    unsigned  int oldPos = ((counterPosition + 1) % size);

    buffer[counterPosition] = value;
    sum = (sum - buffer[oldPos] + value); 

    counterPosition=(counterPosition+1) % size;
    if(counterNum<size) counterNum++;

    return ((float)sum)/(float)counterNum;


float removeValue() 
    unsigned  int oldPos =((counterPosition + 1) % size);

    buffer[counterPosition] = 0;
    sum = (sum - buffer[oldPos]); 

    if(counterNum>1)  // leave one last item at the end, forever
        counterPosition=(counterPosition+1) % size;
        counterNum--; // here the two counters are different
    
    return ((float)sum)/(float)counterNum;

应该注意,如果缓冲区被重置为全零,此方法在接收第一个值时可以正常工作,因为 - buffer[oldPos] 为零并且计数器增长。第一个输出是收到的第一个数字。第二个输出是仅前两个的平均值,依此类推,值在到达时逐渐消失,直到达到size 个项目。

同样值得考虑的是,如果您在输入数组的末尾停止,此方法与任何其他滚动平均方法一样,是不对称的,因为在末尾不会发生相同的衰落(它可能发生在数据结束后,通过正确的计算)。

没错。 100 个元素的滚动平均值和 10 个缓冲区的结果不同:10 个淡入,90 个完美滚动 10 个元素,最后 10 个淡出,100 个数字总共有 110 个结果输入!您可以选择显示哪些内容(以及直接从旧到新,还是向后,从新到旧)。

要在结束后正确淡出,您可以继续逐个添加零并每次将项目数减一,直到达到size 元素(仍然跟踪旧值的正确位置)。

用法是这样的:

int avg=0;
reset();

avg=addValue(2); // Rpeat for 100 times
avg=addValue(3); // Use avg value

...

avg=addValue(-4);
avg=addValue(12); // last numer, 100th input 

// If you want to fade out repeat 10 times after the end of data:

avg=removeValue(); // Rpeat for last 10 times after data has finished
avg=removeValue(); // Use avg value
...
avg=removeValue();
avg=removeValue();

【讨论】:

【参考方案10】:

我使用了双端队列...似乎对我有用。此示例有一个向量,但您可以跳过该方面,只需将它们添加到双端队列。

#include <deque>

template <typename T>
double mov_avg(vector<T> vec, int len)
  deque<T> dq = ;
  for(auto i = 0;i < vec.size();i++)
    if(i < len)
      dq.push_back(vec[i]);
    
    else 
      dq.pop_front();
      dq.push_back(vec[i]);
    
  
  double cs = 0;
  for(auto i : dq)
    cs += i;
  
  return cs / len;




//Skip the vector portion, track the input number (or size of deque), and the value.


  double len = 10;
  double val; //Accept as input
  double instance; //Increment each time input accepted.

  deque<double> dq;
  if(instance < len)
      dq.push_back(val);
  
  else 
      dq.pop_front();
      dq.push_back(val);
    
  
  double cs = 0;
  for(auto i : dq)
    cs += i;
  
  double rolling_avg = cs / len;

//为了进一步简化——向它添加值,然后简单地平均双端队列。

 int MAX_DQ = 3;
 void add_to_dq(deque<double> &dq, double value)
    if(dq.size() < MAX_DQ)
      dq.push_back(value);
    else 
      dq.pop_front();
      dq.push_back(value);
    
  
 

我偶尔使用的另一种技巧是使用 mod 覆盖向量中的值。

  vector<int> test_mod = 0,0,0,0,0;
  int write = 0;
  int LEN = 5;
  
  int instance = 0; //Filler for N -- of Nth Number added.
  int value = 0; //Filler for new number.

  write = instance % LEN;
  test_mod[write] = value;
  //Will write to 0, 1, 2, 3, 4, 0, 1, 2, 3, ...
  //Then average it for MA.

  //To test it...
  int write_idx = 0;
  int len = 5;
  int new_value;
  for(auto i=0;i<100;i++)
      cin >> new_value;
      write_idx = i % len;
      test_mod[write_idx] = new_value;

最后一个(hack)没有桶、缓冲区、循环,什么都没有。只是一个被覆盖的向量。它是 100% 准确的(对于向量中的 avg / 值)。很少维护正确的顺序,因为它开始向后重写(在 0 处),因此在示例 5,1,2,3,4 等中,第 5 个索引将在 0 处。

【讨论】:

【参考方案11】:

10 个项目的简单移动平均值,使用列表:

#include <list>

std::list<float> listDeltaMA;

float getDeltaMovingAverage(float delta)

    listDeltaMA.push_back(delta);
    if (listDeltaMA.size() > 10) listDeltaMA.pop_front();
    float sum = 0;
    for (std::list<float>::iterator p = listDeltaMA.begin(); p != listDeltaMA.end(); ++p)
        sum += (float)*p;
    return sum / listDeltaMA.size();

【讨论】:

以上是关于在 C++ 中计算滚动/移动平均线的主要内容,如果未能解决你的问题,请参考以下文章

熊猫移动平均线[重复]

如何使用 python + NumPy / SciPy 计算滚动/移动平均值?

在python中计算指数移动平均线

在 SQL Server 2012 中计算移动平均线

移动平均线计算不正确

计算时间加权移动平均线