并行分发请求
Posted
技术标签:
【中文标题】并行分发请求【英文标题】:Distributing Requests In Parallel 【发布时间】:2021-12-05 14:57:03 【问题描述】:我喜欢这样的场景:
我有一个端点,这个端点会将请求保存在内存中的列表或队列中,它会立即向消费者返回成功响应。这个要求很关键,消费者不应该等待响应,如果需要,它将从不同的端点获取响应。所以,这个端点必须在将请求消息保存到内存后尽快返回。
另一个线程会将这些请求分发到其他端点并将响应也保存在内存中。
到目前为止我做了什么:
我创建了一个控制器 api 来将这些请求保存在内存中。我将它们保存在一个静态请求列表中,如下所示:
public static class RequestList
public static event EventHandler<RequestEventArgs> RequestReceived;
private static List<DistributionRequest> Requests get; set; = new List<DistributionRequest>();
public static int RequestCount get => RequestList.Requests.Count;
public static DistributionRequest Add(DistributionRequest request)
request.RequestId = Guid.NewGuid().ToString();
RequestList.Requests.Add(request);
OnRequestReceived(new RequestEventArgs Request = request );
return request;
public static bool Remove(DistributionRequest request) => Requests.Remove(request);
private static void OnRequestReceived(RequestEventArgs e)
RequestReceived?.Invoke(null, e);
public class RequestEventArgs : EventArgs
public DistributionRequest Request get; set;
另一个类订阅了该静态类中存在的那个事件,我正在创建一个新线程来发出一些后台 Web 请求,以便能够实现我上面提到的 2. 项。
private void RequestList_RequestReceived(object sender, RequestEventArgs e)
_logger.LogInformation($"Request Id: e.Request.RequestId, New request received");
Task.Factory.StartNew(() => Distribute(e.Request));
_logger.LogInformation($"Request Id: e.Request.RequestId, New task created for the new request");
//await Distribute(e.Request);
public async Task<bool> Distribute(DistributionRequest request)
//Some logic running here to send post request to different endpoints
//and to save results in memory
这是我的控制器方法:
[HttpPost]
public IActionResult Post([FromForm] DistributionRequest request)
var response = RequestList.Add(request);
return Ok(new DistributionResponse Succeeded = true, RequestId = response.RequestId );
我尝试了这种方法,但它没有像我预期的那样工作,它应该在几毫秒内返回,因为我没有等待响应,但它似乎在等待某些东西,并且在每个请求之后等待时间增加如下:
我做错了什么?或者你有更好的主意吗?我怎样才能实现我的目标?
【问题讨论】:
对其进行分析。这将向您展示什么功能正在消耗您的所有时间以及问题出在哪里。 一些不相关的东西:您可能希望使用类似ConcurrentBag
而不是List
的东西,以免在并行删除和添加元素时遇到并发问题
【参考方案1】:
根据您的示例代码,我尝试在没有“事件”的情况下实现它。因此我得到了更好的请求时间。我不能说这是否与您的实现或事件本身有关,您必须进行分析。
我是这样做的
请求控制器
就像您在示例中那样。接受请求并将其添加到请求列表中。
[Route("requests")]
public class RequestsController : ControllerBase
private readonly RequestManager _mgr;
public RequestsController(RequestManager mgr)
_mgr = mgr;
[HttpPost]
public IActionResult AddRequest([FromBody] DistributionRequest request)
var item = _mgr.Add(request);
return Accepted(new Succeeded = true, RequestId = item.RequestId );
请求管理器
管理请求列表并将它们转发给某个分发者。
public class RequestManager
private readonly ILogger _logger;
private readonly RequestDistributor _distributor;
public IList<DistributionRequest> Requests get; = new List<DistributionRequest>();
public RequestManager(RequestDistributor distributor, ILogger<RequestManager> logger)
_distributor = distributor;
_logger = logger;
public DistributionRequest Add(DistributionRequest request)
_logger.LogInformation($"Request Id: request.RequestId, New request received");
/// Just add to the list of requests
Requests.Add(request);
/// Create and start a new task to distribute the request
/// forward it to the distributor.
/// Be sure to not add "await" here
Task.Factory.StartNew(() => _distributor.DistributeAsync(request));
_logger.LogInformation($"Request Id: request.RequestId, New task created for the new request");
return request;
RequestDistributor
这里可以实现分发逻辑
public class RequestDistributor
public async Task DistributeAsync(DistributionRequest request)
/// do your distribution here
/// currently just a mocked time range
await Task.Delay(5);
接线
...将所有这些东西添加到您的依赖注入配置中
public void ConfigureServices(IServiceCollection services)
services.AddControllers();
services.AddSingleton<RequestDistributor>();
services.AddSingleton<RequestManager>();
测试
使用此处提供的代码片段,我在不到 10 毫秒的时间内收到了所有请求。
注意
这只是一个示例,尝试始终向您的服务添加接口以使其可测试;)。
【讨论】:
以上是关于并行分发请求的主要内容,如果未能解决你的问题,请参考以下文章
Xcode 7.3 Apple Ad Hoc 分发请求分发清单信息
Android:OkHttp请求分发器 Dispatcher的理解和使用