异步接受传入请求
Posted
技术标签:
【中文标题】异步接受传入请求【英文标题】:Accepting incoming requests asynchronously 【发布时间】:2014-03-11 12:21:26 【问题描述】:我已经确定了我的 TCP 应用程序中的一个瓶颈,我为了这个问题而对其进行了简化。
我有一个MyClient
类,它代表客户端何时连接;我还有一个MyWrapper
类,它代表满足某些条件的客户端。如果MyClient
满足某些条件,则它有资格成为包装器。
我想公开一个允许调用者等待MyWrapper
的方法,该方法应该处理无效MyClients
的协商和拒绝:
public static async Task StartAccepting(CancellationToken token)
while (!token.IsCancellationRequested)
var wrapper = await AcceptWrapperAsync(token);
HandleWrapperAsync(wrapper);
因此AcceptWrapperAsync
等待一个有效的包装器,而HandleWrapperAsync
异步处理包装器而不阻塞线程,因此AcceptWrapperAsync
可以尽快恢复工作。
该方法在内部的工作原理是这样的:
public static async Task<MyWrapper> AcceptWrapperAsync(CancellationToken token)
while (!token.IsCancellationRequested)
var client = await AcceptClientAsync();
if (IsClientWrappable(client))
return new MyWrapper(client);
return null;
public static async Task<MyClient> AcceptClientAsync()
await Task.Delay(1000);
return new MyClient();
private static Boolean IsClientWrappable(MyClient client)
Thread.Sleep(500);
return true;
这段代码模拟每秒有一个客户端连接,并且需要半秒来检查连接是否适合包装器。 AcceptWrapperAsync
循环直到生成有效的包装器,然后返回。
这种运作良好的方法存在缺陷。在IsClientWrappable
执行期间,不能接受更多的客户端,当大量客户端同时尝试连接时会造成瓶颈。恐怕在现实生活中,如果服务器在连接了很多客户端的情况下出现故障,那么上升不会很好,因为所有客户端都会同时尝试连接。我知道同时连接所有这些非常困难,但我想加快连接过程。
使IsClientWrappable
异步,只会确保执行线程在协商完成之前不会被阻塞,但无论如何都会阻塞执行流程。
如何改进这种方法以不断接受新客户,但仍然能够等待使用 AcceptWrapperAsync
的包装器?
【问题讨论】:
这似乎是 Reactive Extensions (Rx) 的一个很好的用例 Rx 对于这将是一个矫枉过正。看来Dataflow更有意义。 使用 RX,您将数据推送到订阅者,而使用 DataFlow,您将数据推送到消费者。 :) 好吧,看来RX远不止这些。数据流只是异步工作的类的集合:P 【参考方案1】://this loop must never be blocked
while (!token.IsCancellationRequested)
var client = await AcceptClientAsync();
HandleClientAsync(client); //must not block
Task HandleClientAsync(Client client)
if (await IsClientWrappableAsync(client)) //make async as well, don't block
await HandleWrapperAsync(new MyWrapper(client));
通过这种方式,您可以将 IsClientWrappable
逻辑移出接受循环并进入后台异步工作流。
如果你不想让IsClientWrappable
非阻塞,只需用Task.Run
包裹它。 HandleClientAsync
不阻塞是很重要的,这样它的调用者也不会阻塞。
【讨论】:
我不能这样做,正如我所说,我必须提供一个等待的AcceptWrapperAsync
好的,在这种情况下我会研究 Rx。看起来您有一个传入客户端流,需要过滤并转换为包装客户端。一切都是异步的。 Rx 可以做到这一点,但我对此还不够熟悉。【参考方案2】:
TPL 数据流助您一臂之力。我创建了一个带有两个队列的“生产者/消费者”对象:
-
接受来自“producer”的输入并将其存储在“in”队列中。
从“in”队列中读取的内部异步任务,并以给定的最大并行度并行处理输入。
之后将已处理的项目放入“out”队列中。结果或异常。
接受消费者
await
一个项目。然后可以检查处理是否成功。
我已经做了一些测试,它似乎工作正常,但我想做更多的测试:
public sealed class ProcessingResult<TOut>
where TOut : class
public TOut Result get; internal set;
public Exception Error get; internal set;
public abstract class ProcessingBufferBlock<TIn,TOut>
where TIn:class
where TOut:class
readonly BufferBlock<TIn> _in;
readonly BufferBlock<ProcessingResult<TOut>> _out;
readonly CancellationToken _cancellation;
readonly SemaphoreSlim _semaphore;
public ProcessingBufferBlock(Int32 boundedCapacity, Int32 degreeOfParalellism, CancellationToken cancellation)
_cancellation = cancellation;
_semaphore = new SemaphoreSlim(degreeOfParalellism);
var options = new DataflowBlockOptions() BoundedCapacity = boundedCapacity, CancellationToken = cancellation ;
_in = new BufferBlock<TIn>(options);
_out = new BufferBlock<ProcessingResult<TOut>>(options);
StartReadingAsync();
private async Task StartReadingAsync()
await Task.Yield();
while (!_cancellation.IsCancellationRequested)
var incoming = await _in.ReceiveAsync(_cancellation);
ProcessThroughGateAsync(incoming);
private async Task ProcessThroughGateAsync(TIn input)
_semaphore.Wait(_cancellation);
Exception error=null;
TOut result=null;
try
result = await ProcessAsync(input);
catch (Exception ex)
error = ex;
finally
if(result!=null || error!=null)
_out.Post(new ProcessingResult<TOut>() Error = error, Result = result );
_semaphore.Release(1);
protected abstract Task<TOut> ProcessAsync(TIn input);
public void Post(TIn item)
_in.Post(item);
public Task<ProcessingResult<TOut>> ReceiveAsync()
return _out.ReceiveAsync();
所以我在 OP 上使用的示例是这样的:
public class WrapperProcessingQueue : ProcessingBufferBlock<MyClient, MyWrapper>
public WrapperProcessingQueue(Int32 boundedCapacity, Int32 degreeOfParalellism, CancellationToken cancellation)
: base(boundedCapacity, degreeOfParalellism, cancellation)
protected override async Task<MyWrapper> ProcessAsync(MyClient input)
await Task.Delay(5000);
if (input.Id % 3 == 0)
return null;
return new MyWrapper(input);
然后我可以尽快将MyClient
对象添加到该队列中,它们将被并行处理,消费者将等待通过过滤器的对象。
正如我所说,我想做更多的测试,但我们非常欢迎任何反馈。
干杯。
【讨论】:
以上是关于异步接受传入请求的主要内容,如果未能解决你的问题,请参考以下文章
使用 async/await 的异步 TcpListener 的正确方法