后台工作人员中异步等待中的调解员死锁-如何检测线程调用自身
Posted
技术标签:
【中文标题】后台工作人员中异步等待中的调解员死锁-如何检测线程调用自身【英文标题】:Mediator deadlock on async await within background worker - how to detect thread calling itself 【发布时间】:2020-08-28 16:34:42 【问题描述】:我有一个中介,我最近需要在后台线程上一次同步一个消息调度,但它是锁定的,如下所示。
我将命令发布到队列并从 TaskCompletionSource 返回任务:
public Task<object> Send(object command, CancellationToken cancellationToken)
var item = new CommandItem() Command = request, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken ;
this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
return item.Tcs.Task;
然后从后台工作程序中获取它,并创建处理程序:
var item = await this.queue.Reader.ReadAsync(cancellationToken);
// work out command type snipped
var command = item.Command as LockMeGoodCommand;
var handler = new LockMeGoodCommandHandler();
var result = await handler.Handle(command, item.Ct);
item.Tcs.SetResult(result);
然后处理它,当命令处理程序被发送到命令处理程序内时,以下锁定(当使用后台线程时,但在线程内是可以的):
public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
Console.WriteLine(command.GetType().Name);
// this would get the result but will lock forever when using background worker bus implementation
var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
// perform some action based on the result - but we never get here
Console.WriteLine("otherResult is " + otherResult);
return 3;
** 问题和潜在的修复方法 **
我相信我们可以通过检测后台线程是否从其线程内向它自己发布(通过命令处理程序,然后调用 Send() 来发布新命令)来避免死锁,如果是,它不应使用任何线程机制(发布到命令队列或 TaskCompletionSource),而应直接处理任务。
我试图检测线程但它不工作,所以我在var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken, true)
上方的处理程序中将手动标志 isSameThread 设置为 true,我可以确认它工作正常并且避免了死锁 .
此修复中有任何警告吗?如何检测后台线程是否正在请求发送命令(线程如何检测自己)以及如何完成以下代码(从DispatchOnBackgroundThread.Send()
包含此自调用检测(这样我就可以取消isSameThread 标志)?
这似乎涉及更多,因为每个等待都会给出不同的线程 ID。
// in thread start we set the thread id of the background thread
this.workerThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
public Task<object> Send(object command, CancellationToken cancellationToken, bool isSameThread = false)
Console.WriteLine($"this.workerThreadId: this.workerThreadId, Thread.CurrentThread.ManagedThreadId: Thread.CurrentThread.ManagedThreadId");
// below doesnt work gives different numbers so i use flag instead
// this.workerThreadId == Thread.CurrentThread.ManagedThreadId
if (isSameThread == true)
if (command is BoringCommand boringCommand)
var handler = new BoringCommandHandler();
return handler.Handle(boringCommand, cancellationToken).ContinueWith(t => (object)t);
else if (command is LockMeGoodCommand lockMeGoodCommand)
var handler = new LockMeGoodCommandHandler(this);
return handler.Handle(lockMeGoodCommand, cancellationToken).ContinueWith(t => (object)t);
else
throw new Exception("unknown");
else
var item = new CommandItem() Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken ;
this.queue.Writer.WriteAsync(item); // just write and immediatly return the cts
return item.Tcs.Task;
** 代码演示问题**
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace TestDeadlock
class BoringCommand
class LockMeGoodCommand
class BoringCommandHandler
public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
Console.WriteLine(command.GetType().Name);
return Task.FromResult(1);
class LockMeGoodCommandHandler
private readonly DispatchOnBackgroundThread commandBus;
public LockMeGoodCommandHandler(DispatchOnBackgroundThread commandBus) => this.commandBus = commandBus;
public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
Console.WriteLine(command.GetType().Name);
// this locks forever
var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
Console.WriteLine("otherResult is " + otherResult);
return 3;
public class DispatchOnBackgroundThread
private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
private Task worker = null;
class CommandItem
public object Command get; set;
public CancellationToken Ct get; set;
public TaskCompletionSource<object> Tcs get; set;
public Task<object> Send(object command, CancellationToken cancellationToken)
var item = new CommandItem()
Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken ;
this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
return item.Tcs.Task;
public void Start(CancellationToken cancellationToken)
this.worker = Task.Factory.StartNew(async () =>
try
while (cancellationToken.IsCancellationRequested == false)
var item = await this.queue.Reader.ReadAsync(cancellationToken);
// simplified DI container magic to static invocation
if (item.Command is BoringCommand boringCommand)
var handler = new BoringCommandHandler();
var result = await handler.Handle(boringCommand, item.Ct);
item.Tcs.SetResult(result);
if (item.Command is LockMeGoodCommand lockMeGoodCommand)
var handler = new LockMeGoodCommandHandler(this);
var result = await handler.Handle(lockMeGoodCommand, item.Ct);
item.Tcs.SetResult(result);
catch (TaskCanceledException)
,
TaskCreationOptions.LongRunning)
.Unwrap();
public async Task StopAsync()
this.queue.Writer.Complete();
await this.worker;
class Program
static async Task Main(string[] args)
var cts = new CancellationTokenSource();
var threadStrategy = new DispatchOnBackgroundThread();
threadStrategy.Start(cts.Token);
var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
var result2 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);
cts.Cancel();
await threadStrategy.StopAsync();
** 无需锁定即可工作的简单非线程中介实现 **
public class DispatchInCallingThread
public async Task<object> Send(object request, CancellationToken cancellationToken)
// simplified DI container magic to static invocation
if (request is BoringCommand boringCommand)
var handler = new BoringCommandHandler();
return await handler.Handle(boringCommand, cancellationToken);
else if (request is LockMeGoodCommand lockMeGoodCommand)
var handler = new LockMeGoodCommandHandler(this);
return await handler.Handle(lockMeGoodCommand, cancellationToken);
else
throw new Exception("unknown");
【问题讨论】:
代码太多了。实际上可以肯定的是,您发布的代码中只有一小部分实际上涉及死锁。请修复。同时:坦率地说,您发现一些僵局场景尚未被 SO 上的许多现有问题充分解决的可能性微乎其微。你有一个线程应该在工作,另一个线程在等待它工作,每个线程都在等待对方。 ... ... 您真正需要的是中断调试器并查看每个线程被阻塞的位置。修复就像修复任何死锁一样:不要创建相互的锁依赖关系(如果可能,请避免使用多个锁,如果没有,请确保无论线程如何,您始终以相同的顺序获取所有锁)。 嗨,彼得,谢谢你的时间。如果有人愿意运行它,我已经包含了锁定会话的摘要和代码示例。我同意它很长(ish),但包含了非线程版本(有效)。我没有使用锁。在发布之前我已经详细查看了 SO,但我承认我被卡住了 “我没有使用锁。” -- 没有明确说明,但是线程同步中涉及到隐式锁来支持您使用await
和@987654330 @。例如,两个等待对方完成的方法都不会完成。不要在这里按字面意思理解“锁定”。在任何情况下,包含工作代码都是没有意义的,只会分散问题的注意力,甚至损坏的代码也可能不是最小重现问题。
@PeterDuniho 我已经清理了这个问题,以便更清楚地感谢指针/耐心 - 我相信通过检测处理程序中正在执行的新命令可以完全避免死锁,如果是这样直接处理而不是使用新的 TaskCompletionSource 等将命令发布到队列 - 我将在同一后台线程中处理重入/嵌套命令(就像单线程调解器一样)。我已经更新了要问的问题,假设这是正确的修复,如何检测我们是发布命令请求的后台线程。
【参考方案1】:
死锁的原因很简单:
有一个代码循环(不是特定线程;见下文)负责处理队列。在处理每个命令时,它会await
s 该命令的处理程序。
有一个命令处理程序,await
s 另一个命令要处理。但是,这不起作用,因为不会处理更多命令;在此命令完成之前,代码循环不会使下一个命令出队。
换句话说,如果一个命令一次只能执行一个命令,那么一个命令执行另一个命令在逻辑上是不可能的。
有几种可能的方法可以解决这个问题。我确实不推荐“重入”方法;重入是许多微妙的逻辑错误的原因。我推荐的方法之一是:
-
更改
Send
语义,使其成为“队列”语义。这意味着不可能得到命令结果;结果必须通过某些中介作为消息发送。
让代码循环不是await
命令处理程序,允许它循环返回并获取下一个命令。这意味着它不再“一次同步一个”。
将“一次同步一个”重新定义为“一次同步一个,但如果是await
ing,则不算作一个”。在这种情况下,您可能可以使用 ConcurrentExclusiveSchedulerPair
或 Nito.AsyncEx.AsyncContext
之类的东西一次运行一个方法块。
旁注:LongRunning
没有做你认为它正在做的事情。 StartNew
is not async
-aware,所以LongRunning
标志只适用于直到第一个await
的代码;之后,该 lambda 中的代码将在任意线程池线程上运行(没有LongRunning
设置)。将StartNew
替换为Task.Run
将使代码更清晰。
【讨论】:
Stephen 感谢您的回答和指点,在您的 cmets 之后,我意识到我的设计存在缺陷。使用依赖注入,我确保将调用线程调度程序(无队列直通)实现传递给任何处理程序(如果在构造函数中请求),以便如果处理程序确实发布,它将绕过队列/工作逻辑并简单地处理等待在工作线程的调用线程中内联。您的 cmets 对这次审查至关重要,并帮助避免了重新进入方法,结果证明流程非常简洁【参考方案2】:感谢 Stephen 的回答和 Peter 的 cmets,在说谢谢时确实非常清楚,
有一个代码循环(不是特定线程;见下文)是 负责处理队列。在处理每个命令时,它 等待该命令的处理程序。
有一个命令处理程序等待处理另一个命令。 但是,这行不通,因为不会有更多的命令 处理;在此之前,代码循环不会使下一个命令出队 一个完成。
考虑到上述情况,我找到了一种无需任何线程黑客(检测堆栈/重新进入深度等)或调度程序的处理方法。
在下面的示例中,我“注入”到处理程序中的不是循环调用类,而是一种不同类型的命令处理程序调度程序,它不进行任何排队,而是直接在线程内处理。
下面是从线程循环内部调用的,那么就没有相互依赖了:
public class DispatchInCallingThread: ICommandBus
public async Task<object> Send(object request, CancellationToken cancellationToken)
// simplified DI container magic to static invocation
if (request is BoringCommand boringCommand)
var handler = new BoringCommandHandler();
return await handler.Handle(boringCommand, cancellationToken);
else if (request is LockMeGoodCommand lockMeGoodCommand)
var handler = new LockMeGoodCommandHandler(this);
return await handler.Handle(lockMeGoodCommand, cancellationToken);
else
throw new Exception("cough furball");
public void Start(CancellationToken cancellationToken)
public Task StopAsync() return Task.CompletedTask;
在后台线程中,这是对实例化命令处理程序的注入:
else if (item.Command is LockMeGoodCommand lockMeGoodCommand)
var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
var result = await handler.Handle(lockMeGoodCommand, item.Ct);
item.Tcs.SetResult(result);
现在代码将永远运行(需要为设置的取消令牌源实现适当的关闭逻辑):
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace TestDeadlock
class BoringCommand
class LockMeGoodCommand
class BoringCommandHandler
public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
Console.WriteLine(command.GetType().Name);
return Task.FromResult(1);
class LockMeGoodCommandHandler
private readonly ICommandBus commandBus;
public LockMeGoodCommandHandler(ICommandBus commandBus) => this.commandBus = commandBus;
public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
Console.WriteLine(command.GetType().Name);
var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
var otherResult2 = await this.commandBus.Send(new BoringCommand(), cancellationToken);
return 3;
public interface ICommandBus
Task<object> Send(object request, CancellationToken cancellationToken);
void Start(CancellationToken cancellationToken);
Task StopAsync();
public class DispatchOnBackgroundThread : ICommandBus
private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
private Task worker = null;
private readonly DispatchInCallingThread dispatchInCallingThread = new DispatchInCallingThread();
class CommandItem
public object Command get; set;
public CancellationToken Ct get; set;
public TaskCompletionSource<object> Tcs get; set;
public Task<object> Send(object command, CancellationToken cancellationToken)
var item = new CommandItem() Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken ;
this.queue.Writer.WriteAsync(item, cancellationToken); // just write and immediatly return the cts
return item.Tcs.Task;
public void Start(CancellationToken cancellationToken)
var scheduler = new ConcurrentExclusiveSchedulerPair();
this.worker = Task.Factory.StartNew(async () =>
CommandItem item = null;
try
while (cancellationToken.IsCancellationRequested == false)
item = await this.queue.Reader.ReadAsync(cancellationToken);
// simplified DI container magic to static invocation
if (item.Command is BoringCommand boringCommand)
var handler = new BoringCommandHandler();
var result = handler.Handle(boringCommand, item.Ct);
item.Tcs.SetResult(result);
else if (item.Command is LockMeGoodCommand lockMeGoodCommand)
var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
var result = await handler.Handle(lockMeGoodCommand, item.Ct);
item.Tcs.SetResult(result);
else
throw new Exception("unknown");
catch (TaskCanceledException)
if (item != null)
item.Tcs.SetCanceled();
Console.WriteLine("exit background thread");
)
.Unwrap();
public async Task StopAsync()
this.queue.Writer.Complete();
await this.worker;
public class DispatchInCallingThread: ICommandBus
public async Task<object> Send(object request, CancellationToken cancellationToken)
// simplified DI container magic to static invocation
if (request is BoringCommand boringCommand)
var handler = new BoringCommandHandler();
return await handler.Handle(boringCommand, cancellationToken);
else if (request is LockMeGoodCommand lockMeGoodCommand)
var handler = new LockMeGoodCommandHandler(this);
return await handler.Handle(lockMeGoodCommand, cancellationToken);
else
throw new Exception("unknown");
public void Start(CancellationToken cancellationToken)
public Task StopAsync() return Task.CompletedTask;
class Program
static async Task Main(string[] args)
await TestDispatchOnBackgroundThread();
static async Task TestDispatchOnBackgroundThread()
var cts = new CancellationTokenSource();
Console.CancelKeyPress += delegate
Console.WriteLine("setting cts.Cancel()");
cts.Cancel();
;
var threadStrategy = new DispatchOnBackgroundThread();
threadStrategy.Start(cts.Token);
while (cts.IsCancellationRequested == false)
Console.WriteLine("***************** sending new batch ****************");
var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
var result3 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);
Thread.Sleep(1000);
await threadStrategy.StopAsync();
更多信息,依赖注入的实际实现在这里https://***.com/a/61791817/915839,它能够在工作线程中动态切换到线程内调度
【讨论】:
以上是关于后台工作人员中异步等待中的调解员死锁-如何检测线程调用自身的主要内容,如果未能解决你的问题,请参考以下文章