真正的并行处理[关闭]
Posted
技术标签:
【中文标题】真正的并行处理[关闭]【英文标题】:Truly parallel processing [closed] 【发布时间】:2021-05-03 00:37:38 【问题描述】:我想在 C# 中并行运行 7 个线程\任务。在 7 个线程中,我希望列表中第一个线程在某个超时时间内的结果可以说是 200 毫秒 (第一个线程没有必要先完成,只是如果它失败或花费超过所需的时间,我不需要等待任何其他任务完成)。如果我在 200 毫秒内没有得到列表中第一个线程的结果,或者结果为 null\invalid,我想中止或取消所有任务。如果第一个线程的结果在超时范围内并且有效,我想看看其他任务是否已完成并给出有效结果。如果是,那么我想获取结果并取消\终止所有剩余的任务。我使用Task类提出的代码如下:
public partial class Form1 : Form
public Form1()
InitializeComponent();
private async void btnStart_Click(object sender, EventArgs e)
TaskWrapper _taskWrapper = new TaskWrapper();
Stopwatch _st = new Stopwatch();
_st.Start();
ThreadObject t = await _taskWrapper.DoTasks();
_st.Stop();
if (t != null)
txtResult.Text += t._id.ToString() + " and " + t._idSec
+ " is completed in " + _st.ElapsedMilliseconds + " ms."
+ Environment.NewLine;
else
txtResult.Text += "All failed in " + _st.ElapsedMilliseconds + " ms."
+ Environment.NewLine;
//_taskWrapper = null;
//_taskWrapper.Dispose();
public class TaskWrapper
Task<ThreadObject> _t1;
Task<ThreadObject> _t2;
Task<ThreadObject> _t3;
Task<ThreadObject> _t4;
Task<ThreadObject> _t5;
Task<ThreadObject> _t6;
Task<ThreadObject> _t7;
ThreadObject _tO1;
ThreadObject _tO2;
ThreadObject _tO3;
ThreadObject _tO4;
ThreadObject _tO5;
ThreadObject _tO6;
ThreadObject _tO7;
public async Task<ThreadObject> DoTasks()
var _cancellationTokenSource = new CancellationTokenSource();
CancellationToken _cancellationToken = _cancellationTokenSource.Token;
//_cancellationToken.Register(() => this.Dispose(); );
_tO1 = new ThreadObject(1);
_t1 = new Task<ThreadObject>(() => _tO1.DoAction(50), _cancellationToken);
_tO2 = new ThreadObject(2);
_t2 = new Task<ThreadObject>(() => _tO2.DoAction(10), _cancellationToken);
_tO3 = new ThreadObject(3);
_t3 = new Task<ThreadObject>(() => _tO3.DoAction(30), _cancellationToken);
_tO4 = new ThreadObject(4);
_t4 = new Task<ThreadObject>(() => _tO4.DoAction(40), _cancellationToken);
_tO5 = new ThreadObject(5);
_t5 = new Task<ThreadObject>(() => _tO5.DoAction(60), _cancellationToken);
_tO6 = new ThreadObject(6);
_t6 = new Task<ThreadObject>(() => _tO6.DoAction(70), _cancellationToken);
_tO7 = new ThreadObject(7);
_t7 = new Task<ThreadObject>(() => _tO7.DoAction(200), _cancellationToken);
var _tasks = new List<Task<ThreadObject>> _t1, _t2, _t3, _t4, _t5, _t6, _t7,
new Task<ThreadObject>(() => Thread.Sleep(1); return null; ) ;
var _completedTasks = new List<Task<ThreadObject>>();
var _mainTask = new List<Task<ThreadObject>>();
_tasks.AsParallel().WithCancellation(_cancellationToken).ForAll(o => o.Start());
while (_tasks.Count > 0)
var completed = await Task.WhenAny(_tasks);
if (completed.Result._isComplete)
if (completed.Result._id == 1)
_mainTask.Add(completed);
else
_completedTasks.Add(completed);
if (_completedTasks.Count > 0 && _mainTask.Count > 0)
_mainTask[0].Result._idSec = _completedTasks[0].Result._id;
_cancellationTokenSource.Cancel();
return await _mainTask[0];
else
_tasks.Remove(completed);
return null;
public class ThreadObject
public bool _isComplete = false;
public int _id;
public int _idSec;
private Thread thread;
//private static readonly object lockObj;
public ThreadObject(int id)
_id = id;
public ThreadObject DoAction(int _milliseconds)
Thread.Sleep(_milliseconds);
_isComplete = true;
File.AppendAllText("E:\\ThreadingDemo2_" + _id + ".txt",
DateTime.Now.ToString("dd-MM-yyyy HH:mm:ss.fff") + " " + _id
+ " Processing finished." + Environment.NewLine);
return this;
我不确定如何在上述方法中实现超时逻辑。因此,我使用Thread
类实现了另一个逻辑,以实现以下相同:
public partial class Form1 : Form
public Form1()
InitializeComponent();
private void btnStart_Click(object sender, EventArgs e)
ThreadWrapper threadWrapper = new ThreadWrapper();
Stopwatch _st = new Stopwatch();
_st.Start();
ThreadObject TO = threadWrapper.DoTasks();
_st.Stop();
if (TO != null)
if (TO._idSec == 0)
txtResult.Text += TO._id.ToString() + " is completed in "
+ _st.ElapsedMilliseconds + " ms." + Environment.NewLine;
else
txtResult.Text += TO._id.ToString() + " and " + TO._idSec
+ " is completed in " + _st.ElapsedMilliseconds + " ms."
+ Environment.NewLine;
else
txtResult.Text += "All failed in " + _st.ElapsedMilliseconds + " ms."
+ Environment.NewLine;
public class ThreadWrapper
Thread _t1;
Thread _t2;
Thread _t3;
Thread _t4;
Thread _t5;
Thread _t6;
Thread _t7;
ThreadObject _tO1;
ThreadObject _tO2;
ThreadObject _tO3;
ThreadObject _tO4;
ThreadObject _tO5;
ThreadObject _tO6;
ThreadObject _tO7;
ThreadObject mainThread;
int SecId = 0;
public ThreadObject DoTasks()
_tO1 = new ThreadObject(1);
//_tO1.TaskCompleted += TaskCompleted;
_t1 = new Thread(() => mainThread = _tO1.DoAction(50));
_tO2 = new ThreadObject(2);
_tO2.TaskCompleted += TaskCompleted;
_t2 = new Thread(() => _tO2.DoAction(10));
_tO3 = new ThreadObject(3);
_tO3.TaskCompleted += TaskCompleted;
_t3 = new Thread(() => _tO3.DoAction(30));
_tO4 = new ThreadObject(4);
_tO4.TaskCompleted += TaskCompleted;
_t4 = new Thread(() => _tO4.DoAction(40));
_tO5 = new ThreadObject(5);
_tO5.TaskCompleted += TaskCompleted;
_t5 = new Thread(() => _tO5.DoAction(60));
_tO6 = new ThreadObject(6);
_tO6.TaskCompleted += TaskCompleted;
_t6 = new Thread(() => _tO6.DoAction(70));
_tO7 = new ThreadObject(7);
_tO7.TaskCompleted += TaskCompleted;
_t7 = new Thread(() => _tO7.DoAction(20));
List<Thread> threads = new List<Thread> _t1, _t2, _t3, _t4, _t5, _t6, _t7 ;
threads.AsParallel<Thread>().ForAll<Thread>(o => o.Start());
_t1.Join(200);
Task.Run(() =>
threads.AsParallel<Thread>().ForAll<Thread>(o => o.Abort()));
if (mainThread != null)
mainThread._idSec = SecId;
return mainThread;
private void TaskCompleted(int id, int idSec, bool isCompleted)
if (SecId == 0)
SecId = id;
public class ThreadObject
public bool _isComplete = false;
public int _id;
public int _idSec;
public delegate void NotifyTaskCompletion(int id, int idSec, bool isCompleted);
public event NotifyTaskCompletion TaskCompleted;
private static readonly object lockObj = 1;
//private static readonly object lockObj;
public ThreadObject(int id)
_id = id;
public ThreadObject DoAction(int _milliseconds)
Thread.Sleep(_milliseconds);
_isComplete = true;
//lock (lockObj)
//
// File.AppendAllText("E:\\ThreadingDemo3_" + _id + ".txt"
// , DateTime.Now.ToString("dd-MM-yyyy HH:mm:ss.fff") + " " + _id
// + " Processing finished." + Environment.NewLine);
//
TaskCompleted?.Invoke(_id, _idSec, _isComplete);
return this;
我以前使用Parallel.Invoke
方法并行执行任务,但最近我注意到Parallel.Invoke
实际上并没有并行处理任务,因此最终占用了不必要的时间。
现在到我的实际问题,是的,有多个,如下:
-
有没有办法在我的并行任务中添加超时?我已经知道一种方法可以在并行运行的任务集合中添加一个任务,该任务将暂停 200 毫秒或我想要的任何超时,然后返回 null。 (由于我使用
WhenAny
来检查是否完成,控制将返回给调用 UI 线程)。但这对我来说就像在作弊,所以我想知道是否有其他方法。
当我从一个类创建Task
在这种情况下TaskWrapper
将处置或设置TaskWrapper
对象为空中止或销毁使用该对象创建\运行的任务?
同样,对于线程,如果我在本例ThreadWrapper
中从一个类内部创建了一个线程\线程,是否会将ThreadWrapper
对象处理或设置为空中止或销毁使用该对象创建\运行的线程?
已编辑:为了消除评论中提到的混淆。
【问题讨论】:
这个问题有点混乱,如果第一个线程先结束,为什么还有线程已经结束。 第一个线程不必先完成。但是,如果它没有在规定的时间内完成,我不需要任何线程的任何结果。这是我需要实现的部分逻辑。 第一个任务,是一个特定的任务还是列表中能够在 200 毫秒内完成的第一个任务? @AnuragPatil 任务不是线程,也不需要包装器。 .NET 已经为数据并行性、并发性、管道处理等提供了几个更高级别的抽象。您可以使用Parallel.ForEach
或PLINQ
使用所有可用内核并行处理大量数据。您可以使用 Dataflow 类来构建每个块具有不同并行度的处理管道。所有课程都支持开箱即用的取消。那你想做什么?
@AnuragPatil 实际问题是什么?整个问题令人困惑,代码根本没有帮助,因为它试图重新发明任务 - 它试图在线程上创建包装器,即使 tasks 以这种方式工作。管理线程是TaskScheduler的工作,不需要手动处理线程
【参考方案1】:
简单的方法是使用 CancelAfter 方法在 200 毫秒后取消的 cancelToken。另一个选项是运行await Task.Delay(200)
。我还建议使用Parallel.For
,因为这将是更紧凑的代码:
public static void CancelParallel()
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(200));
var po = new ParallelOptions()
CancellationToken = cts.Token
;
void ParallelMethod(int iterationIndex, ParallelLoopState state)
// Do whatever
// if the iteration is long running you can check if it should exit
if (state.ShouldExitCurrentIteration)
return;
var loopResult = Parallel.For(0, 7, po, ParallelMethod);
if (loopResult.IsCompleted)
// Do something if all threads completed
else
// Do something else
如果您只关心第一次迭代的成功,请在成功时设置一些标志并检查它而不是 loopResult。
请注意,Parallel.For 将在取消令牌时停止运行新的迭代,但会让现有的迭代运行到完成,因此您可能希望也可能不希望检查迭代内的令牌
【讨论】:
检查并行循环内的po.CancellationToken
可能是多余的。 Parallel.For
实现应该监视令牌,并在令牌发出信号时传播OperationCanceledException
。如果您想提前退出以便单个长时间运行的操作不会不必要地延迟 Parallel.For
的完成,则要查询的属性可能是 ParallelLoopState.ShouldExitCurrentIteration
。
目的是演示如何取消长时间运行的循环迭代。检查po.CancellationToken
是从how to cancel a parallel loop复制的
我不喜欢文档中的示例。请查看this小提琴。取消令牌时,Parallel.For
会自动取消。以上是关于真正的并行处理[关闭]的主要内容,如果未能解决你的问题,请参考以下文章