取消处于阻塞状态的任务的最佳方法是啥?

Posted

技术标签:

【中文标题】取消处于阻塞状态的任务的最佳方法是啥?【英文标题】:What is the best way to cancel a task that is in a blocking state?取消处于阻塞状态的任务的最佳方法是什么? 【发布时间】:2014-07-17 18:42:14 【问题描述】:

我有正在运行的任务调用从 RabbitMQ 读取的方法。当队列中没有任何内容时,该方法只是阻塞。所以这些任务有一个“正在运行”的状态,但实际上并没有做任何事情。有什么方法可以优雅地结束这些任务?

访问队列的代码如下:

 private void FindWork(CancellationToken ct)
    
        if (ct.IsCancellationRequested)
            return;

        bool result = false;
        bool process = false;
        bool queueResult = false;
        Work_Work work = null;

        try
        
            using (Queue workQueue = new Queue(_workQueue))
            
                // Look for work on the work queue
                workQueue.Open(Queue.Mode.Consume);
                work = workQueue.ConsumeWithBlocking<Work_Work>();

                // Do some work with the message ...

                return;

任务创建如下:

private void Run()
    
        while (!_stop)
        
            // Remove and stopped tasks from the pool
            List<int> removeThreads = new List<int>();

            lock (_tasks)
            
                foreach (KeyValuePair<int, Task> task in _tasks)
                
                    if (task.Value.Status != TaskStatus.Running)
                    
                        task.Value.Wait();
                        removeThreads.Add(task.Value.Id);
                    
                

                foreach (int taskID in removeThreads)
                    _tasks.Remove(taskID);
            

            CancellationToken ct = _cts.Token;
            TaskFactory factory = new TaskFactory(ct, TaskCreationOptions.LongRunning, TaskContinuationOptions.LongRunning, null);

            // Create new tasks if we have room in the pool
            while (_tasks.Count < _runningMax)
            
                Task task = factory.StartNew(() => FindWork(ct));

                lock (_tasks)
                    _tasks.Add(task.Id, task);
            

            // Take a rest so we don't run the CPU to death
            Thread.Sleep(1000);
        
    

目前我已将我的任务创建代码更改为如下所示,以便我可以中止任务。我知道这不是一个好的解决方案,但我不知道还能做什么。

while (_tasks.Count < _runningMax)
            
                Task task = factory.StartNew(() =>
                    
                        try
                        
                            using (_cts.Token.Register(Thread.CurrentThread.Abort))
                            
                                FindWork(ct);
                            
                        
                        catch (ThreadAbortException)
                        
                            return;
                        
                    , _cts.Token);

                _tasks.Add(task.Id, task);
            

【问题讨论】:

您能否展示一下您如何使用排队系统的代码? 当队列中有内容时,您是否可以不利用 async/await 来恢复执行,从而允许线程返回线程池,而不是阻塞线程? 当队列中有东西时执行会恢复。 【参考方案1】:

以下是否适用于您的场景?

我不会产生多个线程并让它们在队列中等待,而是在无限轮询循环中创建一个线程,并在有新工作进入时让那个线程产生一个新线程。您可以添加一个信号量到限制您创建的线程数。检查下面的示例代码,我使用了 BlockingCollection 而不是 RabbitMQ 。

  public class QueueManager
    
        public BlockingCollection<Work> blockingCollection = new BlockingCollection<Work>();
        private const int _maxRunningTasks = 3;

        static SemaphoreSlim _sem = new SemaphoreSlim(_maxRunningTasks);

        public void Queue()
        
            blockingCollection.Add(new Work());
        

        public void Consume()
        
            while (true)
            
                Work work = blockingCollection.Take();

                _sem.Wait();

                Task t = Task.Factory.StartNew(work.DoWork);
            
        

        public class Work
        
            public void DoWork()
            
                Thread.Sleep(5000);
                _sem.Release();
                Console.WriteLine("Finished work");
            
        
    

还有我的测试班

class Test
    
        static void Main(string[] args)
        
            Consumer c = new Consumer();
            Task t = Task.Factory.StartNew(c.Consume);

            c.Queue();
            c.Queue();
            c.Queue();
            c.Queue();
            c.Queue();

            Thread.Sleep(1000);
            Console.ReadLine();
        
    

【讨论】:

【参考方案2】:

要完成这项工作,您需要更改 ConsumeWithBlocking 以支持取消。我不熟悉RabbitMQ,但显然它支持cancellation on the consumer channel。

因此,不要从Token.Register 回调中执行Thread.CurrentThread.Abort,而是做正确的事情并通过适当的RabbitMQ API 优雅地取消操作。

附带说明,您当前尝试中止的线程很可能不是被ConsumeWithBlocking 阻止的线程。

【讨论】:

以上是关于取消处于阻塞状态的任务的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

如何唤醒一个处于阻塞状态下的线程

JAVA并发终结任务

是否有一种非阻塞方式来检查 asyncio 子进程是否处于活动状态?

Java中是否父线程阻塞后子线程就无法继续执行?

InternalError:流在完成之前没有阻塞主机;已经处于错误状态

非阻塞io模型和io多路复用----select