需要一种内置方式来为现有存储库添加死锁弹性到 Dapper 而不更改它们
Posted
技术标签:
【中文标题】需要一种内置方式来为现有存储库添加死锁弹性到 Dapper 而不更改它们【英文标题】:Need a built in way to add Deadlock Resilience to Dapper for existing repos without altering them 【发布时间】:2021-12-31 20:12:21 【问题描述】:需要使所有现有存储库(大约 30+)对死锁具有容错能力,并使用日志和等待方法从死锁中恢复。
尝试成功:经过一些研究,我在下面使用 Polly 回答了自定义 SqlResiliencyPolicy,并针对项目进行了定制。
但是,我寻求的是:目前的方式(PFB 回答),要求我要么
-
用
await _policy.ExecuteAsync
OR 包装所有现有的数据库调用
提供接受IAsyncPolicy
参数的自定义重载。然后调用预期的方法。 IDbConnection 的扩展类型:
public static Task<T> GetAsync<T>(this IDbConnection connection, object primaryKey, IAsyncPolicy policy) =>
return await _policy.ExecuteAsync(async () => GetAsync<T> (...));
在这两种方式中,我都需要更改我所有的 30 多个存储库。但是,dapper/some-other-approaches 中是否有内置方法,我们可以在其中
"在启动时配置一个策略,并通过以下方式自动神奇地所有数据库调用 dapper 变得有弹性(回退到它们的容错机制) 类似于添加策略的 http 客户端弹性的方式 在您注册客户时”
这样:将代码更改降至最低,无需触及 repos,只需启动。
我有以下方法,需要对其进行改进。
【问题讨论】:
您可以创建 [IDbConnection 上的扩展方法] (davemateer.com/2021/08/29/dapper-and-polly) 以添加重试功能。 是的,这就是第 2 点^^,那里有一点代码 sn-p。但是我需要更多的魔法,我不需要 DI/甚至添加参数,这样的独角兽(如果可能的话)存在吗? 我不知道 Dapper 的任何全局错误处理程序,您可以在其中注入重试逻辑。在 MSSQL 的情况下,它会抛出一个 SqlException ,您可以对其做出反应,仅此而已。我不是 Dapper 专家,所以我可能会说无效的陈述。 【参考方案1】:已实施第二种方法 ^^:这将要 DI 的策略与现有存储库分离。 IDbConnection
的扩展方法负责围绕现有方法包装策略。
public class SqlResiliencePolicyFactory
private readonly ISet<int> _transientDbErrors = new HashSet<int>(new[] 1205 );
private readonly ILogger _logger;
private readonly IConfiguration _configuration;
public SqlResiliencePolicyFactory(ILogger logger, IConfiguration configuration)
_logger = logger;
_configuration = configuration;
public IPolicyRegistry<string> GetSqlResiliencePolicies(int transientErrorRetries = 3)
return new PolicyRegistry
"DbDeadLockResilience",
Policy
.Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
.WaitAndRetry(
retryCount: transientErrorRetries,
sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
onRetry: LogRetryAction)
,
"DbDeadLockResilienceAsync",
Policy
.Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
.WaitAndRetryAsync(
retryCount: transientErrorRetries,
sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
onRetry: LogRetryAction)
;
private void LogRetryAction(Exception exception, TimeSpan sleepTime, int reattemptCount, Context context) =>
_logger.Log(
LogLevel.Warning,
exception,
@$"Transient DB Failure while executing query,
error number: ((SqlException)exception).Number;
reattempt number: reattemptCount");
在启动中:
DapperExtensions.SetPolicies(new SqlResiliencePolicyFactory(_logger, _configuration)
.GetSqlResiliencePolicies());
在单独的类中创建扩展方法,以将策略包装在您的 repo 的现有方法周围。 扩展方法:
public static class DapperExtensions
private static Policy _policy = Policy.NoOp();
private static IAsyncPolicy _asyncPolicy = Policy.NoOpAsync();
public static void SetPolicies(IReadOnlyPolicyRegistry<string> readOnlyPolicyRegistry)
_policy = readOnlyPolicyRegistry.Get<Policy>("DbDeadLockResilience");
_asyncPolicy = readOnlyPolicyRegistry.Get<IAsyncPolicy>("DbDeadLockResilienceAsync");
public static T GetFirstWithRetry<T>(this IDbConnection connection,
string? sql = null, object? parameters = null, IDbTransaction? transaction = null) where T : class =>
_policy.Execute(() => connection.GetFirst<T>(sql, parameters, transaction));
public static T QueryFirstOrDefaultWithRetry<T>(this IDbConnection connection, string sql,
object? parameters = null, IDbTransaction? transaction = null) =>
_policy.Execute(() => connection.QueryFirstOrDefault<T>(sql, parameters, transaction));
public static async Task<bool> UpdateAsyncWithRetry<T>(this IDbConnection connection, T entityToUpdate, IEnumerable<string> columnsToUpdate,
IDbTransaction? transaction = null) where T : class =>
await _asyncPolicy.ExecuteAsync(async () => await connection.UpdateAsync(entityToUpdate, columnsToUpdate, transaction));
//Similarly, add overloads to all the other methods in existing repo.
现在,
-
现有的回购独立于政策(回购没有 DI)。
策略保存在 SRP 之后的单独位置。
Dapper 扩展可以更改策略以便于测试。
因此,现有的 repos 必须更改名称并调用上述包装器,而不是调用 dapper 方法本身,将应用策略。不要忘记对 repo 进行一次回归测试。
【讨论】:
看起来还可以。我可能会更改SetPolicies
方法以预测 IReadOnlyPolicyRegistry
实例并自行检索所需的策略。
我不明白为什么检索方法是同步的,而更新是异步的。我最好的猜测是它们只是样本。
是的,这些只是为了展示我们如何围绕您现有的小巧玲珑查询包装策略,而不是实际方法。我还没有在更好的方法上破土动工,所以在制品。顺便说一句,感谢您的编辑和更正。
@PeterCsala 更改了 SetPolicies,这是有道理的。【参考方案2】:
经过一些研究后我的方法:
public class SqlResiliencyPolicy
private readonly ISet<int> transientDbErrors = new HashSet<int>(new[] 1205 );
private readonly ILogger _logger;
private readonly IConfiguration _configuration;
public SqlResiliencyPolicy(ILogger logger, IConfiguration configuration)
_logger = logger;
_configuration = configuration;
public IAsyncPolicy GetSqlResiliencyPolicy(int transientErrorRetries = 3)
return Policy
.Handle<SqlException>(ex => transientDbErrors.Contains(ex.Number))
.WaitAndRetryAsync(
retryCount: transientErrorRetries,
sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
(exception, sleepTime, reattempt, context) =>
_logger.Log(LogLevel.Error, exception, $"Transient DB Failure while executing query, error number: ((SqlException)exception).Number; reattempt number: reattempt");
);
在启动中:
services.AddScoped(_ => new SqlResiliencyPolicy(_logger, _configuration).GetSqlResiliencyPolicy());
Ctor DI:在具有私有 IAsyncPolicy 支持字段的 Ctor 的现有 Repos DI 中:
private readonly IAsyncPolicy _policy;
最后一步:用
包装所有简洁的调用await _policy.ExecuteAsync(async () => <existing DB call>);
【讨论】:
我有一个小建议。使用PolicyRegister
将策略注册到您的 DI 中可能是有意义的。
@PeterCsala 谢谢!已经使用 PolicyRegister 和第 2 点方法实现了它[将很快分享代码]。【参考方案3】:
以下将是迄今为止对现有存储库进行最小/不更改的恰当方法。感谢@Sergey Akopov 撰写的博客以及指向此博客的同事。
简答:使用装饰器模式包装SQL Client's Connection and Command instances
并将Polly 的重试策略注入这些装饰器。这样,将能够使用重试策略包装所有 SQL 执行端点。这将与 Dapper 兼容,因为它是 IDbConnection
的扩展。
创建一个 DI'able 重试策略,将策略封装在其中。此外,我们可以完全将策略解耦以分离类并将其注册为 DI(此答案中未显示,但其他答案中也如此,如果您有多个策略,请不要忘记使用 PolicyRegister
)。
Git 仓库:https://github.com/VinZCodz/SqlTransientFaultHandling
详情:
策略接口,没有异步方法,因为Microsoft.Data.SqlClient
端点不是异步的。
public interface IRetryPolicy
void Execute(Action operation);
TResult Execute<TResult>(Func<TResult> operation);
具体实现,将策略嵌入其中,并通过 Sql Client 为所有 DB 调用包装重试逻辑,从而实现 Dapper。
public class RetryPolicy : IRetryPolicy
private readonly ILogger<RetryPolicy> _logger;
private readonly Policy _retryPolicy;
private readonly ISet<int> _transientDbErrors = new HashSet<int>(new[] 1205 );
private const int _transientErrorRetries = 3;
public RetryPolicy(ILogger<RetryPolicy> logger)
_logger = logger;
_retryPolicy = Policy
.Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
.WaitAndRetry(
retryCount: _transientErrorRetries,
sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
onRetry: LogRetryAction);
public void Execute(Action operation) => _retryPolicy.Execute(operation.Invoke);
public TResult Execute<TResult>(Func<TResult> operation) => _retryPolicy.Execute(() => operation.Invoke());
private void LogRetryAction(Exception exception, TimeSpan sleepTime, int reattemptCount, Context context) =>
_logger.LogWarning(
exception,
$"Transient DB Failure while executing query, error number: ((SqlException)exception).Number; reattempt number: reattemptCount");
现在,我们需要以某种方式将此策略注入 SqlClient 的 con 和 cmd,需要一个 sealed
类 'is-a' DbConnection
(DAL 端点将保持不变)以及 'has-a' DbConnection
(模仿操作但重试):
public sealed class ReliableSqlDbConnection : DbConnection
private readonly SqlConnection _underlyingConnection;
private readonly IRetryPolicy _retryPolicy;
private bool _disposedValue;
private string _connectionString;
public ReliableSqlDbConnection(string connectionString, IRetryPolicy retryPolicy)
_connectionString = connectionString;
_retryPolicy = retryPolicy;
_underlyingConnection = new SqlConnection(connectionString);
public override string ConnectionString
get => _connectionString;
set => _underlyingConnection.ConnectionString = _connectionString = value;
public override void Open()
_retryPolicy.Execute(() =>
if (_underlyingConnection.State != ConnectionState.Open)
_underlyingConnection.Open();
);
public override string Database => _underlyingConnection.Database;
public override string DataSource => _underlyingConnection.DataSource;
public override string ServerVersion => _underlyingConnection.ServerVersion;
public override ConnectionState State => _underlyingConnection.State;
public override void ChangeDatabase(string databaseName) => _underlyingConnection.ChangeDatabase(databaseName);
public override void Close() => _underlyingConnection.Close();
protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) => _underlyingConnection.BeginTransaction(isolationLevel);
protected override DbCommand CreateDbCommand() => new ReliableSqlDbCommand(_underlyingConnection.CreateCommand(), _retryPolicy);
因为,我们在需要时实例化 SqlConnection,我们还需要按照 Microsoft 建议的 derived type dispose pattern
正确处理它:
protected override void Dispose(bool disposing)
if (!_disposedValue)
if (disposing)
if (_underlyingConnection.State == ConnectionState.Open)
_underlyingConnection.Close();
_underlyingConnection.Dispose();
_disposedValue = true;
base.Dispose(disposing);
采用与DbCommand
类似的方法:
public sealed class ReliableSqlDbCommand : DbCommand
private readonly SqlCommand _underlyingSqlCommand;
private readonly IRetryPolicy _retryPolicy;
private bool _disposedValue;
public ReliableSqlDbCommand(SqlCommand command, IRetryPolicy retryPolicy)
_underlyingSqlCommand = command;
_retryPolicy = retryPolicy;
public override string CommandText
get => _underlyingSqlCommand.CommandText;
set => _underlyingSqlCommand.CommandText = value;
public override int CommandTimeout
get => _underlyingSqlCommand.CommandTimeout;
set => _underlyingSqlCommand.CommandTimeout = value;
public override CommandType CommandType
get => _underlyingSqlCommand.CommandType;
set => _underlyingSqlCommand.CommandType = value;
public override bool DesignTimeVisible
get => _underlyingSqlCommand.DesignTimeVisible;
set => _underlyingSqlCommand.DesignTimeVisible = value;
public override UpdateRowSource UpdatedRowSource
get => _underlyingSqlCommand.UpdatedRowSource;
set => _underlyingSqlCommand.UpdatedRowSource = value;
protected override DbConnection DbConnection
get => _underlyingSqlCommand.Connection;
set => _underlyingSqlCommand.Connection = (SqlConnection)value;
protected override DbParameterCollection DbParameterCollection => _underlyingSqlCommand.Parameters;
protected override DbTransaction DbTransaction
get => _underlyingSqlCommand.Transaction;
set => _underlyingSqlCommand.Transaction = (SqlTransaction)value;
public override void Cancel() => _underlyingSqlCommand.Cancel();
public override int ExecuteNonQuery() => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteNonQuery());
public override object ExecuteScalar() => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteScalar());
public override void Prepare() => _retryPolicy.Execute(() => _underlyingSqlCommand.Prepare());
protected override DbParameter CreateDbParameter() => _underlyingSqlCommand.CreateParameter();
protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteReader(behavior));
protected override void Dispose(bool disposing)
if (!_disposedValue)
if (disposing)
_underlyingSqlCommand.Dispose();
_disposedValue = true;
base.Dispose(disposing);
现有 DAL 端:
DI:
services.AddScoped<IRetryPolicy, RetryPolicy>();
services.Configure<DbConnectionOption>(options =>
options.ConnectionString = connectionString;
);
延迟加载装饰器:
_connection = new Lazy<IDbConnection>(() =>
return new ReliableSqlDbConnection(_dbOptions.ConnectionString, _retryPolicy);
);
Xunit 测试:这个测试实际上在单个会话上创建了一个死锁并重新绑定它。
感谢@Martin Smith 的精彩脚本,更多关于脚本的内容:Simulate a dead lock on SQL server using single client and single session
[Fact]
public void It_creates_reliablesqldbConnection_and_deadlock_itself_to_log_and_retry()
var logger = new FakeLogger<RetryPolicy>(); //create your own logger.
using var reliableSqlDbConnection = new ReliableSqlDbConnection(_fixture.Configuration["ConnectionStrings:DataContext"],
new RetryPolicy(logger)); //create your own fixture.
//Awesome script which deadlocks itself on single con and process with it's meta data.
Assert.ThrowsAsync<SqlException>(() => reliableSqlDbConnection.ExecuteAsync(
@"BEGIN TRAN
CREATE TYPE dbo.OptionIDs AS TABLE( OptionID INT PRIMARY KEY )
EXEC ('DECLARE @OptionIDs dbo.OptionIDs;')
ROLLBACK "));
Assert.Equal(LogLevel.Warning, logger.Logs.Select(g => g.Key).First());
var retries = logger.Logs[LogLevel.Warning].First();
Assert.Equal(3, retries.Count());
Assert.Equal("Transient DB Failure while executing query, error number: 1205; reattempt number: 1", retries.First());
总结:
这样,Open
Connection、ExecuteReader
、ExecuteScalar
、ExecuteNonQuery
等将具有重试功能,最终将由所有 Dapper 端点调用。
这样,将代码更改降至最低,无需触摸 repos,只需启动。只需为 SqlClient 的连接和命令提供包装器/装饰器,就可以使用自定义策略进行注入和重试。
【讨论】:
以上是关于需要一种内置方式来为现有存储库添加死锁弹性到 Dapper 而不更改它们的主要内容,如果未能解决你的问题,请参考以下文章