取消处于阻塞状态的任务的最佳方法是啥?
Posted
技术标签:
【中文标题】取消处于阻塞状态的任务的最佳方法是啥?【英文标题】:What is the best way to cancel a task that is in a blocking state?取消处于阻塞状态的任务的最佳方法是什么? 【发布时间】:2014-07-17 18:42:14 【问题描述】:我有正在运行的任务调用从 RabbitMQ 读取的方法。当队列中没有任何内容时,该方法只是阻塞。所以这些任务有一个“正在运行”的状态,但实际上并没有做任何事情。有什么方法可以优雅地结束这些任务?
访问队列的代码如下:
private void FindWork(CancellationToken ct)
if (ct.IsCancellationRequested)
return;
bool result = false;
bool process = false;
bool queueResult = false;
Work_Work work = null;
try
using (Queue workQueue = new Queue(_workQueue))
// Look for work on the work queue
workQueue.Open(Queue.Mode.Consume);
work = workQueue.ConsumeWithBlocking<Work_Work>();
// Do some work with the message ...
return;
任务创建如下:
private void Run()
while (!_stop)
// Remove and stopped tasks from the pool
List<int> removeThreads = new List<int>();
lock (_tasks)
foreach (KeyValuePair<int, Task> task in _tasks)
if (task.Value.Status != TaskStatus.Running)
task.Value.Wait();
removeThreads.Add(task.Value.Id);
foreach (int taskID in removeThreads)
_tasks.Remove(taskID);
CancellationToken ct = _cts.Token;
TaskFactory factory = new TaskFactory(ct, TaskCreationOptions.LongRunning, TaskContinuationOptions.LongRunning, null);
// Create new tasks if we have room in the pool
while (_tasks.Count < _runningMax)
Task task = factory.StartNew(() => FindWork(ct));
lock (_tasks)
_tasks.Add(task.Id, task);
// Take a rest so we don't run the CPU to death
Thread.Sleep(1000);
目前我已将我的任务创建代码更改为如下所示,以便我可以中止任务。我知道这不是一个好的解决方案,但我不知道还能做什么。
while (_tasks.Count < _runningMax)
Task task = factory.StartNew(() =>
try
using (_cts.Token.Register(Thread.CurrentThread.Abort))
FindWork(ct);
catch (ThreadAbortException)
return;
, _cts.Token);
_tasks.Add(task.Id, task);
【问题讨论】:
您能否展示一下您如何使用排队系统的代码? 当队列中有内容时,您是否可以不利用 async/await 来恢复执行,从而允许线程返回线程池,而不是阻塞线程? 当队列中有东西时执行会恢复。 【参考方案1】:以下是否适用于您的场景?
我不会产生多个线程并让它们在队列中等待,而是在无限轮询循环中创建一个线程,并在有新工作进入时让那个线程产生一个新线程。您可以添加一个信号量到限制您创建的线程数。检查下面的示例代码,我使用了 BlockingCollection 而不是 RabbitMQ 。
public class QueueManager
public BlockingCollection<Work> blockingCollection = new BlockingCollection<Work>();
private const int _maxRunningTasks = 3;
static SemaphoreSlim _sem = new SemaphoreSlim(_maxRunningTasks);
public void Queue()
blockingCollection.Add(new Work());
public void Consume()
while (true)
Work work = blockingCollection.Take();
_sem.Wait();
Task t = Task.Factory.StartNew(work.DoWork);
public class Work
public void DoWork()
Thread.Sleep(5000);
_sem.Release();
Console.WriteLine("Finished work");
还有我的测试班
class Test
static void Main(string[] args)
Consumer c = new Consumer();
Task t = Task.Factory.StartNew(c.Consume);
c.Queue();
c.Queue();
c.Queue();
c.Queue();
c.Queue();
Thread.Sleep(1000);
Console.ReadLine();
【讨论】:
【参考方案2】:要完成这项工作,您需要更改 ConsumeWithBlocking
以支持取消。我不熟悉RabbitMQ,但显然它支持cancellation on the consumer channel。
因此,不要从Token.Register
回调中执行Thread.CurrentThread.Abort
,而是做正确的事情并通过适当的RabbitMQ API 优雅地取消操作。
附带说明,您当前尝试中止的线程很可能不是被ConsumeWithBlocking
阻止的线程。
【讨论】:
以上是关于取消处于阻塞状态的任务的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章
是否有一种非阻塞方式来检查 asyncio 子进程是否处于活动状态?