异步接受传入请求

Posted

技术标签:

【中文标题】异步接受传入请求【英文标题】:Accepting incoming requests asynchronously 【发布时间】:2014-03-11 12:21:26 【问题描述】:

我已经确定了我的 TCP 应用程序中的一个瓶颈,我为了这个问题而对其进行了简化。

我有一个MyClient 类,它代表客户端何时连接;我还有一个MyWrapper 类,它代表满足某些条件的客户端。如果MyClient满足某些条件,则它有资格成为包装器。

我想公开一个允许调用者等待MyWrapper的方法,该方法应该处理无效MyClients的协商和拒绝:

public static async Task StartAccepting(CancellationToken token)

    while (!token.IsCancellationRequested)
    
        var wrapper = await AcceptWrapperAsync(token);

        HandleWrapperAsync(wrapper);
    

因此AcceptWrapperAsync 等待一个有效的包装器,而HandleWrapperAsync 异步处理包装器而不阻塞线程,因此AcceptWrapperAsync 可以尽快恢复工作。

该方法在内部的工作原理是这样的:

public static async Task<MyWrapper> AcceptWrapperAsync(CancellationToken token)

    while (!token.IsCancellationRequested)
    
        var client = await AcceptClientAsync();
        if (IsClientWrappable(client))
            return new MyWrapper(client);
    
    return null;


public static async Task<MyClient> AcceptClientAsync()

    await Task.Delay(1000);
    return new MyClient();


private static Boolean IsClientWrappable(MyClient client)

    Thread.Sleep(500);
    return true;

这段代码模拟每秒有一个客户端连接,并且需要半秒来检查连接是否适合包装器。 AcceptWrapperAsync 循环直到生成有效的包装器,然后返回。

这种运作良好的方法存在缺陷。在IsClientWrappable 执行期间,不能接受更多的客户端,当大量客户端同时尝试连接时会造成瓶颈。恐怕在现实生活中,如果服务器在连接了很多客户端的情况下出现故障,那么上升不会很好,因为所有客户端都会同时尝试连接。我知道同时连接所有这些非常困难,但我想加快连接过程。

使IsClientWrappable异步,只会确保执行线程在协商完成之前不会被阻塞,但无论如何都会阻塞执行流程。

如何改进这种方法以不断接受新客户,但仍然能够等待使用 AcceptWrapperAsync 的包装器?

【问题讨论】:

这似乎是 Reactive Extensions (Rx) 的一个很好的用例 Rx 对于这将是一个矫枉过正。看来Dataflow更有意义。 使用 RX,您将数据推送到订阅者,而使用 DataFlow,您将数据推送到消费者。 :) 好吧,看来RX远不止这些。数据流只是异步工作的类的集合:P 【参考方案1】:
//this loop must never be blocked
while (!token.IsCancellationRequested)

    var client = await AcceptClientAsync();
    HandleClientAsync(client); //must not block


Task HandleClientAsync(Client client) 
    if (await IsClientWrappableAsync(client)) //make async as well, don't block
        await HandleWrapperAsync(new MyWrapper(client));

通过这种方式,您可以将 IsClientWrappable 逻辑移出接受循环并进入后台异步工作流。

如果你不想让IsClientWrappable 非阻塞,只需用Task.Run 包裹它。 HandleClientAsync 不阻塞是很重要的,这样它的调用者也不会阻塞。

【讨论】:

我不能这样做,正如我所说,我必须提供一个等待的AcceptWrapperAsync 好的,在这种情况下我会研究 Rx。看起来您有一个传入客户端流,需要过滤并转换为包装客户端。一切都是异步的。 Rx 可以做到这一点,但我对此还不够熟悉。【参考方案2】:

TPL 数据流助您一臂之力。我创建了一个带有两个队列的“生产者/消费者”对象:

    接受来自“producer”的输入并将其存储在“in”队列中。 从“in”队列中读取的内部异步任务,并以给定的最大并行度并行处理输入。 之后将已处理的项目放入“out”队列中。结果或异常。 接受消费者await 一个项目。然后可以检查处理是否成功。

我已经做了一些测试,它似乎工作正常,但我想做更多的测试:

public sealed class ProcessingResult<TOut> 
    where TOut : class

    public TOut Result  get; internal set; 
    public Exception Error  get; internal set; 


public abstract class ProcessingBufferBlock<TIn,TOut> 
    where TIn:class 
    where TOut:class

    readonly BufferBlock<TIn> _in;
    readonly BufferBlock<ProcessingResult<TOut>> _out;
    readonly CancellationToken _cancellation;
    readonly SemaphoreSlim _semaphore;

    public ProcessingBufferBlock(Int32 boundedCapacity, Int32 degreeOfParalellism, CancellationToken cancellation)
    
        _cancellation = cancellation;
        _semaphore = new SemaphoreSlim(degreeOfParalellism);
        var options = new DataflowBlockOptions()  BoundedCapacity = boundedCapacity, CancellationToken = cancellation ;
        _in = new BufferBlock<TIn>(options);
        _out = new BufferBlock<ProcessingResult<TOut>>(options);
        StartReadingAsync();
    

    private async Task StartReadingAsync()
    
        await Task.Yield();
        while (!_cancellation.IsCancellationRequested)
        
            var incoming = await _in.ReceiveAsync(_cancellation);
            ProcessThroughGateAsync(incoming);
        
    

    private async Task ProcessThroughGateAsync(TIn input)
    
        _semaphore.Wait(_cancellation);
        Exception error=null;
        TOut result=null;
        try
        
            result = await ProcessAsync(input);                   
        
        catch (Exception ex)
        
            error = ex;
        
        finally
        
            if(result!=null || error!=null)
                _out.Post(new ProcessingResult<TOut>()  Error = error, Result = result );
            _semaphore.Release(1);
        
    

    protected abstract Task<TOut> ProcessAsync(TIn input);

    public void Post(TIn item)
    
        _in.Post(item);
    

    public Task<ProcessingResult<TOut>> ReceiveAsync()
    
        return _out.ReceiveAsync();
    

所以我在 OP 上使用的示例是这样的:

public class WrapperProcessingQueue : ProcessingBufferBlock<MyClient, MyWrapper>

    public WrapperProcessingQueue(Int32 boundedCapacity, Int32 degreeOfParalellism, CancellationToken cancellation)
        : base(boundedCapacity, degreeOfParalellism, cancellation)
     

    protected override async Task<MyWrapper> ProcessAsync(MyClient input)
    
        await Task.Delay(5000);
        if (input.Id % 3 == 0)
            return null;
        return new MyWrapper(input);
    

然后我可以尽快将MyClient 对象添加到该队列中,它们将被并行处理,消费者将等待通过过滤器的对象。

正如我所说,我想做更多的测试,但我们非常欢迎任何反馈。

干杯。

【讨论】:

以上是关于异步接受传入请求的主要内容,如果未能解决你的问题,请参考以下文章

如何异步处理 WCF 的传入调用?

关于如何接受异步ajax请求返回前台的数据

使用 async/await 的异步 TcpListener 的正确方法

在 MFC 中使用异步过程调用中断接受 winsock 调用

异步方式

Redux工具包 - Redux Toolkit的异步操作(发送网络请求)