在 C# 中可通过背压观察
Posted
技术标签:
【中文标题】在 C# 中可通过背压观察【英文标题】:Observable with backpressure in C# 【发布时间】:2021-12-13 04:57:10 【问题描述】:C# rx 中有没有办法处理背压? 我正在尝试从分页查询的结果中调用 Web api。这个 web api 非常脆弱,我需要不超过 3 个并发调用,所以,程序应该是这样的:
-
从 db 获取页面
调用 web api,页面上每条记录最多三个并发调用
将结果保存回数据库
获取另一个页面并重复,直到没有更多结果。
我并没有真正得到我所追求的序列,基本上数据库会获取所有记录,无论它们是否可以处理。
我尝试了各种方法,包括调整 ObserveOn
运算符、实现信号量以及其他一些事情。我可以得到一些指导来实现这样的东西吗?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Castle.Core.Internal;
using Xunit;
using Xunit.Abstractions;
namespace ProductValidation.CLI.Tests.Services
public class Example
private readonly ITestOutputHelper output;
public Example(ITestOutputHelper output)
this.output = output;
[Fact]
public async Task RunsObservableToCompletion()
var repo = new Repository(output);
var client = new ServiceClient(output);
var results = repo.FetchRecords()
.Select(x => client.FetchMoreInformation(x).ToObservable())
.Merge(1)
.Do(async x => await repo.Save(x));
await results.LastOrDefaultAsync();
public class Repository
private readonly ITestOutputHelper output;
public Repository(ITestOutputHelper output)
this.output = output;
public IObservable<int> FetchRecords()
return Observable.Create<int>(async (observer) =>
var page = 1;
var products = await FetchPage(page);
while (!products.IsNullOrEmpty())
foreach (var product in products)
observer.OnNext(product);
page += 1;
products = await FetchPage(page);
observer.OnCompleted();
)
.ObserveOn(SynchronizationContext.Current);
private async Task<IEnumerable<int>> FetchPage(int page)
// Simulate fetching a paged query.
await Task.Delay(500).ToObservable().ObserveOn(new TaskPoolScheduler(new TaskFactory()));
output.WriteLine("Fetching page 0", page);
if (page >= 4) return Enumerable.Empty<int>();
return Enumerable.Range(1, 3).Select(_ => page);
public async Task Save(string id)
await Task.Delay(50); //Simulates latency
public class ServiceClient
private readonly ITestOutputHelper output;
private readonly SemaphoreSlim semaphore;
public ServiceClient(ITestOutputHelper output)
this.output = output;
this.semaphore = new SemaphoreSlim(2);
public async Task<string> FetchMoreInformation(int id)
try
output.WriteLine("Calling the web client for 0", id);
await semaphore.WaitAsync(); // Protection for the webapi not sending too many calls
await Task.Delay(1000); //Simulates latency
return id.ToString();
finally
semaphore.Release();
【问题讨论】:
【参考方案1】:Rx 不支持背压,因此无法以与处理记录相同的速度从数据库中获取记录。也许您可以使用Subject<Unit>
作为信号机制,每次处理记录时推送一个值,并设计一种方法在生产站点使用这些信号,以便在收到信号时从数据库中获取新记录。但这将是一个混乱且惯用的解决方案。 TPL Dataflow 是比 Rx 更适合进行此类工作的工具。它原生支持BoundedCapacity
配置选项。
关于您发布的代码的一些 cmets,与背压问题没有直接关系:
带有maxConcurrent
参数的Merge
运算符对内部序列的并发订阅施加了限制,但是如果内部序列已经启动并运行,这将不起作用。所以你必须确保内部序列是冷的,一个方便的方法是Defer
运算符:
.Select(x => Observable.Defer(() =>
client.FetchMoreInformation(x).ToObservable()))
将异步方法转换为延迟可观察序列的更常见方法是FromAsync
运算符:
.Select(x => Observable.FromAsync(() => client.FetchMoreInformation(x)))
顺便说一句,Do
操作员不理解异步委托,所以改为:
.Do(async x => await repo.Save(x));
...创建async void
lambdas,最好这样做:
.Select(x => Observable.FromAsync(() => repo.Save(x)))
.Merge(1);
更新:这是一个示例,说明如何使用 SemaphoreSlim
在 Rx 中实现背压:
const int boundedCapacity = 10;
using var semaphore = new SemaphoreSlim(boundedCapacity, boundedCapacity);
IObservable<int> results = repo
.FetchRecords(semaphore)
.Select(x => Observable.FromAsync(() => client.FetchMoreInformation(x)))
.Merge(1)
.Select(x => Observable.FromAsync(() => repo.Save(x)))
.Merge(1)
.Do(_ => semaphore.Release());
await results.DefaultIfEmpty();
在FetchRecords
方法内部:
//...
await semaphore.WaitAsync();
observer.OnNext(product);
//...
这是一个脆弱的解决方案,因为它依赖于通过管道传播所有元素。如果将来您决定在管道中包含过滤或限制,那么WaitAsync
和Release
之间的一对一关系将被违反,最可能的结果是管道死锁。
【讨论】:
感谢您将我指向 TPL,我会认真阅读。然而,这是我过去使用 rx 来处理资源的。 Rx 中吸引人的地方是什么?我不确定该工具的用途,正确的用例是什么。 @Roger 看看这个:When is Rx appropriate? Rx 非常擅长处理一系列事件。它允许编写对这些事件做出反应的代码,在过滤、节流、投影、组合它们等之后。如果您处于需要干扰事件源的情况,那么反应范式开始破裂。 @Roger 我添加了一个借助SemaphoreSlim
实现背压的示例。以上是关于在 C# 中可通过背压观察的主要内容,如果未能解决你的问题,请参考以下文章