后台工作人员中异步等待中的调解员死锁-如何检测线程调用自身

Posted

技术标签:

【中文标题】后台工作人员中异步等待中的调解员死锁-如何检测线程调用自身【英文标题】:Mediator deadlock on async await within background worker - how to detect thread calling itself 【发布时间】:2020-08-28 16:34:42 【问题描述】:

我有一个中介,我最近需要在后台线程上一次同步一个消息调度,但它是锁定的,如下所示。

我将命令发布到队列并从 TaskCompletionSource 返回任务:

public Task<object> Send(object command, CancellationToken cancellationToken)

    var item = new CommandItem()  Command = request, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken ;            
    this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
    return item.Tcs.Task;

然后从后台工作程序中获取它,并创建处理程序:

var item = await this.queue.Reader.ReadAsync(cancellationToken);
// work out command  type snipped
var command = item.Command as LockMeGoodCommand;
var handler = new LockMeGoodCommandHandler();
var result = await handler.Handle(command, item.Ct);
item.Tcs.SetResult(result);

然后处理它,当命令处理程序被发送到命令处理程序内时,以下锁定(当使用后台线程时,但在线程内是可以的):

public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)

   Console.WriteLine(command.GetType().Name);

   // this would get the result but will lock forever when using background worker bus implementation
   var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);

   // perform some action based on the result - but we never get here
   Console.WriteLine("otherResult is " + otherResult);

   return 3;

** 问题和潜在的修复方法 **

我相信我们可以通过检测后台线程是否从其线程内向它自己发布(通过命令处理程序,然后调用 Send() 来发布新命令)来避免死锁,如果是,它不应使用任何线程机制(发布到命令队列或 TaskCompletionSource),而应直接处理任务。

我试图检测线程但它不工作,所以我在var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken, true) 上方的处理程序中将手动标志 isSameThread 设置为 true,我可以确认它工作正常并且避免了死锁 .

此修复中有任何警告吗?如何检测后台线程是否正在请求发送命令(线程如何检测自己)以及如何完成以下代码(从DispatchOnBackgroundThread.Send() 包含此自调用检测(这样我就可以取消isSameThread 标志)?

这似乎涉及更多,因为每个等待都会给出不同的线程 ID。

// in thread start we set the thread id of the background thread
this.workerThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;

public Task<object> Send(object command, CancellationToken cancellationToken, bool isSameThread = false)

    Console.WriteLine($"this.workerThreadId: this.workerThreadId, Thread.CurrentThread.ManagedThreadId: Thread.CurrentThread.ManagedThreadId");

    // below doesnt work gives different numbers so i use flag instead
    // this.workerThreadId == Thread.CurrentThread.ManagedThreadId
    if (isSameThread == true)
    
        if (command is BoringCommand boringCommand)
        
            var handler = new BoringCommandHandler();
            return handler.Handle(boringCommand, cancellationToken).ContinueWith(t => (object)t);

        
        else if (command is LockMeGoodCommand lockMeGoodCommand)
        
            var handler = new LockMeGoodCommandHandler(this);
            return handler.Handle(lockMeGoodCommand, cancellationToken).ContinueWith(t => (object)t);
        
        else
            throw new Exception("unknown");
    
    else
    
        var item = new CommandItem()  Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken ;
        this.queue.Writer.WriteAsync(item); // just write and immediatly return the cts
        return item.Tcs.Task;
    

** 代码演示问题**

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestDeadlock

    class BoringCommand  
    class LockMeGoodCommand      

    class BoringCommandHandler
    
        public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
        
            Console.WriteLine(command.GetType().Name);         
            return Task.FromResult(1);
        
    
    class LockMeGoodCommandHandler
    
        private readonly DispatchOnBackgroundThread commandBus;

        public LockMeGoodCommandHandler(DispatchOnBackgroundThread commandBus) => this.commandBus = commandBus;

        public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
        
            Console.WriteLine(command.GetType().Name);

            // this locks forever
            var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
            Console.WriteLine("otherResult is " + otherResult);
            return 3;
        
    

    public class DispatchOnBackgroundThread
    
        private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
        private Task worker = null;

        class CommandItem
        
            public object Command  get; set; 
            public CancellationToken Ct  get; set; 
            public TaskCompletionSource<object> Tcs  get; set; 
        

        public Task<object> Send(object command, CancellationToken cancellationToken)
        
            var item = new CommandItem()
             Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken ;            
            this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
            return item.Tcs.Task;
        

        public void Start(CancellationToken cancellationToken)
        
            this.worker = Task.Factory.StartNew(async () =>
            
                try
                                    
                    while (cancellationToken.IsCancellationRequested == false)
                    
                        var item = await this.queue.Reader.ReadAsync(cancellationToken);

                        // simplified DI container magic to static invocation
                        if (item.Command is BoringCommand boringCommand)
                        
                            var handler = new BoringCommandHandler();
                            var result = await handler.Handle(boringCommand, item.Ct);
                            item.Tcs.SetResult(result);
                        
                        if (item.Command is LockMeGoodCommand lockMeGoodCommand)
                        
                            var handler = new LockMeGoodCommandHandler(this);
                            var result = await handler.Handle(lockMeGoodCommand, item.Ct);
                            item.Tcs.SetResult(result);
                        
                    
                
                catch (TaskCanceledException)  
            ,
            TaskCreationOptions.LongRunning)
            .Unwrap();
        

        public async Task StopAsync()
        
            this.queue.Writer.Complete();
            await this.worker;
        
    

    class Program
    
        static async Task Main(string[] args)
        
            var cts = new CancellationTokenSource();
            var threadStrategy = new DispatchOnBackgroundThread();
            threadStrategy.Start(cts.Token);

            var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
            var result2 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);

            cts.Cancel();
            await threadStrategy.StopAsync();
        
    

** 无需锁定即可工作的简单非线程中介实现 **

public class DispatchInCallingThread

    public async Task<object> Send(object request, CancellationToken cancellationToken)
    
        // simplified DI container magic to static invocation
        if (request is BoringCommand boringCommand)
        
            var handler = new BoringCommandHandler();
            return await handler.Handle(boringCommand, cancellationToken);
        
        else if (request is LockMeGoodCommand lockMeGoodCommand)
        
            var handler = new LockMeGoodCommandHandler(this);
            return await handler.Handle(lockMeGoodCommand, cancellationToken);
        
        else
            throw new Exception("unknown");
    

【问题讨论】:

代码太多了。实际上可以肯定的是,您发布的代码中只有一小部分实际上涉及死锁。请修复。同时:坦率地说,您发现一些僵局场景尚未被 SO 上的许多现有问题充分解决的可能性微乎其微。你有一个线程应该在工作,另一个线程在等待它工作,每个线程都在等待对方。 ... ... 您真正需要的是中断调试器并查看每个线程被阻塞的位置。修复就像修复任何死锁一样:不要创建相互的锁依赖关系(如果可能,请避免使用多个锁,如果没有,请确保无论线程如何,您始终以相同的顺序获取所有锁)。 嗨,彼得,谢谢你的时间。如果有人愿意运行它,我已经包含了锁定会话的摘要和代码示例。我同意它很长(ish),但包含了非线程版本(有效)。我没有使用锁。在发布之前我已经详细查看了 SO,但我承认我被卡住了 “我没有使用锁。” -- 没有明确说明,但是线程同步中涉及到隐式锁来支持您使用await 和@987654330 @。例如,两个等待对方完成的方法都不会完成。不要在这里按字面意思理解“锁定”。在任何情况下,包含工作代码都是没有意义的,只会分散问题的注意力,甚至损坏的代码也可能不是最小重现问题。 @PeterDuniho 我已经清理了这个问题,以便更清楚地感谢指针/耐心 - 我相信通过检测处理程序中正在执行的新命令可以完全避免死锁,如果是这样直接处理而不是使用新的 TaskCompletionSource 等将命令发布到队列 - 我将在同一后台线程中处理重入/嵌套命令(就像单线程调解器一样)。我已经更新了要问的问题,假设这是正确的修复,如何检测我们是发布命令请求的后台线程。 【参考方案1】:

死锁的原因很简单:

有一个代码循环(不是特定线程;见下文)负责处理队列。在处理每个命令时,它会awaits 该命令的处理程序。 有一个命令处理程序,awaits 另一个命令要处理。但是,这不起作用,因为不会处理更多命令;在此命令完成之前,代码循环不会使下一个命令出队。

换句话说,如果一个命令一次只能执行一个命令,那么一个命令执行另一个命令在逻辑上是不可能的。

有几种可能的方法可以解决这个问题。我确实推荐“重入”方法;重入是许多微妙的逻辑错误的原因。我推荐的方法之一是:

    更改Send 语义,使其成为“队列”语义。这意味着不可能得到命令结果;结果必须通过某些中介作为消息发送。 让代码循环不是await 命令处理程序,允许它循环返回并获取下一个命令。这意味着它不再“一次同步一个”。 将“一次同步一个”重新定义为“一次同步一个,但如果是awaiting,则不算作一个”。在这种情况下,您可能可以使用 ConcurrentExclusiveSchedulerPairNito.AsyncEx.AsyncContext 之类的东西一次运行一个方法块。

旁注:LongRunning 没有做你认为它正在做的事情。 StartNew is not async-aware,所以LongRunning 标志只适用于直到第一个await 的代码;之后,该 lambda 中的代码将在任意线程池线程上运行(没有LongRunning 设置)。将StartNew 替换为Task.Run 将使代码更清晰。

【讨论】:

Stephen 感谢您的回答和指点,在您的 cmets 之后,我意识到我的设计存在缺陷。使用依赖注入,我确保将调用线程调度程序(无队列直通)实现传递给任何处理程序(如果在构造函数中请求),以便如果处理程序确实发布,它将绕过队列/工作逻辑并简单地处理等待在工作线程的调用线程中内联。您的 cmets 对这次审查至关重要,并帮助避免了重新进入方法,结果证明流程非常简洁【参考方案2】:

感谢 Stephen 的回答和 Peter 的 cmets,在说谢谢时确实非常清楚,

有一个代码循环(不是特定线程;见下文)是 负责处理队列。在处理每个命令时,它 等待该命令的处理程序。

有一个命令处理程序等待处理另一个命令。 但是,这行不通,因为不会有更多的命令 处理;在此之前,代码循环不会使下一个命令出队 一个完成。

考虑到上述情况,我找到了一种无需任何线程黑客(检测堆栈/重新进入深度等)或调度程序的处理方法。

在下面的示例中,我“注入”到处理程序中的不是循环调用类,而是一种不同类型的命令处理程序调度程序,它不进行任何排队,而是直接在线程内处理。

下面是从线程循环内部调用的,那么就没有相互依赖了:

public class DispatchInCallingThread: ICommandBus

    public async Task<object> Send(object request, CancellationToken cancellationToken)
    
        // simplified DI container magic to static invocation
        if (request is BoringCommand boringCommand)
        
            var handler = new BoringCommandHandler();
            return await handler.Handle(boringCommand, cancellationToken);
        
        else if (request is LockMeGoodCommand lockMeGoodCommand)
        
            var handler = new LockMeGoodCommandHandler(this);
            return await handler.Handle(lockMeGoodCommand, cancellationToken);
        
        else
            throw new Exception("cough furball");
    

    public void Start(CancellationToken cancellationToken)  

    public Task StopAsync()  return Task.CompletedTask; 

在后台线程中,这是对实例化命令处理程序的注入:

else if (item.Command is LockMeGoodCommand lockMeGoodCommand)

    var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
    var result = await handler.Handle(lockMeGoodCommand, item.Ct);
    item.Tcs.SetResult(result);

现在代码将永远运行(需要为设置的取消令牌源实现适当的关闭逻辑):

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestDeadlock

    class BoringCommand  
    class LockMeGoodCommand      

    class BoringCommandHandler
    
        public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
        
            Console.WriteLine(command.GetType().Name);         
            return Task.FromResult(1);
        
    

    class LockMeGoodCommandHandler
    
        private readonly ICommandBus commandBus;

        public LockMeGoodCommandHandler(ICommandBus commandBus) => this.commandBus = commandBus;

        public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
                    
            Console.WriteLine(command.GetType().Name);
            var otherResult =  await this.commandBus.Send(new BoringCommand(), cancellationToken);
            var otherResult2 = await this.commandBus.Send(new BoringCommand(), cancellationToken);
            return 3;
        
    

    public interface ICommandBus
    
        Task<object> Send(object request, CancellationToken cancellationToken);
        void Start(CancellationToken cancellationToken);
        Task StopAsync();
    

    public class DispatchOnBackgroundThread : ICommandBus
    
        private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
        private Task worker = null;
        private readonly DispatchInCallingThread dispatchInCallingThread = new DispatchInCallingThread();

        class CommandItem
        
            public object Command  get; set; 
            public CancellationToken Ct  get; set; 
            public TaskCompletionSource<object> Tcs  get; set; 
        

        public Task<object> Send(object command, CancellationToken cancellationToken)
        
            var item = new CommandItem()  Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken ;
            this.queue.Writer.WriteAsync(item, cancellationToken); // just write and immediatly return the cts
            return item.Tcs.Task;            
        

        public void Start(CancellationToken cancellationToken)
        
            var scheduler = new ConcurrentExclusiveSchedulerPair();

            this.worker = Task.Factory.StartNew(async () =>
            
                CommandItem item = null;
                try
                                
                    while (cancellationToken.IsCancellationRequested == false)
                    
                        item = await this.queue.Reader.ReadAsync(cancellationToken);

                        // simplified DI container magic to static invocation
                        if (item.Command is BoringCommand boringCommand)
                        
                            var handler = new BoringCommandHandler();
                            var result = handler.Handle(boringCommand, item.Ct);
                            item.Tcs.SetResult(result);

                        
                        else if (item.Command is LockMeGoodCommand lockMeGoodCommand)
                        
                            var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
                            var result = await handler.Handle(lockMeGoodCommand, item.Ct);
                            item.Tcs.SetResult(result);
                        
                        else
                            throw new Exception("unknown");
                    
                
                catch (TaskCanceledException)
                
                    if (item != null)
                        item.Tcs.SetCanceled();
                
                Console.WriteLine("exit background thread");
            )
            .Unwrap();  

        

        public async Task StopAsync()
        
            this.queue.Writer.Complete();
            await this.worker;
        
    

    public class DispatchInCallingThread: ICommandBus
    
        public async Task<object> Send(object request, CancellationToken cancellationToken)
        
            // simplified DI container magic to static invocation
            if (request is BoringCommand boringCommand)
            
                var handler = new BoringCommandHandler();
                return await handler.Handle(boringCommand, cancellationToken);
            
            else if (request is LockMeGoodCommand lockMeGoodCommand)
            
                var handler = new LockMeGoodCommandHandler(this);
                return await handler.Handle(lockMeGoodCommand, cancellationToken);
            
            else
                throw new Exception("unknown");
        

        public void Start(CancellationToken cancellationToken)  
        public Task StopAsync()  return Task.CompletedTask; 
    

    class Program
    
        static async Task Main(string[] args)
        
            await TestDispatchOnBackgroundThread();
        

        static async Task TestDispatchOnBackgroundThread()
        
            var cts = new CancellationTokenSource();

            Console.CancelKeyPress += delegate 
                Console.WriteLine("setting cts.Cancel()");
                cts.Cancel();
            ;

            var threadStrategy = new DispatchOnBackgroundThread();
            threadStrategy.Start(cts.Token);

            while (cts.IsCancellationRequested == false)
            
                Console.WriteLine("***************** sending new batch ****************");                
                var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
                var result3 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);
                Thread.Sleep(1000);
            
            await threadStrategy.StopAsync();
        
    

更多信息,依赖注入的实际实现在这里https://***.com/a/61791817/915839,它能够在工作线程中动态切换到线程内调度

【讨论】:

以上是关于后台工作人员中异步等待中的调解员死锁-如何检测线程调用自身的主要内容,如果未能解决你的问题,请参考以下文章

导致死锁的异步/等待示例

swift/OC中的死锁问题

如何监控 MVC4 中的异步/等待死锁?

Core Data 私有队列死锁

如何避免线程的死锁

Python 3 - 主线程未检测到后台线程中的KeyboardInterrupt,直到用户将鼠标悬停在GUI窗口上