需要一种内置方式来为现有存储库添加死锁弹性到 Dapper 而不更改它们

Posted

技术标签:

【中文标题】需要一种内置方式来为现有存储库添加死锁弹性到 Dapper 而不更改它们【英文标题】:Need a built in way to add Deadlock Resilience to Dapper for existing repos without altering them 【发布时间】:2021-12-31 20:12:21 【问题描述】:

需要使所有现有存储库(大约 30+)对死锁具有容错能力,并使用日志和等待方法从死锁中恢复。

尝试成功:经过一些研究,我在下面使用 Polly 回答了自定义 SqlResiliencyPolicy,并针对项目进行了定制。

但是,我寻求的是:目前的方式(PFB 回答),要求我要么

    await _policy.ExecuteAsync OR 包装所有现有的数据库调用 提供接受IAsyncPolicy 参数的自定义重载。然后调用预期的方法。 IDbConnection 的扩展类型:

public static Task<T> GetAsync<T>(this IDbConnection connection, object primaryKey, IAsyncPolicy policy) =>return await _policy.ExecuteAsync(async () => GetAsync<T> (...));

在这两种方式中,我都需要更改我所有的 30 多个存储库。但是,dapper/some-other-approaches 中是否有内置方法,我们可以在其中

"在启动时配置一个策略,并通过以下方式自动神奇地所有数据库调用 dapper 变得有弹性(回退到它们的容错机制) 类似于添加策略的 http 客户端弹性的方式 在您注册客户时”

这样:将代码更改降至最低,无需触及 repos,只需启动。

我有以下方法,需要对其进行改进。

【问题讨论】:

您可以创建 [IDbConnection 上的扩展方法] (davemateer.com/2021/08/29/dapper-and-polly) 以添加重试功能。 是的,这就是第 2 点^^,那里有一点代码 sn-p。但是我需要更多的魔法,我不需要 DI/甚至添加参数,这样的独角兽(如果可能的话)存在吗? 我不知道 Dapper 的任何全局错误处理程序,您可以在其中注入重试逻辑。在 MSSQL 的情况下,它会抛出一个 SqlException ,您可以对其做出反应,仅此而已。我不是 Dapper 专家,所以我可能会说无效的陈述。 【参考方案1】:

已实施第二种方法 ^^:这将要 DI 的策略与现有存储库分离。 IDbConnection 的扩展方法负责围绕现有方法包装策略。

public class SqlResiliencePolicyFactory

    private readonly ISet<int> _transientDbErrors = new HashSet<int>(new[]  1205 );
    private readonly ILogger _logger;
    private readonly IConfiguration _configuration;

    public SqlResiliencePolicyFactory(ILogger logger, IConfiguration configuration)
    
        _logger = logger;
        _configuration = configuration;
    

    public IPolicyRegistry<string> GetSqlResiliencePolicies(int transientErrorRetries = 3)
    
        return new PolicyRegistry
        
             
                "DbDeadLockResilience", 
                Policy
                    .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
                    .WaitAndRetry(
                        retryCount: transientErrorRetries,
                        sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                        onRetry: LogRetryAction)
            ,
             
                "DbDeadLockResilienceAsync", 
                Policy
                    .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
                    .WaitAndRetryAsync(
                        retryCount: transientErrorRetries,
                        sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                        onRetry: LogRetryAction)
            
        ;
    
    
    private void LogRetryAction(Exception exception, TimeSpan sleepTime, int reattemptCount, Context context) =>
        _logger.Log(
            LogLevel.Warning,
            exception,
            @$"Transient DB Failure while executing query,
                error number: ((SqlException)exception).Number;
                reattempt number: reattemptCount");

在启动中:

DapperExtensions.SetPolicies(new SqlResiliencePolicyFactory(_logger, _configuration)
                            .GetSqlResiliencePolicies());

在单独的类中创建扩展方法,以将策略包装在您的 repo 的现有方法周围。 扩展方法:

public static class DapperExtensions

    private static Policy _policy = Policy.NoOp();
    private static IAsyncPolicy _asyncPolicy = Policy.NoOpAsync();

    public static void SetPolicies(IReadOnlyPolicyRegistry<string> readOnlyPolicyRegistry)
        
            _policy = readOnlyPolicyRegistry.Get<Policy>("DbDeadLockResilience");
            _asyncPolicy = readOnlyPolicyRegistry.Get<IAsyncPolicy>("DbDeadLockResilienceAsync");
        

    public static T GetFirstWithRetry<T>(this IDbConnection connection,
                                        string? sql = null, object? parameters = null, IDbTransaction? transaction = null) where T : class =>
        _policy.Execute(() => connection.GetFirst<T>(sql, parameters, transaction));

    public static T QueryFirstOrDefaultWithRetry<T>(this IDbConnection connection, string sql,
                                          object? parameters = null, IDbTransaction? transaction = null) =>
        _policy.Execute(() => connection.QueryFirstOrDefault<T>(sql, parameters, transaction));

    public static async Task<bool> UpdateAsyncWithRetry<T>(this IDbConnection connection, T entityToUpdate, IEnumerable<string> columnsToUpdate,
                                                      IDbTransaction? transaction = null) where T : class =>
        await _asyncPolicy.ExecuteAsync(async () => await connection.UpdateAsync(entityToUpdate, columnsToUpdate, transaction));

    //Similarly, add overloads to all the other methods in existing repo.

现在,

    现有的回购独立于政策(回购没有 DI)。 策略保存在 SRP 之后的单独位置。 Dapper 扩展可以更改策略以便于测试。

因此,现有的 repos 必须更改名称并调用上述包装器,而不是调用 dapper 方法本身,将应用策略。不要忘记对 repo 进行一次回归测试。

【讨论】:

看起来还可以。我可能会更改 SetPolicies 方法以预测 IReadOnlyPolicyRegistry 实例并自行检索所需的策略。 我不明白为什么检索方法是同步的,而更新是异步的。我最好的猜测是它们只是样本。 是的,这些只是为了展示我们如何围绕您现有的小巧玲珑查询包装策略,而不是实际方法。我还没有在更好的方法上破土动工,所以在制品。顺便说一句,感谢您的编辑和更正。 @PeterCsala 更改了 SetPolicies,这是有道理的。【参考方案2】:

经过一些研究后我的方法:

public class SqlResiliencyPolicy 
 
    private readonly ISet<int> transientDbErrors = new HashSet<int>(new[]  1205 );
    private readonly ILogger _logger;
    private readonly IConfiguration _configuration;

    public SqlResiliencyPolicy(ILogger logger, IConfiguration configuration)
    
        _logger = logger;
        _configuration = configuration;
    

    public IAsyncPolicy GetSqlResiliencyPolicy(int transientErrorRetries = 3)
    
        return Policy
            .Handle<SqlException>(ex => transientDbErrors.Contains(ex.Number))
            .WaitAndRetryAsync(
                retryCount: transientErrorRetries,
                sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                (exception, sleepTime, reattempt, context) =>
                
                    _logger.Log(LogLevel.Error, exception, $"Transient DB Failure while executing query, error number: ((SqlException)exception).Number; reattempt number: reattempt");
                );
    

在启动中:

 services.AddScoped(_ => new SqlResiliencyPolicy(_logger, _configuration).GetSqlResiliencyPolicy());

Ctor DI:在具有私有 IAsyncPolicy 支持字段的 Ctor 的现有 Repos DI 中:

private readonly IAsyncPolicy _policy;

最后一步:用

包装所有简洁的调用
await _policy.ExecuteAsync(async () => <existing DB call>);

【讨论】:

我有一个小建议。使用 PolicyRegister 将策略注册到您的 DI 中可能是有意义的。 @PeterCsala 谢谢!已经使用 PolicyRegister 和第 2 点方法实现了它[将很快分享代码]。【参考方案3】:

以下将是迄今为止对现有存储库进行最小/不更改的恰当方法。感谢@Sergey Akopov 撰写的博客以及指向此博客的同事。

简答:使用装饰器模式包装SQL Client's Connection and Command instances 并将Polly 的重试策略注入这些装饰器。这样,将能够使用重试策略包装所有 SQL 执行端点。这将与 Dapper 兼容,因为它是 IDbConnection 的扩展。

创建一个 DI'able 重试策略,将策略封装在其中。此外,我们可以完全将策略解耦以分离类并将其注册为 DI(此答案中未显示,但其他答案中也如此,如果您有多个策略,请不要忘记使用 PolicyRegister)。

Git 仓库:https://github.com/VinZCodz/SqlTransientFaultHandling

详情

策略接口,没有异步方法,因为Microsoft.Data.SqlClient 端点不是异步的。

public interface IRetryPolicy

 void Execute(Action operation);
 TResult Execute<TResult>(Func<TResult> operation);

具体实现,将策略嵌入其中,并通过 Sql Client 为所有 DB 调用包装重试逻辑,从而实现 Dapper。

public class RetryPolicy : IRetryPolicy

    private readonly ILogger<RetryPolicy> _logger;
    private readonly Policy _retryPolicy;

    private readonly ISet<int> _transientDbErrors = new HashSet<int>(new[]  1205 );
    private const int _transientErrorRetries = 3;

    public RetryPolicy(ILogger<RetryPolicy> logger)
    
        _logger = logger;
        _retryPolicy = Policy
                        .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
                        .WaitAndRetry(
                        retryCount: _transientErrorRetries,
                        sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                        onRetry: LogRetryAction);
    

    public void Execute(Action operation) => _retryPolicy.Execute(operation.Invoke);

    public TResult Execute<TResult>(Func<TResult> operation) => _retryPolicy.Execute(() => operation.Invoke());

    private void LogRetryAction(Exception exception, TimeSpan sleepTime, int reattemptCount, Context context) =>
        _logger.LogWarning(
            exception,
            $"Transient DB Failure while executing query, error number: ((SqlException)exception).Number; reattempt number: reattemptCount");

现在,我们需要以某种方式将此策略注入 SqlClient 的 con 和 cmd,需要一个 sealed'is-a' DbConnection(DAL 端点将保持不变)以及 'has-a' DbConnection(模仿操作但重试):

 public sealed class ReliableSqlDbConnection : DbConnection
 
    private readonly SqlConnection _underlyingConnection;
    private readonly IRetryPolicy _retryPolicy;

    private bool _disposedValue;
    private string _connectionString;

    public ReliableSqlDbConnection(string connectionString, IRetryPolicy retryPolicy)
    
        _connectionString = connectionString;
        _retryPolicy = retryPolicy;
        _underlyingConnection = new SqlConnection(connectionString);
    

    public override string ConnectionString
    
        get => _connectionString;
        set => _underlyingConnection.ConnectionString = _connectionString = value;
    
    public override void Open()
    
        _retryPolicy.Execute(() =>
        
            if (_underlyingConnection.State != ConnectionState.Open)
            
                _underlyingConnection.Open();
            
        );
    
    public override string Database => _underlyingConnection.Database;
    public override string DataSource => _underlyingConnection.DataSource;
    public override string ServerVersion => _underlyingConnection.ServerVersion;
    public override ConnectionState State => _underlyingConnection.State;
    public override void ChangeDatabase(string databaseName) => _underlyingConnection.ChangeDatabase(databaseName);
    public override void Close() => _underlyingConnection.Close();
    protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) => _underlyingConnection.BeginTransaction(isolationLevel);
    protected override DbCommand CreateDbCommand() => new ReliableSqlDbCommand(_underlyingConnection.CreateCommand(), _retryPolicy);
 

因为,我们在需要时实例化 SqlConnection,我们还需要按照 Microsoft 建议的 derived type dispose pattern 正确处理它:

protected override void Dispose(bool disposing)

    if (!_disposedValue)
    
        if (disposing)
        
            if (_underlyingConnection.State == ConnectionState.Open)
            
                _underlyingConnection.Close();
            
            _underlyingConnection.Dispose();
        
        _disposedValue = true;
    
    base.Dispose(disposing);

采用与DbCommand类似的方法:

public sealed class ReliableSqlDbCommand : DbCommand

    private readonly SqlCommand _underlyingSqlCommand;
    private readonly IRetryPolicy _retryPolicy;

    private bool _disposedValue;

    public ReliableSqlDbCommand(SqlCommand command, IRetryPolicy retryPolicy)
    
        _underlyingSqlCommand = command;
        _retryPolicy = retryPolicy;
    

    public override string CommandText
    
        get => _underlyingSqlCommand.CommandText;
        set => _underlyingSqlCommand.CommandText = value;
    

    public override int CommandTimeout
    
        get => _underlyingSqlCommand.CommandTimeout;
        set => _underlyingSqlCommand.CommandTimeout = value;
    

    public override CommandType CommandType
    
        get => _underlyingSqlCommand.CommandType;
        set => _underlyingSqlCommand.CommandType = value;
    

    public override bool DesignTimeVisible
    
        get => _underlyingSqlCommand.DesignTimeVisible;
        set => _underlyingSqlCommand.DesignTimeVisible = value;
    

    public override UpdateRowSource UpdatedRowSource
    
        get => _underlyingSqlCommand.UpdatedRowSource;
        set => _underlyingSqlCommand.UpdatedRowSource = value;
    

    protected override DbConnection DbConnection
    
        get => _underlyingSqlCommand.Connection;
        set => _underlyingSqlCommand.Connection = (SqlConnection)value;
    

    protected override DbParameterCollection DbParameterCollection => _underlyingSqlCommand.Parameters;

    protected override DbTransaction DbTransaction
    
        get => _underlyingSqlCommand.Transaction;
        set => _underlyingSqlCommand.Transaction = (SqlTransaction)value;
    

    public override void Cancel() => _underlyingSqlCommand.Cancel();

    public override int ExecuteNonQuery() => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteNonQuery());

    public override object ExecuteScalar() => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteScalar());

    public override void Prepare() => _retryPolicy.Execute(() => _underlyingSqlCommand.Prepare());

    protected override DbParameter CreateDbParameter() => _underlyingSqlCommand.CreateParameter();

    protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteReader(behavior));

    protected override void Dispose(bool disposing)
    
        if (!_disposedValue)
        
            if (disposing)
            
                _underlyingSqlCommand.Dispose();
            
            _disposedValue = true;
        
        base.Dispose(disposing);
    

现有 DAL 端:

DI:

services.AddScoped<IRetryPolicy, RetryPolicy>();
services.Configure<DbConnectionOption>(options =>

    options.ConnectionString = connectionString;
);

延迟加载装饰器:

_connection = new Lazy<IDbConnection>(() =>

    return new ReliableSqlDbConnection(_dbOptions.ConnectionString, _retryPolicy);
);

Xunit 测试:这个测试实际上在单个会话上创建了一个死锁并重新绑定它。

感谢@Martin Smith 的精彩脚本,更多关于脚本的内容:Simulate a dead lock on SQL server using single client and single session

[Fact]
public void It_creates_reliablesqldbConnection_and_deadlock_itself_to_log_and_retry()

    var logger = new FakeLogger<RetryPolicy>(); //create your own logger.
    using var reliableSqlDbConnection = new ReliableSqlDbConnection(_fixture.Configuration["ConnectionStrings:DataContext"],
                                                                    new RetryPolicy(logger)); //create your own fixture.

    //Awesome script which deadlocks itself on single con and process with it's meta data.                                                              
    Assert.ThrowsAsync<SqlException>(() => reliableSqlDbConnection.ExecuteAsync(
                                            @"BEGIN TRAN
                                            CREATE TYPE dbo.OptionIDs AS TABLE( OptionID INT PRIMARY KEY )
                                            EXEC ('DECLARE @OptionIDs dbo.OptionIDs;')
                                            ROLLBACK "));
                                            
    Assert.Equal(LogLevel.Warning, logger.Logs.Select(g => g.Key).First());

    var retries = logger.Logs[LogLevel.Warning].First();

    Assert.Equal(3, retries.Count());
    Assert.Equal("Transient DB Failure while executing query, error number: 1205; reattempt number: 1", retries.First());

总结: 这样,Open Connection、ExecuteReaderExecuteScalarExecuteNonQuery 等将具有重试功能,最终将由所有 Dapper 端点调用。

这样,将代码更改降至最低,无需触摸 repos,只需启动。只需为 SqlClient 的连接和命令提供包装器/装饰器,就可以使用自定义策略进行注入和重试。

【讨论】:

以上是关于需要一种内置方式来为现有存储库添加死锁弹性到 Dapper 而不更改它们的主要内容,如果未能解决你的问题,请参考以下文章

将本地现有目录连接到现有 Git 远程存储库

向现有 Python 类添加函数的模式

更新AWS Amplify中现有应用程序的回购访问密钥

通过网络和本地公开 Python 库 API 的快速且有弹性的方式

将 WCF 服务添加到现有应用程序?

向现有文档弹性搜索添加其他属性