Angular - http 拦截器 - http 速率限制器 - 滑动窗口

Posted

技术标签:

【中文标题】Angular - http 拦截器 - http 速率限制器 - 滑动窗口【英文标题】:Angular - http interceptors - http rate limiter - sliding window 【发布时间】:2021-04-23 05:30:00 【问题描述】:

我有一个用例,我需要限制传出 http 请求的数量。是的,我在服务器端确实有速率限制器,但前端也需要对活动 http 请求的数量进行限制。因此,我正在尝试实现一个滑动窗口协议,在任何时候我都会只有 n 个活动请求。

这种使用 Rxjs 的方法通常可以正常工作,请参见此处: https://jsbin.com/pacicubeci/1/edit?js,console,output

但我不清楚如何对 http 拦截器使用相同的逻辑。我在下面的尝试在编译时失败并出现以下错误:

类型 'Subscription' 缺少类型 'Observable' 中的以下属性:_isScalar、source、operator、lift 和 114 多个。(2740)

这样,我怎样才能返回一个 observable 并同时在 http 拦截器上维护一个队列?我的方法有缺陷吗?我可以使用http拦截器来限制http速率吗?

@Injectable()
export class I1 implements HttpInterceptor 
  intercept(
    req: HttpRequest<any>,
    next: HttpHandler
  ): Observable<HttpEvent<any>> 
    const modified = req.clone( setHeaders:  "Custom-Header-1": "1"  );

    return next
      .handle(req)
      .do((ev: HttpEvent<any>) => 
        if (ev instanceof HttpResponse) 
          console.log(ev);
        
      )
      .pipe(
        bufferTime(1000, null, 1),
        filter(buffer => buffer.length > 0),
        concatMap(buffer => of(buffer).pipe(delay(1000)))
      )
      .subscribe(console.log);
      
    

https://stackblitz.com/edit/angular-interceptors-npqkjp?file=app/interceptors.ts

【问题讨论】:

你说 "active requests" - 对于 HTTP 1.1,浏览器已经限制了每个主机,参见例如***.com/q/985431/3001761. 是的,我知道这一点。想想http2,我的connection2数量可能比http1.1的数量高很多 这能回答你的问题吗? How to limit API calls per second with angular2 medium.com/leantaas-engineering/… 这里有一些想法 【参考方案1】:

在您的拦截器上,您返回的是订阅,而不是 Observable。

如果您删除行 .subscribe(console.log) 它应该编译得很好。订阅由消费者完成。

如果您想对发出的所有内容进行控制台记录,请使用 tap(next =&gt; ...) 运算符

编辑 - 嗯,它解决了编译错误,但我不确定它是否会按你的意愿工作......我不完全理解拦截器的工作原理。

【讨论】:

感谢您的回答。您的更改消除了编译错误,但我仍然对是否可以在 http 拦截器上设置滑动窗口感到困惑。拦截器函数需要返回一个 Observable,但滑动窗口只有在返回数组或嵌套的 observable 时才有意义。或者至少这是我对 Rxjs 最好的看法。【参考方案2】:

如果您想了解更多关于拦截器和 HttpClientModule 如何在后台工作的信息,您可以查看这篇文章:Exploring the HttpClientModule in Angular。

我的方法有缺陷吗? 在这种情况下,问题在于 next.handle 应该返回一个 Observable,但通过订阅它,它返回一个 Subscription。

为了更好地理解原因,我将粘贴从上面链接的文章中复制的 sn-p:

const obsBE$ = new Observable(obs => 
  timer(1000)
    .subscribe(() => 
      // console.log('%c [OBSERVABLE]', 'color: red;');

      obs.next( response:  data: ['foo', 'bar']  );

      // Stop receiving values!
      obs.complete();
    )

    return () => 
      console.warn("I've had enough values!");
    
);

// Composing interceptors the chain
const obsI1$ = obsBE$
  .pipe(
    tap(() => console.log('%c [i1]', 'color: blue;')),
    map(r => ( ...r, i1: 'intercepted by i1!' ))
  );

let retryCnt = 0;
const obsI2$ = obsI1$
  .pipe(
    tap(() => console.log('%c [i2]', 'color: green;')),
    map(r =>  
      if (++retryCnt <=3) 
        throw new Error('err!') 
      

      return r;
    ),
    catchError((err, caught) => 
      return getRefreshToken()
        .pipe(
          switchMap(() => /* obsI2$ */caught),
        )
    )
  );

const obsI3$ = obsI2$
  .pipe(
    tap(() => console.log('%c [i3]', 'color: orange;')),
    map(r => ( ...r, i3: 'intercepted by i3!' ))
  );

function getRefreshToken () 
  return timer(1500)
    .pipe(q
      map(() => ( token: 'TOKEN HERE' )),
    );


function get () 
  return obsI3$


get()
  .subscribe(console.log)

/* 
-->
[i1]
[i2]
I've had enough values!
[i1]
[i2]
I've had enough values!
[i1]
[i2]
I've had enough values!
[i1]
[i2]
[i3]

  "response": 
    "data": [
      "foo",
      "bar"
    ]
  ,
  "i1": "intercepted by i1!",
  "i3": "intercepted by i3!"

I've had enough values!
*/

StackBlitz demo.

要点是拦截器创建某种,其以负责发出实际请求的可观察对象结束。 This 是链中的最后一个节点:

return new Observable((observer: Observer<HttpEvent<any>>) => 
  // Start by setting up the XHR object with request method, URL, and withCredentials flag.
  const xhr = this.xhrFactory.build();
  xhr.open(req.method, req.urlWithParams);
  if (!!req.withCredentials) 
    xhr.withCredentials = true;
  
  /* ... */
)

如何在 http 拦截器上同时返回一个 observable 并维护一个队列

我认为解决这个问题的一种方法是创建一个包含队列逻辑的拦截器,并使其intercept 方法返回一个Observable,以便它可以被订阅:

const queueSubject = new Subject<Observable>();

const pendingQueue$ = queueSubject.pipe(
  // using `mergeAll` because the Subject's `values` are Observables
  mergeAll(limit),
  share(),
);

intercept (req, next) 
  // `next.handle(req)` - it's fine to do this, no request will fire until the observable is subscribed
  queueSubject.next(
    next.handle(req)
      .pipe(
        // not interested in `Sent` events
        filter(ev => ev instanceof HttpResponse),

        filter(resp => resp.url === req.url),
      )
  );

  return pendingQueue$;

之所以使用filter 运算符,是因为通过使用share,响应将发送给所有订阅者。假设你同步调用http.get 5 次,所以share 的主题有5 个新订阅者,最后一个会收到它的响应,但也会收到其他请求的响应。所以使用可以使用filter来给请求正确的响应,在这种情况下,通过比较请求的URL(req.url)和我们从HttpResponse.url得到的URL:

observer.next(new HttpResponse(
  body,
  headers,
  status,
  statusText,
  url: url || undefined,
));

Link for the above snippet.


现在,我们为什么要使用share()

让我们先看一个更简单的例子:

const s = new Subject();

const queue$ = s.pipe(
  mergeAll()
)

function intercept (req) 
  s.next(of(req));
  
  return queue$


// making request 1
intercept( url: 'req 1' ).subscribe();

// making request 2
intercept( url: 'req 2' ).subscribe();

// making request 3
intercept( url: 'req 3' ).subscribe();

此时,主题 s 应该有 3 个订阅者。这是因为当您返回队列时,您会返回 s.pipe(...),而当您订阅 时,它与执行操作相同:

s.pipe(/* ... */).subscribe()

所以,这就是主题最后会有 3 个订阅者的原因。

现在让我们检查相同的 sn-p,但使用 share()

const queue$ = s.pipe(
  mergeAll(),
  share()
);

// making request 1
intercept( url: 'req 1' ).subscribe();

// making request 2
intercept( url: 'req 2' ).subscribe();

// making request 3
intercept( url: 'req 3' ).subscribe();

订阅请求 1 后,share 将创建一个 Subject 实例,并且所有后续订阅者都将属于它,而不是属于 main Subject s。因此,s 将只有一个订阅者。这将确保我们正确实现队列,因为尽管主题 s 只有一个订阅者,它仍然会接受 s.next() 值,其结果将传递给另一个主题(来自 @987654354 的那个) @),它最终会将响应发送给它的所有订阅者。

【讨论】:

感谢您的深入解答。我知道这应该如何工作。喜欢它的简单性,但是当我遵循这些准则时,我得到了类型不匹配。 stackblitz.com/edit/… @matcheek 现在应该编译成功了:stackblitz.com/edit/… 再问一个问题。在重新审视上面的这个stackblits时,我注意到了两件事:1.与SO答案不同,我在拦截调用中看不到“订阅”。 2.所有http查询都得到相同的响应分配。或者至少在我看来是这样。您介意修改您的答案或堆栈闪电战吗? @matcheek 在答案intercept() 被订阅只是为了演示目的。它们会自动以角度订阅,因此您不必担心。关于第二点,你是对的,负责分配响应的逻辑应该驻留在拦截器中,而不是在服务中。让我知道它是否有效,以便我可以更新答案。

以上是关于Angular - http 拦截器 - http 速率限制器 - 滑动窗口的主要内容,如果未能解决你的问题,请参考以下文章

单元测试 Angular 12 HTTP 拦截器 expectOne 失败

Angular 6 - Ionic - 使用拦截器中断 http 请求并返回 json

在 http 请求之前执行的 Angular 拦截器

Angular 4.3 - HTTP 拦截器 - 刷新 JWT 令牌

Angular 5:从拦截器中的 http 响应标头获取授权

Angular HTTP 拦截器自定义预检