聊聊rsocket load balancer的Ewma

Posted 码匠的流水账

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊rsocket load balancer的Ewma相关的知识,希望对你有一定的参考价值。

本文主要研究一下rsocket load balancer的Ewma

Moving Average

SMA

SMA(Simple Moving Average),即简单移动平均,其公式如下:


SMAt = (Pt + Pt-1 + Pt-2 + Pt-3 + ... + Pt-n+1) / n

这里的Pt到为Pt-n+1为最近的n个数据

WMA

WMA(Weighted Moving Average),即加权移动平均,其公式如下:


WMAt = (Pt * Wt) + (Pt-1 * Wt-1) + ... + (Pt-n+1 * Wt-n+1)

WMA会给最近的n个数据加上权重,其中这些权重加起来和为1,一般是较近的数据权重比较大

EMA或EWMA

EMA(Exponentially Moving Average)指数移动平均或EWMA(Exponentially Weighted Moving Average)指数加权移动平均,其公式如下:


EMAt = (Pt * S) + (1- S) * EMAt-1

它有一个S参数为平滑指数,一般是取2/(N+1)

Ewma

rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/stat/Ewma.java


public class Ewma {
private final long tau;
private volatile long stamp;
private volatile double ewma;

public Ewma(long halfLife, TimeUnit unit, double initialValue) {
  this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
  stamp = 0L;
  ewma = initialValue;
}

public synchronized void insert(double x) {
  long now = Clock.now();
  double elapsed = Math.max(0, now - stamp);
  stamp = now;

  double w = Math.exp(-elapsed / tau);
  ewma = w * ewma + (1.0 - w) * x;
}

public synchronized void reset(double value) {
  stamp = 0L;
  ewma = value;
}

public double value() {
  return ewma;
}

@Override
public String toString() {
  return "Ewma(value=" + ewma + ", age=" + (Clock.now() - stamp) + ")";
}
}
  • Ewma的构造器需要指定halfLife、timeunit、initialValue(ewma初始值)参数;ewma = w * ewma + (1.0 - w) * x,其中x为当前值,w为权重

  • 权重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed为距离上次计算的时间长度;tau(希腊字母)为EWMA的时间常量

  • 这里的tau = halfLife / Math.log(2)根据timeunit转换后的值;其中halfLife参数,代表speed of convergence,即收敛的速度

RSocketSupplier

rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/client/filter/RSocketSupplier.java


public class RSocketSupplier implements Availability, Supplier<Mono<RSocket>>, Closeable {

private static final double EPSILON = 1e-4;

private Supplier<Mono<RSocket>> rSocketSupplier;

private final MonoProcessor<Void> onClose;

private final long tau;
private long stamp;
private final Ewma errorPercentage;

public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier, long halfLife, TimeUnit unit) {
  this.rSocketSupplier = rSocketSupplier;
  this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
  this.stamp = Clock.now();
  this.errorPercentage = new Ewma(halfLife, unit, 1.0);
  this.onClose = MonoProcessor.create();
}

public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier) {
  this(rSocketSupplier, 5, TimeUnit.SECONDS);
}

@Override
public double availability() {
  double e = errorPercentage.value();
  if (Clock.now() - stamp > tau) {
    // If the window is expired artificially increase the availability
    double a = Math.min(1.0, e + 0.5);
    errorPercentage.reset(a);
  }
  if (e < EPSILON) {
    e = 0.0;
  } else if (1.0 - EPSILON < e) {
    e = 1.0;
  }

  return e;
}

private synchronized void updateErrorPercentage(double value) {
  errorPercentage.insert(value);
  stamp = Clock.now();
}

@Override
public Mono<RSocket> get() {
  return rSocketSupplier
      .get()
      .doOnNext(o -> updateErrorPercentage(1.0))
      .doOnError(t -> updateErrorPercentage(0.0))
        .map(AvailabilityAwareRSocketProxy::new);
}

@Override
public void dispose() {
  onClose.onComplete();
}

@Override
public boolean isDisposed() {
  return onClose.isDisposed();
}

@Override
public Mono<Void> onClose() {
  return onClose;
}

private class AvailabilityAwareRSocketProxy extends RSocketProxy {
  public AvailabilityAwareRSocketProxy(RSocket source) {
    super(source);

    onClose.doFinally(signalType -> source.dispose()).subscribe();
  }

  @Override
  public Mono<Void> fireAndForget(Payload payload) {
    return source
        .fireAndForget(payload)
        .doOnError(t -> errorPercentage.insert(0.0))
        .doOnSuccess(v -> updateErrorPercentage(1.0));
  }

  @Override
  public Mono<Payload> requestResponse(Payload payload) {
    return source
        .requestResponse(payload)
        .doOnError(t -> errorPercentage.insert(0.0))
        .doOnSuccess(p -> updateErrorPercentage(1.0));
  }

  @Override
  public Flux<Payload> requestStream(Payload payload) {
    return source
        .requestStream(payload)
        .doOnError(th -> errorPercentage.insert(0.0))
        .doOnComplete(() -> updateErrorPercentage(1.0));
  }

  @Override
  public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    return source
        .requestChannel(payloads)
        .doOnError(th -> errorPercentage.insert(0.0))
        .doOnComplete(() -> updateErrorPercentage(1.0));
  }

  @Override
  public Mono<Void> metadataPush(Payload payload) {
    return source
        .metadataPush(payload)
        .doOnError(t -> errorPercentage.insert(0.0))
        .doOnSuccess(v -> updateErrorPercentage(1.0));
  }

  @Override
  public double availability() {
    // If the window is expired set success and failure to zero and return
    // the child availability
    if (Clock.now() - stamp > tau) {
      updateErrorPercentage(1.0);
    }
    return source.availability() * errorPercentage.value();
  }
}
}
  • RSocketSupplier实现了Availability、Supplier、Closeable接口,其中它定义了errorPercentage变量,其类型为Ewma;如果没有指定halfLife值,则RSocketSupplier默认halfLife为5秒,ewma的初始值为1.0

  • RSocketSupplier定义了一个常量EPSILON = 1e-4,其availability方法会先计算availability,然后在距离上次计算时间stamp超过tau值时会重置errorPercentage;之后当availability小于EPSILON时返回0,当availability + EPSILON大于1时返回1.0

  • updateErrorPercentage方法用于往ewma插入新值,同时更新stamp;get方法的doOnNext方法updateErrorPercentage值为1.0,doOnError方法updateErrorPercentage值为0.0;map会将RSocket转换为AvailabilityAwareRSocketProxy;AvailabilityAwareRSocketProxy对目标RSocket进行代理,对相关方法的doOnError及doOnSuccess都织入errorPercentage的统计

小结

  • Moving Average有好几种算法,包括SMA(Simple Moving Average)、WMA(Weighted Moving Average)、EMA(Exponentially Moving Average)或EWMA(Exponentially Weighted Moving Average)

  • Ewma的构造器需要指定halfLife、timeunit、initialValue(ewma初始值)参数;ewma = w * ewma + (1.0 - w) * x,其中x为当前值,w为权重;权重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed为距离上次计算的时间长度;tau(希腊字母)为EWMA的时间常量;这里的tau = halfLife / Math.log(2)根据timeunit转换后的值;其中halfLife参数,代表speed of convergence,即收敛的速度

  • rsocket load balancer使用了Ewma了统计服务的availability;其中RSocketSupplier实现了Availability、Supplier、Closeable接口,其中它定义了errorPercentage变量,其类型为Ewma;如果没有指定halfLife值,则RSocketSupplier默认halfLife为5秒,ewma的初始值为1.0;RSocketSupplier的get方法会将RSocket转换为AvailabilityAwareRSocketProxy,而AvailabilityAwareRSocketProxy则会对目标RSocket进行代理,对相关方法的doOnError及doOnSuccess都织入errorPercentage的统计

doc

  • Simple Moving Average - SMA Definition

  • Weighted Moving Averages: The Basics

  • Exponential Moving Average - EMA Definition

  • How Is the Exponential Moving Average (EMA) Formula Calculated?

  • Moving Average, Weighted Moving Average, and Exponential Moving Average

  • Exploring the Exponentially Weighted Moving Average

  • EWMA 移动平均模型

  • rsocket EWMA

以上是关于聊聊rsocket load balancer的Ewma的主要内容,如果未能解决你的问题,请参考以下文章

RSocket云原生架构下的另一种通信协议选择

Load Balancer

AWS - Elastic Load Balancing 是不是真的阻止了 LOAD BALANCER 故障转移?

厨师:aws load_balancer_options 粘性

gRPC Load Balancing

How Load Balancing Policies Work