Request/Response pattern with TPL Dataflow

Posted: 2018-10-11 17:12:38

[Problem Description]:

We need a request/response pattern while using the TPL Dataflow library. Our problem is that we have a .NET Core API that calls a dependent service, and the dependent service throttles concurrent requests. Our API does not throttle concurrent requests, so we could receive thousands of requests at once; in that case the dependent service would reject requests once its limit is reached. So we implemented a BufferBlock<T> linked to a TransformBlock<TIn, TOut>. Performance is solid and it works well: we load-tested our API front end with 1000 users issuing 100 requests/sec with zero issues. The buffer block buffers requests and the transform block executes our desired number of requests in parallel. The dependent service receives our requests and responds, we return that response from the transform block delegate, and all is well. Our problem is that the buffer block and the transform block are disconnected, which means requests and responses are not kept in sync. We are seeing an issue where a request receives the response meant for another requester (see the code below).

Specific to the code below, the problem is in the GetContent method. That method is called from a service layer in our API, which in turn is called from our controller. The code below and the service layer are singletons. The buffer block's SendAsync is disconnected from the transform block's ReceiveAsync, so an arbitrary response is returned, not necessarily the one for the request that was sent.

So, our question is: is there a way to correlate requests and responses using dataflow blocks? The end goal is that a request comes into our API, is sent to the dependent service, and is then returned to the client. The code for our dataflow implementation follows.

public class HttpClientWrapper : IHttpClientManager
{
    private readonly IConfiguration _configuration;
    private readonly ITokenService _tokenService;
    private HttpClient _client;

    private BufferBlock<string> _bufferBlock;
    private TransformBlock<string, JObject> _actionBlock;

    public HttpClientWrapper(IConfiguration configuration, ITokenService tokenService)
    {
        _configuration = configuration;
        _tokenService = tokenService;

        _bufferBlock = new BufferBlock<string>();

        var executionDataFlowBlockOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 10
        };

        var dataFlowLinkOptions = new DataflowLinkOptions
        {
            PropagateCompletion = true
        };

        _actionBlock = new TransformBlock<string, JObject>(t => ProcessRequest(t),
            executionDataFlowBlockOptions);

        _bufferBlock.LinkTo(_actionBlock, dataFlowLinkOptions);
    }

    public void Connect()
    {
        _client = new HttpClient();

        _client.DefaultRequestHeaders.Add("x-ms-client-application-name",
            "ourappname");
    }

    public async Task<JObject> GetContent(string request)
    {
        await _bufferBlock.SendAsync(request);

        var result = await _actionBlock.ReceiveAsync();

        return result;
    }

    private async Task<JObject> ProcessRequest(string request)
    {
        if (_client == null)
        {
            Connect();
        }

        try
        {
            var accessToken = await _tokenService.GetTokenAsync(_configuration);

            var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post,
                new Uri($"https://{_configuration.Uri}"));

            // add the headers
            httpRequestMessage.Headers.Add("Authorization", $"Bearer {accessToken}");
            // add the request body
            httpRequestMessage.Content = new StringContent(request, Encoding.UTF8,
                "application/json");

            var postRequest = await _client.SendAsync(httpRequestMessage);

            var response = await postRequest.Content.ReadAsStringAsync();

            return JsonConvert.DeserializeObject<JObject>(response);
        }
        catch (Exception ex)
        {
            // log error

            return new JObject();
        }
    }
}
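To make the failure mode concrete, the following is a minimal, self-contained sketch (our own illustration, not the API code above; the names and delays are invented) of why the SendAsync/ReceiveAsync pairs desynchronize: the TransformBlock has a single shared output queue, and ReceiveAsync hands out whichever result is at the head of that queue, regardless of which caller posted the matching input.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DesyncDemo
{
    public static async Task Main()
    {
        // Stand-in for the dependency call: the output records which input produced it.
        var transform = new TransformBlock<string, string>(
            input => $"response for {input}",
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

        // Same shape as GetContent above: SendAsync, then ReceiveAsync.
        async Task<string> GetContent(string request, int delayBeforeReceive)
        {
            await transform.SendAsync(request);
            await Task.Delay(delayBeforeReceive);   // simulates scheduling jitter between callers
            return await transform.ReceiveAsync();  // dequeues whatever output is at the head
        }

        var callerA = GetContent("A", delayBeforeReceive: 200);
        var callerB = GetContent("B", delayBeforeReceive: 0);

        // Caller B typically prints "response for A": the output queue is shared,
        // so nothing ties a particular output to the caller that sent the matching input.
        Console.WriteLine($"caller A got: {await callerA}");
        Console.WriteLine($"caller B got: {await callerB}");
    }
}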

[Comments]:

[Answer 1]:

What you need to do is tag each incoming item with an id so that you can correlate the data input with the result output. Here is an example of how to do that:

namespace ConcurrentFlows.DataflowJobs
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    /// <summary>
    /// A generic interface defining that:
    /// for a specified input type => an awaitable result is produced.
    /// </summary>
    /// <typeparam name="TInput">The type of data to process.</typeparam>
    /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
    public interface IJobManager<TInput, TOutput>
    {
        Task<TOutput> SubmitRequest(TInput data);
    }

    /// <summary>
    /// A TPL-Dataflow based job manager.
    /// </summary>
    /// <typeparam name="TInput">The type of data to process.</typeparam>
    /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
    public class DataflowJobManager<TInput, TOutput> : IJobManager<TInput, TOutput>
    {
        /// <summary>
        /// It is anticipated that jobHandler is an injected
        /// singleton instance of a Dataflow based 'calculator', though this implementation
        /// does not depend on it being a singleton.
        /// </summary>
        /// <param name="jobHandler">A singleton Dataflow block through which all jobs are processed.</param>
        public DataflowJobManager(IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> jobHandler)
        {
            if (jobHandler == null) { throw new ArgumentException("Argument cannot be null.", "jobHandler"); }

            this.JobHandler = jobHandler;
            if (!alreadyLinked)
            {
                JobHandler.LinkTo(ResultHandler, new DataflowLinkOptions() { PropagateCompletion = true });
                alreadyLinked = true;
            }
        }

        private static bool alreadyLinked = false;

        /// <summary>
        /// Submits the request to the JobHandler and asynchronously awaits the result.
        /// </summary>
        /// <param name="data">The input data to be processed.</param>
        /// <returns></returns>
        public async Task<TOutput> SubmitRequest(TInput data)
        {
            var taggedData = TagInputData(data);
            var job = CreateJob(taggedData);
            Jobs.TryAdd(job.Key, job.Value);
            await JobHandler.SendAsync(taggedData);
            return await job.Value.Task;
        }

        private static ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>> Jobs
        {
            get;
        } = new ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>>();

        private static ExecutionDataflowBlockOptions Options
        {
            get;
        } = GetResultHandlerOptions();

        private static ITargetBlock<KeyValuePair<Guid, TOutput>> ResultHandler
        {
            get;
        } = CreateReplyHandler(Options);

        private IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> JobHandler
        {
            get;
        }

        private KeyValuePair<Guid, TInput> TagInputData(TInput data)
        {
            var id = Guid.NewGuid();
            return new KeyValuePair<Guid, TInput>(id, data);
        }

        private KeyValuePair<Guid, TaskCompletionSource<TOutput>> CreateJob(KeyValuePair<Guid, TInput> taggedData)
        {
            var id = taggedData.Key;
            var jobCompletionSource = new TaskCompletionSource<TOutput>();
            return new KeyValuePair<Guid, TaskCompletionSource<TOutput>>(id, jobCompletionSource);
        }

        private static ExecutionDataflowBlockOptions GetResultHandlerOptions()
        {
            return new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 1000
            };
        }

        private static ITargetBlock<KeyValuePair<Guid, TOutput>> CreateReplyHandler(ExecutionDataflowBlockOptions options)
        {
            return new ActionBlock<KeyValuePair<Guid, TOutput>>((result) =>
            {
                ReceiveOutput(result);
            }, options);
        }

        private static void ReceiveOutput(KeyValuePair<Guid, TOutput> result)
        {
            var jobId = result.Key;
            TaskCompletionSource<TOutput> jobCompletionSource;
            if (!Jobs.TryRemove(jobId, out jobCompletionSource))
            {
                throw new InvalidOperationException($"The jobId: {jobId} was not found.");
            }
            var resultValue = result.Value;
            jobCompletionSource.SetResult(resultValue);
        }
    }
}

See also this answer for reference.
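For completeness, here is a hedged sketch (our own wiring, not part of the answer above) of how this job manager could be combined with a ProcessRequest-like method such as the one in the question; jobHandler, requestBody, and the call to ProcessRequest are illustrative assumptions. The key point is that the injected block carries the Guid tag through unchanged, so the result handler can complete the matching TaskCompletionSource.

// Sketch only. Assumes a ProcessRequest(string) returning Task<JObject>, as in the question.
IPropagatorBlock<KeyValuePair<Guid, string>, KeyValuePair<Guid, JObject>> jobHandler =
    new TransformBlock<KeyValuePair<Guid, string>, KeyValuePair<Guid, JObject>>(
        async tagged =>
        {
            var result = await ProcessRequest(tagged.Value);             // throttled dependency call
            return new KeyValuePair<Guid, JObject>(tagged.Key, result);  // keep the tag on the way out
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

IJobManager<string, JObject> jobManager = new DataflowJobManager<string, JObject>(jobHandler);

// Each caller awaits only its own result; the Guid tag guarantees the pairing.
JObject response = await jobManager.SubmitRequest(requestBody);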

[Comments]:

[Answer 2]:

Simple throttling is not a particularly tempting use case for the TPL Dataflow library; using a SemaphoreSlim seems simpler and more attractive. However, if you want extra features, such as enforcing a minimum duration per request or having a way to await the completion of all pending requests, then TPL Dataflow can offer things that a SemaphoreSlim cannot. The basic idea is to avoid passing naked input values to a block and then trying to correlate them with the produced results. It is much safer to create a task per request immediately, send the tasks to an ActionBlock<Task>, and let the block activate and await the asynchronous execution of these tasks with the specified MaxDegreeOfParallelism. This way an input value and its result remain unambiguously bound together forever.

public class ThrottledExecution<T>
{
    private readonly ActionBlock<Task<Task<T>>> _actionBlock;
    private readonly CancellationToken _cancellationToken;

    public ThrottledExecution(int concurrencyLevel, int minDurationMilliseconds = 0,
        CancellationToken cancellationToken = default)
    {
        if (minDurationMilliseconds < 0) throw new ArgumentOutOfRangeException();
        _actionBlock = new ActionBlock<Task<Task<T>>>(async task =>
        {
            try
            {
                var delay = Task.Delay(minDurationMilliseconds, cancellationToken);
                task.RunSynchronously();
                await task.Unwrap().ConfigureAwait(false);
                await delay.ConfigureAwait(false);
            }
            catch { } // Ignore exceptions (errors are propagated through the task)
        }, new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = concurrencyLevel,
            CancellationToken = cancellationToken,
        });
        _cancellationToken = cancellationToken;
    }

    public Task<T> Run(Func<Task<T>> function)
    {
        // Create a cold task (the function will be invoked later)
        var task = new Task<Task<T>>(function, _cancellationToken);
        var accepted = _actionBlock.Post(task);
        if (!accepted)
        {
            _cancellationToken.ThrowIfCancellationRequested();
            throw new InvalidOperationException(
                "The component has been marked as complete.");
        }
        return task.Unwrap();
    }

    public void Complete() => _actionBlock.Complete();
    public Task Completion => _actionBlock.Completion;
}

Usage example:

private ThrottledExecution<JObject> throttledExecution
    = new ThrottledExecution<JObject>(concurrencyLevel: 10);

public Task<JObject> GetContent(string request)
{
    return throttledExecution.Run(() => ProcessRequest(request));
}
[Comments]:

[Answer 3]:

Thanks to JSteward for his answer. His approach is perfectly acceptable; however, I ended up doing this with a SemaphoreSlim. SemaphoreSlim provides two features that make it a powerful solution. First, it offers a constructor overload that takes a count; this count is the number of concurrent items allowed past the semaphore's wait mechanism. The wait mechanism is provided by a method called WaitAsync. With the approach below, where the Worker class is a singleton, concurrent requests come in, are limited to 10 HTTP requests executing at a time, and the responses all come back to the correct requests. So the implementation could look like this:

public class Worker : IWorker
{
    private readonly IHttpClientManager _httpClient;
    private readonly ITokenService _tokenService;

    private readonly SemaphoreSlim _semaphore;

    public Worker(IHttpClientManager httpClient, ITokenService tokenService)
    {
        _httpClient = httpClient;
        _tokenService = tokenService;

        // we want to limit the number of items here
        _semaphore = new SemaphoreSlim(10);
    }

    public async Task<JObject> ProcessRequestAsync(string request, string route)
    {
        try
        {
            var accessToken = await _tokenService.GetTokenAsync(
                _timeSeriesConfiguration.TenantId,
                _timeSeriesConfiguration.ClientId,
                _timeSeriesConfiguration.ClientSecret);

            var cancellationToken = new CancellationTokenSource();

            cancellationToken.CancelAfter(30000);

            await _semaphore.WaitAsync(cancellationToken.Token);
            var httpResponseMessage = await _httpClient.SendAsync(new HttpClientRequest
            {
                Method = HttpMethod.Post,
                Uri = $"https://someuri/someroute",
                Token = accessToken,
                Content = request
            });

            var response = await httpResponseMessage.Content.ReadAsStringAsync();

            return JsonConvert.DeserializeObject<JObject>(response);
        }
        catch (Exception ex)
        {
            // do some logging

            throw;
        }
        finally
        {
            _semaphore.Release();
        }
    }
}

[Comments]:

This implementation is problematic. An exception during the _tokenService.GetTokenAsync call will release the semaphore without it having been acquired first, resulting in an increased concurrency level, which opens a window for rejections by the dependent service. The correct place for the _semaphore.WaitAsync command is before entering the try/finally block.
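A minimal sketch of the corrected ordering (our illustration, not code from the answer; only the structure matters, the elided call is whatever work needs throttling):

// Acquire outside the try, so the finally never releases a semaphore that was not taken.
await _semaphore.WaitAsync(cancellationToken.Token);
try
{
    // token acquisition and the dependent-service call go here
    var httpResponseMessage = await _httpClient.SendAsync(/* request as above */);
    var response = await httpResponseMessage.Content.ReadAsStringAsync();
    return JsonConvert.DeserializeObject<JObject>(response);
}
finally
{
    _semaphore.Release();
}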
