强制终止由 Parallel.ForEach 生成的线程 [重复]

Posted

技术标签:

【中文标题】强制终止由 Parallel.ForEach 生成的线程 [重复]【英文标题】:Forcefully terminate thread generated by Parallel.ForEach [duplicate] 【发布时间】:2021-05-09 23:23:55 【问题描述】:

使用Parallel.ForEach()时,有没有办法在特定线程上强制执行Thread.Abort

我知道Thread.Abort()不推荐。

我正在对数十万个实体的集合运行Parallel.ForEach()

在某些情况下,循环会处理 30 年前的数据。我们遇到了一些线程挂起的问题。当我们试图掌握这一点时,希望将实现称为故障保险。如果线程运行超过 x 时间,则强制终止线程。

我不想使用取消令牌。

这会很丑,但还没有找到其他解决方案。是否有可能:

让每个线程打开一个计时器。将Thread.CurrentThread 的引用传递给计时器 如果计时器已过,但处理尚未完成,请在该计时器上调用 Thread.Abort 如果需要,发出事件等待句柄以允许下一个患者处理
private void ProcessEntity(ProcessParams param,
    ConcurrentDictionary<long, string> entities)

    var options = new ParallelOptions
    
        MaxDegreeOfParallelism = 2
    ;
    Parallel.ForEach(person, options, p =>
    
        ProcessPerson(param, p);
    );


internal void ProcessPerson(ProcessParams param, KeyValuePair<long, string> p)

    try
    
        //...
    
    catch (Exception ex)
    

    
    param.eventWaitHandle?.WaitOne();


【问题讨论】:

为什么不使用ParallelLoopState?它专为您的情况而设计 请问您为什么不想使用取消令牌?听起来这就是取消令牌的作用。 ***.com/questions/22647242/… 不建议在没有与该线程合作的情况下取消线程。您可能会使用超时并跳过挂起线程的结果,但这可能会留下一堆挂起的线程。我非常建议调查为什么某些操作挂起?是否有一些 IO 操作,如果有的话,它可能有一个内置的超时,你可以使用?是死锁吗?如果是这样,修复它! @SteveNorwood 如果线程进入死锁状态,或者只是停止响应,我不相信取消令牌会起作用,对吗?对于取消令牌,我认为我需要处于可以检查取消令牌并决定是继续还是取消的工作状态 顺便说一句,我已经投票支持重新打开这个问题,因为这个问题是关于在 Parallel.ForEach 操作期间中止线程。 suggested as duplicate 是关于中止基于委托的Tasks 的线程。这是一个完全不同的情况,它没有涵盖任务并行库中包含的所有 TPL 构造(尤其是更高级别的构造)的复杂性。 【参考方案1】:

似乎Parallel.ForEach 方法在其工作线程被中止时没有弹性,并且行为不一致。其他时候传播一个包含ThreadAbortExceptionAggregateException,其他时候它直接抛出一个ThreadAbortException,一个丑陋的堆栈跟踪显示its internals。

下面是一个自定义的ForEachTimeoutAbort 方法,它提供Parallel.ForEach 的基本功能,但没有取消、循环状态、自定义分区器等高级功能。它的特殊功能是TimeSpan timeout 参数,它中止工作线程任何需要很长时间才能完成的项目。

public static void ForEachTimeoutAbort<TSource>(
    this IEnumerable<TSource> source,
    Action<TSource> action,
    int maxDegreeOfParallelism,
    TimeSpan timeout)

    // Arguments validation omitted
    var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    var exceptions = new ConcurrentQueue<Exception>();
    try
    
        foreach (var item in source)
        
            semaphore.Wait();
            if (!exceptions.IsEmpty)  semaphore.Release(); break; 

            ThreadPool.QueueUserWorkItem(_ =>
            
                var timer = new Timer(state => ((Thread)state).Abort(),
                    Thread.CurrentThread, Timeout.Infinite, Timeout.Infinite);
                try
                
                    timer.Change(timeout, Timeout.InfiniteTimeSpan);
                    action(item);
                
                catch (Exception ex)  exceptions.Enqueue(ex); 
                finally
                
                    using (var waitHandle = new ManualResetEvent(false))
                    
                        timer.Dispose(waitHandle);
                        // Wait the timer's callback (if it's queued) to complete.
                        waitHandle.WaitOne();
                    
                    semaphore.Release();
                
            );
        
    
    catch (Exception ex)  exceptions.Enqueue(ex); 

    // Wait for all pending operations to complete
    for (int i = 0; i < maxDegreeOfParallelism; i++) semaphore.Wait();
    if (!exceptions.IsEmpty) throw new AggregateException(exceptions);

ThreadAbortException 的一个特点是它无法被捕获。所以为了防止并行循环过早完成,必须从catch块中调用Thread.ResetAbort方法。

使用示例:

ForEachTimeoutAbort(persons, p =>

    try
    
        ProcessPerson(param, p);
    
    catch (ThreadAbortException)
    
        Thread.ResetAbort();
    
, maxDegreeOfParallelism: 2, timeout: TimeSpan.FromSeconds(30));

.NET Framework 是唯一可以使用ForEachTimeoutAbort 方法的平台。对于 .NET Core 和 .NET 5,可以尝试将其转换为 ForEachTimeoutInterrupt,并将对 Abort 的调用替换为对 Interrupt 的调用。中断线程不如中止它有效,因为它仅在线程处于等待/睡眠模式时才有效。但在某些情况下可能就足够了。

【讨论】:

以上是关于强制终止由 Parallel.ForEach 生成的线程 [重复]的主要内容,如果未能解决你的问题,请参考以下文章

使用 Parallel.ForEach 之外的选项

Python 中的 C# Parallel.Foreach 等效项

如何限制 Parallel.ForEach?

计算 Parallel.ForEach 使用的线程数

Parallel.ForEach 比 ForEach 慢

打破parallel.foreach?