在与 Entity Framework 的事务中通过 MassTransit 发送消息

Posted

技术标签:

【中文标题】在与 Entity Framework 的事务中通过 MassTransit 发送消息【英文标题】:Sending message via MassTransit in a transaction with Entity Framework 【发布时间】:2022-01-01 01:30:10 【问题描述】:

背景: 我们使用Quartz.NET 来安排何时应该使用MassTransit 将消息发送到RabbitMQ 队列。所有消息也使用 Entity Framework 保存到数据库中,无论是出于审计原因还是为了恢复;在应用程序启动期间重新安排未发送的消息,因为 Quartz 将计划的作业(要发送的消息)保留在内存中。发送消息时,数据库会更新,在消息实体上设置一个标志,说明该消息已发送,以便在下次应用程序启动时不会重新发送。

问题: 应用程序可能会在数据库更新后、消息发送之前终止,从而导致消息永远不会发送。如果我们颠倒操作顺序并先发送消息,则应用程序可能会在数据库更新之前崩溃,从而在下次应用程序启动时重新发送消息。我们希望将这两个操作作为一个事务执行。

我们设法在消费者中使用事务,从ConsumeContext<T>.GetPayload<TransactionContext>() 获取TransactionContext 并将其传递给DbContext.EnlistTransaction()。使用ISendEndpointProvider 是否有任何类似的选项可用?

下面是 Quartz IJob 实现的(精简版)版本。我们必须为 EF 创建一个范围,因为 Quartz 作为单例运行。在解决上述问题时,我们有哪些选择(如果有)?

我们正在使用 MassTransit 7.2.3、Quartz 3.3.3 和 Entity Framework Core 5.0.11。

public sealed class ScheduleMessageJob : IJob

    private readonly IServiceProvider _serviceProvider;

    public ScheduleMessageJob(IServiceProvider serviceProvider) =>
        _serviceProvider = serviceProvider;

    public async Task Execute(IJobExecutionContext context)
    
        using var scope = _serviceProvider.CreateScope();
        var scopedServiceProvider = scope.ServiceProvider;
        await UpdateScheduledMessage(scopedServiceProvider);
        await Send(scopedServiceProvider);
    

    private static async Task UpdateScheduledMessage(IServiceProvider scopedServiceProvider)
    
        var dbContext = scopedServiceProvider.GetRequiredService<IDbContext>();
        var scheduledMessage = await dbContext.Get<ScheduledMessage>(id: 1);
        scheduledMessage.IsQueued = true;
        dbContext.Update(scheduledMessage);
        await dbContext.SaveChangesAsync();
    

    public static async Task Send(IServiceProvider scopedServiceProvider)
    
        var endpoint = await GetEndpoint(scopedServiceProvider);
        var triggerExecuted = new TriggerExecuted("Some data");
        await endpoint.Send(triggerExecuted);
    

    private static async Task<ISendEndpoint> GetEndpoint(IServiceProvider serviceProvider) =>
        await serviceProvider
        .GetRequiredService<ISendEndpointProvider>()
        .GetSendEndpoint(new Uri("queue:SomeQueue"));

【问题讨论】:

【参考方案1】:

大多数消息代理不是事务性的,也不参与事务。在上述情况下,预期的分布式事务一致性是一个错误。或者那至少是一个非常非常糟糕的主意。

您可以选择依赖消息代理(我的偏好)或数据库。您还可以使用审计功能(观察所有发送/发布/使用的消息并将它们写入数据库)或通过单独的消费者将消息写入数据库。该消息类型。

Quartz 具有触发器重新触发功能,因此您可以重试失败的触发器。我会首先使用持久的 Quartz 数据存储,然后写入代理,然后最后处理您的数据库。

【讨论】:

以上是关于在与 Entity Framework 的事务中通过 MassTransit 发送消息的主要内容,如果未能解决你的问题,请参考以下文章

在Entity Framework中使用事务

Entity Framework Core 数据保存原理详解

Entity Framework Core 数据保存原理详解

不能在 Entity Framework 6.1.3 和 PostgreSQL 中使用打开的事务

如何在 Entity Framework 6 DbContext.Database.BeginTransaction 中配置事务超时?

Entity Framework :Using Transaction Scope 如何检查 DbContext 是不是有事务?