聊聊rsocket load balancer的Ewma
Posted 码匠的流水账
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊rsocket load balancer的Ewma相关的知识,希望对你有一定的参考价值。
序
本文主要研究一下rsocket load balancer的Ewma
Moving Average
SMA
SMA(Simple Moving Average
),即简单移动平均,其公式如下:
SMAt = (Pt + Pt-1 + Pt-2 + Pt-3 + ... + Pt-n+1) / n
这里的Pt到为Pt-n+1为最近的n个数据
WMA
WMA(Weighted Moving Average
),即加权移动平均,其公式如下:
WMAt = (Pt * Wt) + (Pt-1 * Wt-1) + ... + (Pt-n+1 * Wt-n+1)
WMA会给最近的n个数据加上权重,其中这些权重加起来和为1,一般是较近的数据权重比较大
EMA或EWMA
EMA(Exponentially Moving Average
)指数移动平均或EWMA(Exponentially Weighted Moving Average
)指数加权移动平均,其公式如下:
EMAt = (Pt * S) + (1- S) * EMAt-1
它有一个S参数为平滑指数,一般是取2/(N+1)
Ewma
rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/stat/Ewma.java
public class Ewma {
private final long tau;
private volatile long stamp;
private volatile double ewma;
public Ewma(long halfLife, TimeUnit unit, double initialValue) {
this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
stamp = 0L;
ewma = initialValue;
}
public synchronized void insert(double x) {
long now = Clock.now();
double elapsed = Math.max(0, now - stamp);
stamp = now;
double w = Math.exp(-elapsed / tau);
ewma = w * ewma + (1.0 - w) * x;
}
public synchronized void reset(double value) {
stamp = 0L;
ewma = value;
}
public double value() {
return ewma;
}
@Override
public String toString() {
return "Ewma(value=" + ewma + ", age=" + (Clock.now() - stamp) + ")";
}
}
Ewma的构造器需要指定halfLife、timeunit、initialValue(
ewma初始值
)参数;ewma = w * ewma + (1.0 - w) * x,其中x为当前值,w为权重权重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed为距离上次计算的时间长度;tau(
希腊字母
)为EWMA的时间常量这里的tau = halfLife / Math.log(2)根据timeunit转换后的值;其中halfLife参数,代表speed of convergence,即收敛的速度
RSocketSupplier
rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/client/filter/RSocketSupplier.java
public class RSocketSupplier implements Availability, Supplier<Mono<RSocket>>, Closeable {
private static final double EPSILON = 1e-4;
private Supplier<Mono<RSocket>> rSocketSupplier;
private final MonoProcessor<Void> onClose;
private final long tau;
private long stamp;
private final Ewma errorPercentage;
public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier, long halfLife, TimeUnit unit) {
this.rSocketSupplier = rSocketSupplier;
this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
this.stamp = Clock.now();
this.errorPercentage = new Ewma(halfLife, unit, 1.0);
this.onClose = MonoProcessor.create();
}
public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier) {
this(rSocketSupplier, 5, TimeUnit.SECONDS);
}
@Override
public double availability() {
double e = errorPercentage.value();
if (Clock.now() - stamp > tau) {
// If the window is expired artificially increase the availability
double a = Math.min(1.0, e + 0.5);
errorPercentage.reset(a);
}
if (e < EPSILON) {
e = 0.0;
} else if (1.0 - EPSILON < e) {
e = 1.0;
}
return e;
}
private synchronized void updateErrorPercentage(double value) {
errorPercentage.insert(value);
stamp = Clock.now();
}
@Override
public Mono<RSocket> get() {
return rSocketSupplier
.get()
.doOnNext(o -> updateErrorPercentage(1.0))
.doOnError(t -> updateErrorPercentage(0.0))
.map(AvailabilityAwareRSocketProxy::new);
}
@Override
public void dispose() {
onClose.onComplete();
}
@Override
public boolean isDisposed() {
return onClose.isDisposed();
}
@Override
public Mono<Void> onClose() {
return onClose;
}
private class AvailabilityAwareRSocketProxy extends RSocketProxy {
public AvailabilityAwareRSocketProxy(RSocket source) {
super(source);
onClose.doFinally(signalType -> source.dispose()).subscribe();
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
return source
.fireAndForget(payload)
.doOnError(t -> errorPercentage.insert(0.0))
.doOnSuccess(v -> updateErrorPercentage(1.0));
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
return source
.requestResponse(payload)
.doOnError(t -> errorPercentage.insert(0.0))
.doOnSuccess(p -> updateErrorPercentage(1.0));
}
@Override
public Flux<Payload> requestStream(Payload payload) {
return source
.requestStream(payload)
.doOnError(th -> errorPercentage.insert(0.0))
.doOnComplete(() -> updateErrorPercentage(1.0));
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return source
.requestChannel(payloads)
.doOnError(th -> errorPercentage.insert(0.0))
.doOnComplete(() -> updateErrorPercentage(1.0));
}
@Override
public Mono<Void> metadataPush(Payload payload) {
return source
.metadataPush(payload)
.doOnError(t -> errorPercentage.insert(0.0))
.doOnSuccess(v -> updateErrorPercentage(1.0));
}
@Override
public double availability() {
// If the window is expired set success and failure to zero and return
// the child availability
if (Clock.now() - stamp > tau) {
updateErrorPercentage(1.0);
}
return source.availability() * errorPercentage.value();
}
}
}
RSocketSupplier实现了Availability、Supplier、Closeable接口,其中它定义了errorPercentage变量,其类型为Ewma;如果没有指定halfLife值,则RSocketSupplier默认halfLife为5秒,ewma的初始值为1.0
RSocketSupplier定义了一个常量EPSILON = 1e-4,其availability方法会先计算availability,然后在距离上次计算时间stamp超过tau值时会重置errorPercentage;之后当availability小于EPSILON时返回0,当availability + EPSILON大于1时返回1.0
updateErrorPercentage方法用于往ewma插入新值,同时更新stamp;get方法的doOnNext方法updateErrorPercentage值为1.0,doOnError方法updateErrorPercentage值为0.0;map会将RSocket转换为AvailabilityAwareRSocketProxy;AvailabilityAwareRSocketProxy对目标RSocket进行代理,对相关方法的doOnError及doOnSuccess都织入errorPercentage的统计
小结
Moving Average有好几种算法,包括SMA(
Simple Moving Average
)、WMA(Weighted Moving Average
)、EMA(Exponentially Moving Average
)或EWMA(Exponentially Weighted Moving Average
)Ewma的构造器需要指定halfLife、timeunit、initialValue(
ewma初始值
)参数;ewma = w * ewma + (1.0 - w) * x,其中x为当前值,w为权重;权重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed为距离上次计算的时间长度;tau(希腊字母
)为EWMA的时间常量;这里的tau = halfLife / Math.log(2)根据timeunit转换后的值;其中halfLife参数,代表speed of convergence,即收敛的速度rsocket load balancer使用了Ewma了统计服务的availability;其中RSocketSupplier实现了Availability、Supplier、Closeable接口,其中它定义了errorPercentage变量,其类型为Ewma;如果没有指定halfLife值,则RSocketSupplier默认halfLife为5秒,ewma的初始值为1.0;RSocketSupplier的get方法会将RSocket转换为AvailabilityAwareRSocketProxy,而AvailabilityAwareRSocketProxy则会对目标RSocket进行代理,对相关方法的doOnError及doOnSuccess都织入errorPercentage的统计
doc
Simple Moving Average - SMA Definition
Weighted Moving Averages: The Basics
Exponential Moving Average - EMA Definition
How Is the Exponential Moving Average (EMA) Formula Calculated?
Moving Average, Weighted Moving Average, and Exponential Moving Average
Exploring the Exponentially Weighted Moving Average
EWMA 移动平均模型
rsocket EWMA
以上是关于聊聊rsocket load balancer的Ewma的主要内容,如果未能解决你的问题,请参考以下文章
AWS - Elastic Load Balancing 是不是真的阻止了 LOAD BALANCER 故障转移?