并行分发请求

Posted

技术标签:

【中文标题】并行分发请求【英文标题】:Distributing Requests In Parallel 【发布时间】:2021-12-05 14:57:03 【问题描述】:

我喜欢这样的场景:

    我有一个端点,这个端点会将请求保存在内存中的列表或队列中,它会立即向消费者返回成功响应。这个要求很关键,消费者不应该等待响应,如果需要,它将从不同的端点获取响应。所以,这个端点必须在将请求消息保存到内存后尽快返回。

    另一个线程会将这些请求分发到其他端点并将响应也保存在内存中。

到目前为止我做了什么:

我创建了一个控制器 api 来将这些请求保存在内存中。我将它们保存在一个静态请求列表中,如下所示:

    public static class RequestList
    
        public static event EventHandler<RequestEventArgs> RequestReceived;

        private static List<DistributionRequest> Requests  get; set;  = new List<DistributionRequest>();

        public static int RequestCount  get => RequestList.Requests.Count; 

        public static DistributionRequest Add(DistributionRequest request)
        
            request.RequestId = Guid.NewGuid().ToString();
            RequestList.Requests.Add(request);
            OnRequestReceived(new RequestEventArgs  Request = request );
            return request;
        

        public static bool Remove(DistributionRequest request) => Requests.Remove(request);

        private static void OnRequestReceived(RequestEventArgs e)
        
            RequestReceived?.Invoke(null, e);
        
    

    public class RequestEventArgs : EventArgs
    
        public DistributionRequest Request  get; set; 
    

另一个类订阅了该静态类中存在的那个事件,我正在创建一个新线程来发出一些后台 Web 请求,以便能够实现我上面提到的 2. 项。

        private void RequestList_RequestReceived(object sender, RequestEventArgs e)
        
            _logger.LogInformation($"Request Id: e.Request.RequestId, New request received");
            Task.Factory.StartNew(() => Distribute(e.Request));
            _logger.LogInformation($"Request Id: e.Request.RequestId, New task created for the new request");
            //await Distribute(e.Request);
        

        public async Task<bool> Distribute(DistributionRequest request)
        

            //Some logic running here to send post request to different endpoints 
            //and to save results in memory
        

这是我的控制器方法:

        [HttpPost]
        public IActionResult Post([FromForm] DistributionRequest request)
        
            var response = RequestList.Add(request);
            return Ok(new DistributionResponse  Succeeded = true, RequestId = response.RequestId );
        

我尝试了这种方法,但它没有像我预期的那样工作,它应该在几毫秒内返回,因为我没有等待响应,但它似乎在等待某些东西,并且在每个请求之后等待时间增加如下:

我做错了什么?或者你有更好的主意吗?我怎样才能实现我的目标?

【问题讨论】:

对其进行分析。这将向您展示什么功能正在消耗您的所有时间以及问题出在哪里。 一些不相关的东西:您可能希望使用类似ConcurrentBag 而不是List 的东西,以免在并行删除和添加元素时遇到并发问题 【参考方案1】:

根据您的示例代码,我尝试在没有“事件”的情况下实现它。因此我得到了更好的请求时间。我不能说这是否与您的实现或事件本身有关,您必须进行分析。

我是这样做的

请求控制器

就像您在示例中那样。接受请求并将其添加到请求列表中。

[Route("requests")]
    public class RequestsController : ControllerBase
    
        private readonly RequestManager _mgr;

        public RequestsController(RequestManager mgr)
        
            _mgr = mgr;
        

        [HttpPost]
        public IActionResult AddRequest([FromBody] DistributionRequest request)
        
            var item = _mgr.Add(request);
            return Accepted(new  Succeeded = true, RequestId = item.RequestId );
        
    

请求管理器

管理请求列表并将它们转发给某个分发者。

public class RequestManager
    
        private readonly ILogger _logger;
        private readonly RequestDistributor _distributor;

        public IList<DistributionRequest> Requests  get;  = new List<DistributionRequest>();

        public RequestManager(RequestDistributor distributor, ILogger<RequestManager> logger)
        
            _distributor = distributor;
            _logger = logger;
        

        public DistributionRequest Add(DistributionRequest request)
        
            _logger.LogInformation($"Request Id: request.RequestId, New request received");
            /// Just add to the list of requests
            Requests.Add(request);
            /// Create and start a new task to distribute the request 
            /// forward it to the distributor.
            /// Be sure to not add "await" here
            Task.Factory.StartNew(() => _distributor.DistributeAsync(request));
            _logger.LogInformation($"Request Id: request.RequestId, New task created for the new request");

            return request;
        
    

RequestDistributor

这里可以实现分发逻辑

public class RequestDistributor
    
        public async Task DistributeAsync(DistributionRequest request)
        
            /// do your distribution here
            /// currently just a mocked time range
            await Task.Delay(5);
        
    

接线

...将所有这些东西添加到您的依赖注入配置中

public void ConfigureServices(IServiceCollection services)
        
            services.AddControllers();
            services.AddSingleton<RequestDistributor>();
            services.AddSingleton<RequestManager>();
        

测试

使用此处提供的代码片段,我在不到 10 毫秒的时间内收到了所有请求。

注意

这只是一个示例,尝试始终向您的服务添加接口以使其可测试;)。

【讨论】:

以上是关于并行分发请求的主要内容,如果未能解决你的问题,请参考以下文章

Nginx简单的请求分发跟负载均衡----分发到多台机器

Xcode 7.3 Apple Ad Hoc 分发请求分发清单信息

mock 请求分发

Android:OkHttp请求分发器 Dispatcher的理解和使用

Servlet 中 RequestDispacher 请求与分发

请求开发人员证书的分发配置文件