Azure App Service 和 .NET Core 3.1 中长时间运行计算的适当解决方案?
Posted
技术标签:
【中文标题】Azure App Service 和 .NET Core 3.1 中长时间运行计算的适当解决方案?【英文标题】:Appropriate solution for long running computations in Azure App Service and .NET Core 3.1? 【发布时间】:2020-12-01 18:59:14 【问题描述】:对于不需要数据库且无需 IO 到此应用程序之外的任何内容的应用程序中的 Azure App Service 和 .NET Core 3.1 中的长时间运行计算,什么是合适的解决方案?这是一个计算任务。
具体来说,以下是不可靠的,需要解决方案。
[Route("service")]
[HttpPost]
public Outbound Post(Inbound inbound)
Debug.Assert(inbound.Message.Equals("Hello server."));
Outbound outbound = new Outbound();
long Billion = 1000000000;
for (long i = 0; i < 33 * Billion; i++) // 230 seconds
;
outbound.Message = String.Format("The server processed inbound object.");
return outbound;
这有时会向HttpClient
返回一个空对象(未显示)。较小的工作量总是会成功。例如 30 亿次迭代总是成功的。更大的数字会很好,特别是需要 2400 亿。
我认为,在 2020 年,使用 .NET Core 的 Azure 应用服务的合理目标可能是在 8 个子线程的帮助下将父线程数增加到 2400 亿个,因此每个子线程数达到 300 亿个,并且父线程将一个将 8 M 字节入站对象转换为每个子项入站的较小对象。每个孩子收到一个 1 M 字节的入站数据,并返回给父母一个 1 M 字节的出站数据。父级将结果重新组合成一个 8 M 字节的出站。
显然,经过的时间将是单线程实现所需时间的 12.5%、1/8 或八分之一。与计算时间相比,切割和重新组装对象的时间很小。我假设传输对象的时间与计算时间相比非常小,因此 12.5% 的预期大致准确。
如果我能得到 4 或 8 个内核就好了。如果我能得到一个核心周期的 50% 的线程,那么我可能需要 8 或 16 个线程。如果每个线程给我 33% 的核心周期,那么我需要 12 或 24 个线程。
我正在考虑 BackgroundService
课程,但我正在寻找确认这是正确的方法。微软说...
BackgroundService is a base class for implementing a long running IHostedService.
显然,如果某些东西运行时间很长,最好通过System.Threading
使用多个内核使其更快完成,但这个documentation 似乎只在通过System.Threading.Timer
启动任务的上下文中提到System.Threading
。我的示例代码显示我的应用程序中不需要计时器。 HTTP POST 将作为工作的机会。通常我会使用System.Threading.Thread
来实例化多个对象以使用多个内核。我发现,在需要很长时间的工作解决方案的上下文中,没有提及多核是一个明显的遗漏,但可能有某些原因 Azure App Service 不处理这个问题。也许我只是无法在教程和文档中找到它。
任务的启动是图示的 HTTP POST 控制器。假设最长的工作需要 10 分钟。 HTTP 客户端(未显示)将超时限制设置为 1000 秒,这远远超过 10 分钟(600 秒),以便有安全边际。 HttpClient.Timeout
是相关属性。目前我假设 HTTP 超时是一个真正的限制;而不是某种非约束性(假限制),这样一些其他约束会导致用户等待 9 分钟并收到错误消息。真正的绑定限制是我可以说“但对于这个超时它会成功”的限制。如果 HTTP 超时不是真正的绑定限制并且存在其他限制系统的因素,我可以调整我的 HTTP 控制器以使用三 (3) 个 POST 方法。因此 POST1 意味着使用入站对象启动任务。 POST2 的意思是告诉我它是否完成。 POST3 表示给我出站对象。
对于不需要数据库且无需 IO 到此应用程序之外的任何内容的应用程序中的 Azure App Service 和 .NET Core 3.1 中的长时间运行计算,什么是合适的解决方案?这是一个计算任务。
【问题讨论】:
没有 I/O,只有数据流进出,需要高可扩展性......闻起来像数据块流 您可以运行多个“工作”吗?那是在一个端点上,这意味着有东西可能会击中它,然后在它运行时,其他东西可能会击中它。您想发回“忙碌”消息吗?或者你会接受吗?另外,您是否正在考虑横向扩展您的应用程序?如果您使用BackgroundService
,那么如果您的应用扩展到 2 个实例,您将有 2 个服务同时运行。
【参考方案1】:
序幕
几年前遇到了一个非常相似的问题。我们需要一种可以处理大量数据的服务。有时处理需要 10 秒,有时可能需要一个小时。
最初我们是按照您的问题说明的方式进行的:向服务发送请求,服务处理来自请求的数据并在完成后返回响应。
手头的问题
当工作只需要大约一分钟或更短的时间时这很好,但超过此时间,服务器将关闭会话并且调用者会报告错误。
服务器在放弃请求之前有大约 2 分钟的默认时间来产生响应。它不会退出对请求的处理......但它会退出 HTTP 会话。您在HttpClient
上设置的参数无关紧要,服务器是代表太 多长的服务器。
问题原因
这一切都是有充分理由的。服务器套接字非常昂贵。你有一个有限的数额。服务器试图通过切断花费超过指定时间的请求来保护您的服务,以避免出现套接字饥饿问题。
通常您希望您的 HTTP 请求只需几毫秒。如果它们花费的时间比这更长,如果您的服务必须以高速率满足其他请求,您最终会遇到套接字问题。
解决方案
我们决定走IHostedService
的路线,特别是BackgroundService
。我们将此服务与队列结合使用。通过这种方式,您可以设置一个作业队列,BackgroundService
将一次处理一个(在某些情况下,我们有服务一次处理多个队列项目,在其他情况下,我们水平扩展生成两个或更多队列)。
为什么 ASP.NET Core 服务运行 BackgroundService
?我想在不与任何特定于 Azure 的结构紧密耦合的情况下处理这个问题,以防我们需要从 Azure 迁移到其他云服务(当时我们出于其他原因考虑这样做。)
这对我们来说效果很好,从那以后我们没有发现任何问题。
工作流程是这样的:
-
调用者使用一些参数向服务发送请求
服务生成一个“作业”对象,并立即通过 202(已接受)响应返回一个 ID
服务将此作业放入由
BackgroundService
维护的队列中
调用者可以使用此作业 ID 查询作业状态并获取有关已完成的工作量和剩余工作量的信息
服务完成作业,将作业置于“已完成”状态,然后返回等待队列以生成更多作业
请记住,您的服务能够在运行多个实例的情况下水平扩展。在这种情况下,我使用 Redis Cache 来存储作业的状态,以便所有实例共享相同的状态。
如果您没有可用的 Redis 缓存,我还添加了“内存缓存”选项以在本地进行测试。你可以在服务器上运行“内存缓存”服务,只要知道如果它可以扩展,那么你的数据就会不一致。
示例
由于我已婚并有孩子,周五晚上大家都上床睡觉后我真的不怎么做,所以我花了一些时间整理了一个你可以尝试的例子。完整的solution 也可供您试用。
QueuedBackgroundService.cs
这个类实现有两个特定目的:一个是从队列中读取(BackgroundService
实现),另一个是写入队列(IQueuedBackgroundService
实现)。
public interface IQueuedBackgroundService
Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters);
public sealed class QueuedBackgroundService : BackgroundService, IQueuedBackgroundService
private sealed class JobQueueItem
public string JobId get; set;
public JobParametersModel JobParameters get; set;
private readonly IComputationWorkService _workService;
private readonly IComputationJobStatusService _jobStatusService;
// Shared between BackgroundService and IQueuedBackgroundService.
// The queueing mechanism could be moved out to a singleton service. I am doing
// it this way for simplicity's sake.
private static readonly ConcurrentQueue<JobQueueItem> _queue =
new ConcurrentQueue<JobQueueItem>();
private static readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
public QueuedBackgroundService(IComputationWorkService workService,
IComputationJobStatusService jobStatusService)
_workService = workService;
_jobStatusService = jobStatusService;
/// <summary>
/// Transient method via IQueuedBackgroundService
/// </summary>
public async Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters)
var jobId = await _jobStatusService.CreateJobAsync(jobParameters).ConfigureAwait(false);
_queue.Enqueue(new JobQueueItem JobId = jobId, JobParameters = jobParameters );
_signal.Release(); // signal for background service to start working on the job
return new JobCreatedModel JobId = jobId, QueuePosition = _queue.Count ;
/// <summary>
/// Long running task via BackgroundService
/// </summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
while(!stoppingToken.IsCancellationRequested)
JobQueueItem jobQueueItem = null;
try
// wait for the queue to signal there is something that needs to be done
await _signal.WaitAsync(stoppingToken).ConfigureAwait(false);
// dequeue the item
jobQueueItem = _queue.TryDequeue(out var workItem) ? workItem : null;
if(jobQueueItem != null)
// put the job in to a "processing" state
await _jobStatusService.UpdateJobStatusAsync(
jobQueueItem.JobId, JobStatus.Processing).ConfigureAwait(false);
// the heavy lifting is done here...
var result = await _workService.DoWorkAsync(
jobQueueItem.JobId, jobQueueItem.JobParameters,
stoppingToken).ConfigureAwait(false);
// store the result of the work and set the status to "finished"
await _jobStatusService.StoreJobResultAsync(
jobQueueItem.JobId, result, JobStatus.Success).ConfigureAwait(false);
catch(TaskCanceledException)
break;
catch(Exception ex)
try
// something went wrong. Put the job in to an errored state and continue on
await _jobStatusService.StoreJobResultAsync(jobQueueItem.JobId, new JobResultModel
Exception = new JobExceptionModel(ex)
, JobStatus.Errored).ConfigureAwait(false);
catch(Exception)
// TODO: log this
它是这样注入的:
services.AddHostedService<QueuedBackgroundService>();
services.AddTransient<IQueuedBackgroundService, QueuedBackgroundService>();
ComputationController.cs
用于读取/写入作业的控制器如下所示:
[ApiController, Route("api/[controller]")]
public class ComputationController : ControllerBase
private readonly IQueuedBackgroundService _queuedBackgroundService;
private readonly IComputationJobStatusService _computationJobStatusService;
public ComputationController(
IQueuedBackgroundService queuedBackgroundService,
IComputationJobStatusService computationJobStatusService)
_queuedBackgroundService = queuedBackgroundService;
_computationJobStatusService = computationJobStatusService;
[HttpPost, Route("beginComputation")]
[ProducesResponseType(StatusCodes.Status202Accepted, Type = typeof(JobCreatedModel))]
public async Task<IActionResult> BeginComputation([FromBody] JobParametersModel obj)
return Accepted(
await _queuedBackgroundService.PostWorkItemAsync(obj).ConfigureAwait(false));
[HttpGet, Route("computationStatus/jobId")]
[ProducesResponseType(StatusCodes.Status200OK, Type = typeof(JobModel))]
[ProducesResponseType(StatusCodes.Status404NotFound, Type = typeof(string))]
public async Task<IActionResult> GetComputationResultAsync(string jobId)
var job = await _computationJobStatusService.GetJobAsync(jobId).ConfigureAwait(false);
if(job != null)
return Ok(job);
return NotFound($"Job with ID `jobId` not found");
[HttpGet, Route("getAllJobs")]
[ProducesResponseType(StatusCodes.Status200OK,
Type = typeof(IReadOnlyDictionary<string, JobModel>))]
public async Task<IActionResult> GetAllJobsAsync()
return Ok(await _computationJobStatusService.GetAllJobsAsync().ConfigureAwait(false));
[HttpDelete, Route("clearAllJobs")]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status401Unauthorized)]
public async Task<IActionResult> ClearAllJobsAsync([FromQuery] string permission)
if(permission == "this is flakey security so this can be run as a public demo")
await _computationJobStatusService.ClearAllJobsAsync().ConfigureAwait(false);
return Ok();
return Unauthorized();
工作示例
只要这个问题是活跃的,我会maintain a working example你可以试试。对于此特定示例,您可以指定要运行的迭代次数。为了模拟长时间运行的工作,每次迭代为 1 秒。因此,如果您将迭代值设置为 60,它将运行该作业 60 秒。
在它运行时,运行computationStatus/jobId
或getAllJobs
端点。您可以实时查看所有作业更新。
这个例子远不是一个功能齐全的覆盖所有边缘案例的完全成熟的可用于生产的例子,但它是一个好的开始。
结论
在后端工作了几年后,我看到了很多由于不了解后端的所有“规则”而出现的问题。希望这个答案能对我过去遇到的问题有所启发,并希望这可以使您不必处理上述问题。
【讨论】:
谢谢。这至少可以说是慷慨的。我会消化这个。我目前的专业知识意味着我会比你想象的更慢地消化这个。 回答您评论中的问题...将有多个同时用户(人类)。每个用户的任务都独立于其他用户。我相信如果 Azure 可以处理 1 个用户,它可以处理很多。我承认您是说您的设计使用 Redis 缓存而不是内存缓存来处理同时用户。 @H2ONaCl -- 好吧,它使用应用的单个实例处理多个用户。它可以很好地处理多个用户。如果您的应用程序扩展到有 2 个应用程序运行的地方,那么您会想要使用 Redis 之类的东西,以便将数据存储在一个中心位置。如果您使用 Memory 方法,您可以拥有 1000 个用户……只需将其保留为 1 个实例。 在我的 POST1、POST2、POST3 示例中,如果 POST2 频繁轮询状态,它会立即发现任务已完成,所以我可以避免使用 Redis 缓存或完全避免任何状态信息。 也许我很困惑。我将不得不调查这个 Redis/Memory 的事情。【参考方案2】:一种选择可能是尝试Azure Durable Functions,它更面向需要检查点和状态的长时间运行的作业,而不是尝试在触发请求的上下文中完成。它还具有扇出/扇入的概念,以防您所描述的内容可以划分为具有汇总结果的较小作业。
如果目标只是原始计算,Azure Batch 可能是更好的选择,因为它有助于扩展。
【讨论】:
【参考方案3】:我认为需要完成的实际工作不是迭代一个不做任何事情的循环,所以就可能的并行化而言,我现在无法提供太多帮助。工作是 CPU 密集型还是 IO 相关?
当涉及到在 Azure 应用服务中长期运行的工作时,一种选择是使用 Web Job。一种可能的解决方案是将计算请求发布到队列(Storage Queue 或Azure Message Bus Queues)。然后 Web 作业处理这些消息,并可能将新消息放在请求者可以用来处理结果的另一个队列中。
如果保证处理所需的时间少于 10 分钟,您可以将 Web 作业替换为 Queue Triggered Azure Function。它是 Azure 上的无服务器产品,具有很大的扩展可能性。
另一个选择确实是使用Service Worker 或IHostingService 的实例并在那里进行一些队列处理。
【讨论】:
入站对象有一个需要处理的列表。如果列表有 10,000 个元素,则将 1,000 个元素发送到核心会更快,因此经过时间减少到原始经过时间的 10%。 10 个核心(线程)完成后,结果由原始生成线程组合。 在数据库访问或互联网调用的意义上没有 IO。如图所示,唯一的 IO 是入站对象和出站对象。父线程将处理完整的入站和完整的出站对象。子线程将接收和响应更小的对象。 (就像示例中的列表有 10,000 个元素,每个线程可以接收 1,000 个元素。) 如果我能获得 4 个或更多核心,10 分钟是可以接受的。如果我无法获得核心,而只有一个线程可以提供 50% 的核心周期,那么这意味着如果我有 8 个线程,10 分钟是可以接受的。我想你可以看到我所暗示的模式。 10 分钟是可以接受的,如果它是一个真正的限制。也就是说,如果由于 Azure 中无法解释的差异原因而随机 9 分钟,那将是可怕的。换句话说,用户等待了 9 分钟并收到“错误,再试一次,它可能会工作”。那可不好。 @H2ONaCl 使用 azure 函数消耗计划你不知道你得到了多少核心。使用 azure web 作业、服务工作者或 IHostingService 实例,您可以获得托管计划可用的核心。【参考方案4】:既然你说你的计算在更少的迭代中成功,一个简单的解决方案就是定期保存你的结果并恢复计算。
例如,假设您需要执行 2400 亿次迭代,并且您知道要可靠执行的最高迭代次数是 30 亿次迭代,我会设置以下内容:
-
一个从站,实际执行任务(2400 亿次迭代)
定期从从属接收有关进度的输入的主控。
slave 可以定期向 master 发送消息(比如每 20 亿次迭代一次?)。如果计算中断,此消息可能包含与恢复计算相关的任何内容。
-
主设备应跟踪从设备。如果主设备确定从设备已经死亡/崩溃/无论如何,主设备应该简单地创建一个新的从设备,它应该从上次报告的位置恢复计算。
具体如何实现 master 和 slave 是你个人喜好的问题。
与其让单个循环执行 2400 亿次迭代,如果您可以跨节点拆分计算,我会尝试在尽可能多的节点上同时并行计算解决方案。
我个人将 node.js 用于多核项目。尽管您使用的是 asp.net,但我还是包含了这个 node.js 示例来说明适合我的架构。
Node.js on multi-core machines
https://dzone.com/articles/multicore-programming-in-nodejs
正如 Noah Stahl 在他的回答中提到的,Azure Durable Functions 和 Azure Batch 似乎可以帮助您在平台上实现目标。详情请看他的回答。
【讨论】:
【参考方案5】:标准答案是使用异步消息传递。我有一个blog series on the topic。由于您已经在 Azure 中,因此尤其如此。
您已经拥有一个 Azure Web 应用服务,但现在您想在请求的外部运行代码 - “请求外部代码”。运行该代码的正确方法是在单独的进程中 - Azure Functions 或 Azure WebJobs 是 good match for Azure webapps。
首先,您需要一个持久队列。 Azure 存储队列非常适合,因为无论如何你都在 Azure 中。然后您的 webapi 可以将消息写入队列并返回。这里重要的部分是这是一个durable queue, not an in-memory queue。
与此同时,Azure 函数/WebJob 为 processing that queue。它将从队列中取出工作并执行。
拼图的最后一块是completion notification。这是一种很常见的方法:
我可以将我的 HTTP 控制器调整为使用三 (3) 个 POST 方法。因此 POST1 意味着使用入站对象启动任务。 POST2 的意思是告诉我它是否完成。 POST3 表示给我出站对象。
为此,您的后台处理器应将“进行中”/“完成/结果”状态保存在 webapi 进程可以访问的位置。如果您已经有一个共享数据库(并且保留结果是有意义的),那么这可能是最简单的选择。我还会考虑使用 Azure Cosmos DB,它有一个很好的生存时间设置,因此后台服务可以注入“24 小时内有效”或其他什么的结果,之后它们会被自动清理。
【讨论】:
以上是关于Azure App Service 和 .NET Core 3.1 中长时间运行计算的适当解决方案?的主要内容,如果未能解决你的问题,请参考以下文章
Azure App Service .net6 部署 - 错误:EISDIR:对目录进行非法操作,打开“/home/site/wwwroot/wwwroot/Identity/lib/bootstra
使用Azure App Service Logging时是否需要blob StorageExceptions?
Azure App Service Application Insights 不显示依赖的 sql 命令文本
Azure 部署后 ASP.NET Core MVC webapp 错误