普罗米修斯irate/rate算法区别(原创)

Posted 工程师阿杜

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了普罗米修斯irate/rate算法区别(原创)相关的知识,希望对你有一定的参考价值。

文章目录

irate算法

流程

  • 选取时间范围内最后两个点:end1,end2

  • 计算两个点的差值,这里分两种情况:

    ​ 正常情况下:end2 > end1 ,此时 value = end2 - end1

    ​ 异常情况下,end2 < end1 ,此时 value = end2

  • 计算两个时间点的时间差time = time2 - time1

  • 计算最终结果:value / time 并换算毫秒单位

流程图

end2 > end1 end2 < end1 选取时间范围内的点集合 选取集合内最后两个点: end1 end2 计算end1和end2之间差值 差值 value = end2 - end1 差值 value = end2 值差 value 计算 end1 和 end2 之间时间差 time time = time2 - time1 value/time

特点

  • 相当于区间内的最后时刻瞬时值
  • 只选取了区间最后两个点进行计算
  • 如果出现了counter异常类型的值,就会计算出巨大的结果

代码

package promql

var functions = map[string]*Function
    "irate": 
        Name:       "irate",
        ArgTypes:   []ValueTypeValueTypeMatrix,
        ReturnType: ValueTypeVector,
        // 处理函数 funcIrate
        Call:       funcIrate,
    ,


// irate 方法
func funcIrate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector 
    // 注意,最后一个参数 isRate = true
    return instantValue(vals, enh.out, true)


// irate 算法内容
// isRate = true
func instantValue(vals []Value, out Vector, isRate bool) Vector 
    for _, samples := range vals[0].(Matrix) 
        if len(samples.Points) < 2 
            // 少于两个点,无效范围
            continue
        

        // 取区间内最后两个数值
        // endindex 的点位
        lastSample := samples.Points[len(samples.Points)-1]
        // endindex - 1 的点位
        previousSample := samples.Points[len(samples.Points)-2]

        var resultValue float64
        // irate计算时 isRate = true
        if isRate && lastSample.V < previousSample.V 
            // counter 中出现了明显的异常点,最后两个值是 大--小 的结构,因此 counter 重置
            resultValue = lastSample.V
         else 
            // 最后两个值是 小--大 的结构,因此直接相减
            resultValue = lastSample.V - previousSample.V
        
        /* 此时 resultValue有两种情况:
           1、正常情况时,resultValue=最后两个点的差值,
           2、counter 类型异常时,resultValue=最后选取的点的值,
        */
        // 两个值的时间间隔
        sampledInterval := lastSample.T - previousSample.T
        if sampledInterval == 0 
            // 选取的两个点时间差为0.要舍弃,否则除以0最终会是无限大
            continue
        

        // irate 时 isRate = true
        if isRate 
            // 变化值除以时间间隔,这里的1000看来默认时间单位是秒,这里除以1000,换算成毫秒?
            // 此时如果 counter 遇到了 重置点, 那么这里计算出来的值会【异常的大】
            resultValue /= float64(sampledInterval) / 1000
        

        out = append(out, Sample
            Point: PointV: resultValue,
        )
    
    return out

rate函数

流程

  1. 查找点集合的值差

    如果始终 next > current,那么 value = end - begin

    如果中间存在 A = next < current,那么 value = end - begin + 每个异常的落差A

  2. 计算点集合的时间跨度比例P

    计算开头、结尾空隙

    计算时间跨度的有效值与时间范围的比例P

  3. 按照P等比例扩大value的值

  4. 计算最终结果: value/区间时间跨度

流程图

end2 > end1 end2 < end1 选取时间范围内的点集合 计算首尾两个点的差值 差值 value = end - begin 差值 value = end - begin + 所有异常落差值 值差 value 计算点集合的起始位置 对比区间长度, 计算点集合的范围与区间跨度的比例P 按照比例P扩大value的值=value2 按照调整后的value2 计算最终结果: value2/区间长度

计算点集合落差的异常值

目的&特点:无论中间是否出现倒序情况,都能算出近似于总落差的值

等比例扩大区间的理解

特点

  • 相当于区间内的平均值

  • 读取了区间内最后一个点第一个点的差值

  • 又等比例扩大了点集合的时间范围到区间跨度

  • 因此计算比irate较为严谨

代码

var functions = map[string]*Function
    // rate 方法入口
    "rate": 
        Name:       "rate",
        ArgTypes:   []ValueTypeValueTypeMatrix,
        ReturnType: ValueTypeVector,
        Call:       funcRate,
    ,


// rate 入口
func funcRate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector 
    // isRate = true
    return extrapolatedRate(vals, args, enh, true, true)


// 算法实现
func extrapolatedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector 
    ms := args[0].(*MatrixSelector)

    var (
        // 默认是毫秒单位
        matrix     = vals[0].(Matrix)
        rangeStart = enh.ts - durationMilliseconds(ms.Range+ms.Offset)
        rangeEnd   = enh.ts - durationMilliseconds(ms.Offset)
    )

    for _, samples := range matrix 
        // No sense in trying to compute a rate without at least two points. Drop
        // this Vector element.
        if len(samples.Points) < 2 
            continue
        
        var (
            counterCorrection float64
            lastValue         float64
        )
        for _, sample := range samples.Points 
            // Counter类型数据,但是数值又小于上一次的 value,说明重置了 counter
            // 否则的话,这个值一直为空
            if isCounter && sample.V < lastValue 
                // 所有异常节点的差值累计
                counterCorrection += lastValue
            
            // 由于是 for 循环,因此这里的 lastValue 最终就是当前点集合最后一个点的值
            lastValue = sample.V
        
        // 集合首位两个点的差值。 集合最后一个点的值 - 第一个点的值,最后加上了定时器重置的 counter(一般为0)
        // 这个 counterCorrection 的理解可以参考下图,是计算的关键点
        resultValue := lastValue - samples.Points[0].V + counterCorrection

        // 区间开始时间与 第一个点的时间间隔
        durationToStart := float64(samples.Points[0].T-rangeStart) / 1000

        // 最后一个点与区间结束时间之间的时间空间,具体算法: 区间结束的时间-区间最后一个点的时间,就是 最后一个点与区间结尾之间的时间间隔
        durationToEnd := float64(rangeEnd-samples.Points[len(samples.Points)-1].T) / 1000

        // 区间内所有点的时间差,也就是区间最后一个点时间 - 区间第一个点时间
        sampledInterval := float64(samples.Points[len(samples.Points)-1].T-samples.Points[0].T) / 1000

        // 区间内采集点占用的平均时间,也就是  总集合点时间/总点数
        averageDurationBetweenSamples := sampledInterval / float64(len(samples.Points)-1)

        if isCounter && resultValue > 0 && samples.Points[0].V >= 0 
            // Counters cannot be negative. If we have any slope at
            // all (i.e. resultValue went up), we can extrapolate
            // the zero point of the counter. If the duration to the
            // zero point is shorter than the durationToStart, we
            // take the zero point as the start of the series,
            // thereby avoiding extrapolation to negative counter
            // values.
            // 这里较为难理解,目的: 区间内是一个递增的序列,假设这个序列是线性的,那么要寻找,这个线性序列的值为0的那个点所在的时间点
            // 如果这个0所在的时间点在第一个点左边,那么就要砍掉0值左侧的区域
            // 比如 1 3 5 7这个序列,那么推测的0值位置就在 6*(1/6) = 1,也就是说这个线性序列的0值就是开始的位置
            durationToZero := sampledInterval * (samples.Points[0].V / resultValue)
            if durationToZero < durationToStart 
                // 如果中间counter重置了,这里好像是在找重置时那个0点的位置,我们将会把区间起点调整到0点附近
                // 如果选择的序列所推测的的0值位置在时间开始与第一个节点之间的位置,那么就要调整整个有效区间的开始位置
                durationToStart = durationToZero
            
        

        // If the first/last samples are close to the boundaries of the range,
        // extrapolate the result. This is as we expect that another sample
        // will exist given the spacing between samples we've seen thus far,
        // with an allowance for noise.
        // 噪音处理,因为点集合的区间实际上小于时间区间,两边有一段空白,因此这里 *1.1 来扩大了一下点集合的区间范围
        extrapolationThreshold := averageDurationBetweenSamples * 1.1
        // 默认是所有集合点的时间差
        extrapolateToInterval := sampledInterval
        // 将点集合与区间两端的时间空隙添加到 extrapolateToInterval 中
        if durationToStart < extrapolationThreshold 
            // 决定最终区间的起点
            extrapolateToInterval += durationToStart
         else 
            extrapolateToInterval += averageDurationBetweenSamples / 2
        

        if durationToEnd < extrapolationThreshold 
            // 决定区间的终点
            extrapolateToInterval += durationToEnd
         else 
            extrapolateToInterval += averageDurationBetweenSamples / 2
        

        // extrapolateToInterval 是经过调整的区间长度,添加了 集合点与时间区间两端的时间空隙,因此 extrapolateToInterval 会比 sampledInterval 大一点
        // 这个操作等于按照  实际时间区间/集合点时间区间  这个比例,把 resultValue 这个集合点两端的差值等比例扩大
        resultValue = resultValue * (extrapolateToInterval / sampledInterval)
        if isRate 
            // 利用等比例扩大后的插值,除以时间间隔。计算最终的每次 for 循环的变化率
            resultValue = resultValue / ms.Range.Seconds()
        

        enh.out = append(enh.out, Sample
            Point: PointV: resultValue,
        )
    
    return enh.out

以上是关于普罗米修斯irate/rate算法区别(原创)的主要内容,如果未能解决你的问题,请参考以下文章

LeetCode算法题-Minimum Distance Between BST Nodes(Java实现-四种解法)

算法--相邻两数最大差值

线性差值算法

算法—— 相邻两数的最大差值

[算法]数组排序之后相邻数的最大差值

一维差值维护心得