真正的并行处理[关闭]

Posted

技术标签:

【中文标题】真正的并行处理[关闭]【英文标题】:Truly parallel processing [closed] 【发布时间】:2021-05-03 00:37:38 【问题描述】:

我想在 C# 中并行运行 7 个线程\任务。在 7 个线程中,我希望列表中第一个线程在某个超时时间内的结果可以说是 200 毫秒 (第一个线程没有必要先完成,只是如果它失败或花费超过所需的时间,我不需要等待任何其他任务完成)。如果我在 200 毫秒内没有得到列表中第一个线程的结果,或者结果为 null\invalid,我想中止或取消所有任务。如果第一个线程的结果在超时范围内并且有效,我想看看其他任务是否已完成并给出有效结果。如果是,那么我想获取结果并取消\终止所有剩余的任务。我使用Task类提出的代码如下:

public partial class Form1 : Form

    public Form1()
    
        InitializeComponent();
    

    private async void btnStart_Click(object sender, EventArgs e)
    
        TaskWrapper _taskWrapper = new TaskWrapper();
        Stopwatch _st = new Stopwatch();
        _st.Start();
        ThreadObject t = await _taskWrapper.DoTasks();
        _st.Stop();
        if (t != null)
        
            txtResult.Text += t._id.ToString() + " and " + t._idSec
                + " is completed in " + _st.ElapsedMilliseconds + " ms."
                + Environment.NewLine;
        
        else
        
            txtResult.Text += "All failed in " + _st.ElapsedMilliseconds + " ms."
                + Environment.NewLine;
        
        //_taskWrapper = null;
        //_taskWrapper.Dispose();
    


public class TaskWrapper

    Task<ThreadObject> _t1;
    Task<ThreadObject> _t2;
    Task<ThreadObject> _t3;
    Task<ThreadObject> _t4;
    Task<ThreadObject> _t5;
    Task<ThreadObject> _t6;
    Task<ThreadObject> _t7;

    ThreadObject _tO1;
    ThreadObject _tO2;
    ThreadObject _tO3;
    ThreadObject _tO4;
    ThreadObject _tO5;
    ThreadObject _tO6;
    ThreadObject _tO7;

    public async Task<ThreadObject> DoTasks()
    
        var _cancellationTokenSource = new CancellationTokenSource();
        CancellationToken _cancellationToken = _cancellationTokenSource.Token;

        //_cancellationToken.Register(() =>  this.Dispose(); );

        _tO1 = new ThreadObject(1);
        _t1 = new Task<ThreadObject>(() => _tO1.DoAction(50), _cancellationToken);

        _tO2 = new ThreadObject(2);
        _t2 = new Task<ThreadObject>(() => _tO2.DoAction(10), _cancellationToken);

        _tO3 = new ThreadObject(3);
        _t3 = new Task<ThreadObject>(() => _tO3.DoAction(30), _cancellationToken);

        _tO4 = new ThreadObject(4);
        _t4 = new Task<ThreadObject>(() => _tO4.DoAction(40), _cancellationToken);

        _tO5 = new ThreadObject(5);
        _t5 = new Task<ThreadObject>(() => _tO5.DoAction(60), _cancellationToken);

        _tO6 = new ThreadObject(6);
        _t6 = new Task<ThreadObject>(() => _tO6.DoAction(70), _cancellationToken);

        _tO7 = new ThreadObject(7);
        _t7 = new Task<ThreadObject>(() => _tO7.DoAction(200), _cancellationToken);

        var _tasks = new List<Task<ThreadObject>>  _t1, _t2, _t3, _t4, _t5, _t6, _t7,
            new Task<ThreadObject>(() =>  Thread.Sleep(1); return null; ) ;

        var _completedTasks = new List<Task<ThreadObject>>();

        var _mainTask = new List<Task<ThreadObject>>();

        _tasks.AsParallel().WithCancellation(_cancellationToken).ForAll(o => o.Start());
        while (_tasks.Count > 0)
        
            var completed = await Task.WhenAny(_tasks);
            if (completed.Result._isComplete)
            
                if (completed.Result._id == 1)
                
                    _mainTask.Add(completed);
                
                else
                
                    _completedTasks.Add(completed);
                
                if (_completedTasks.Count > 0 && _mainTask.Count > 0)
                
                    _mainTask[0].Result._idSec = _completedTasks[0].Result._id;
                    _cancellationTokenSource.Cancel();
                    return await _mainTask[0];
                
            
            else
            
                _tasks.Remove(completed);
            
        
        return null;
    


public class ThreadObject

    public bool _isComplete = false;
    public int _id;
    public int _idSec;

    private Thread thread;
    //private static readonly object lockObj;
    public ThreadObject(int id)
    
        _id = id;
    
    public ThreadObject DoAction(int _milliseconds)
    
        Thread.Sleep(_milliseconds);
        _isComplete = true;
        File.AppendAllText("E:\\ThreadingDemo2_" + _id + ".txt",
            DateTime.Now.ToString("dd-MM-yyyy HH:mm:ss.fff") + " " + _id
                + " Processing finished." + Environment.NewLine);
        return this;
    

我不确定如何在上述方法中实现超时逻辑。因此,我使用Thread 类实现了另一个逻辑,以实现以下相同:

public partial class Form1 : Form

    public Form1()
    
        InitializeComponent();
    

    private void btnStart_Click(object sender, EventArgs e)
    
        ThreadWrapper threadWrapper = new ThreadWrapper();
        Stopwatch _st = new Stopwatch();
        _st.Start();
        ThreadObject TO = threadWrapper.DoTasks();
        _st.Stop();
        if (TO != null)
        
            if (TO._idSec == 0)
                txtResult.Text += TO._id.ToString() + " is completed in "
                    + _st.ElapsedMilliseconds + " ms." + Environment.NewLine;
            else
                txtResult.Text += TO._id.ToString() + " and " + TO._idSec
                    + " is completed in " + _st.ElapsedMilliseconds + " ms."
                    + Environment.NewLine;
        
        else
        
            txtResult.Text += "All failed in " + _st.ElapsedMilliseconds + " ms."
                + Environment.NewLine;
        
    


public class ThreadWrapper

    Thread _t1;
    Thread _t2;
    Thread _t3;
    Thread _t4;
    Thread _t5;
    Thread _t6;
    Thread _t7;

    ThreadObject _tO1;
    ThreadObject _tO2;
    ThreadObject _tO3;
    ThreadObject _tO4;
    ThreadObject _tO5;
    ThreadObject _tO6;
    ThreadObject _tO7;

    ThreadObject mainThread;
    int SecId = 0;

    public ThreadObject DoTasks()
    
        _tO1 = new ThreadObject(1);
        //_tO1.TaskCompleted += TaskCompleted;
        _t1 = new Thread(() => mainThread = _tO1.DoAction(50));

        _tO2 = new ThreadObject(2);
        _tO2.TaskCompleted += TaskCompleted;
        _t2 = new Thread(() => _tO2.DoAction(10));

        _tO3 = new ThreadObject(3);
        _tO3.TaskCompleted += TaskCompleted;
        _t3 = new Thread(() => _tO3.DoAction(30));

        _tO4 = new ThreadObject(4);
        _tO4.TaskCompleted += TaskCompleted;
        _t4 = new Thread(() => _tO4.DoAction(40));

        _tO5 = new ThreadObject(5);
        _tO5.TaskCompleted += TaskCompleted;
        _t5 = new Thread(() => _tO5.DoAction(60));

        _tO6 = new ThreadObject(6);
        _tO6.TaskCompleted += TaskCompleted;
        _t6 = new Thread(() => _tO6.DoAction(70));

        _tO7 = new ThreadObject(7);
        _tO7.TaskCompleted += TaskCompleted;
        _t7 = new Thread(() => _tO7.DoAction(20));

        List<Thread> threads = new List<Thread>  _t1, _t2, _t3, _t4, _t5, _t6, _t7 ;

        threads.AsParallel<Thread>().ForAll<Thread>(o => o.Start());

        _t1.Join(200);

        Task.Run(() =>
        threads.AsParallel<Thread>().ForAll<Thread>(o => o.Abort()));
        if (mainThread != null)
        
            mainThread._idSec = SecId;
        

        return mainThread;
    

    private void TaskCompleted(int id, int idSec, bool isCompleted)
    
        if (SecId == 0)
            SecId = id;
    


public class ThreadObject

    public bool _isComplete = false;
    public int _id;
    public int _idSec;
    public delegate void NotifyTaskCompletion(int id, int idSec, bool isCompleted);
    public event NotifyTaskCompletion TaskCompleted;

    private static readonly object lockObj = 1;
    //private static readonly object lockObj;
    public ThreadObject(int id)
    
        _id = id;
    
    public ThreadObject DoAction(int _milliseconds)
    
        Thread.Sleep(_milliseconds);
        _isComplete = true;
        //lock (lockObj)
        //
        //    File.AppendAllText("E:\\ThreadingDemo3_" + _id + ".txt"
        //        , DateTime.Now.ToString("dd-MM-yyyy HH:mm:ss.fff") + " " + _id
        //            + " Processing finished." + Environment.NewLine);
        //
        TaskCompleted?.Invoke(_id, _idSec, _isComplete);
        return this;
    

我以前使用Parallel.Invoke 方法并行执行任务,但最近我注意到Parallel.Invoke 实际上并没有并行处理任务,因此最终占用了不必要的时间。 现在到我的实际问题,是的,有多个,如下:

    有没有办法在我的并行任务中添加超时?我已经知道一种方法可以在并行运行的任务集合中添加一个任务,该任务将暂停 200 毫秒或我想要的任何超时,然后返回 null。 (由于我使用WhenAny 来检查是否完成,控制将返回给调用 UI 线程)。但这对我来说就像在作弊,所以我想知道是否有其他方法。 当我从一个类创建Task 在这种情况下TaskWrapper 将处置或设置TaskWrapper 对象为空中止或销毁使用该对象创建\运行的任务? 同样,对于线程,如果我在本例ThreadWrapper 中从一个类内部创建了一个线程\线程,是否会将ThreadWrapper 对象处理或设置为空中止或销毁使用该对象创建\运行的线程?

已编辑:为了消除评论中提到的混淆。

【问题讨论】:

这个问题有点混乱,如果第一个线程先结束,为什么还有线程已经结束。 第一个线程不必先完成。但是,如果它没有在规定的时间内完成,我不需要任何线程的任何结果。这是我需要实现的部分逻辑。 第一个任务,是一个特定的任务还是列表中能够在 200 毫秒内完成的第一个任务? @AnuragPatil 任务不是线程,也不需要包装器。 .NET 已经为数据并行性、并发性、管道处理等提供了几个更高级别的抽象。您可以使用Parallel.ForEachPLINQ 使用所有可用内核并行处理大量数据。您可以使用 Dataflow 类来构建每个块具有不同并行度的处理管道。所有课程都支持开箱即用的取消。那你想做什么? @AnuragPatil 实际问题是什么?整个问题令人困惑,代码根本没有帮助,因为它试图重新发明任务 - 它试图在线程上创建包装器,即使 tasks 以这种方式工作。管理线程是TaskScheduler的工作,不需要手动处理线程 【参考方案1】:

简单的方法是使用 CancelAfter 方法在 200 毫秒后取消的 cancelToken。另一个选项是运行await Task.Delay(200)。我还建议使用Parallel.For,因为这将是更紧凑的代码:

    public static void CancelParallel()
    
        var cts = new CancellationTokenSource();
        cts.CancelAfter(TimeSpan.FromSeconds(200));
        var po = new ParallelOptions()
        
            CancellationToken = cts.Token
        ;
        void ParallelMethod(int iterationIndex, ParallelLoopState state)
        
            // Do whatever
            // if the iteration is long running you can check if it should exit
            if (state.ShouldExitCurrentIteration)
            
                return;
            
        

        var loopResult = Parallel.For(0, 7, po, ParallelMethod);
        if (loopResult.IsCompleted)
        
            // Do something if all threads completed
        
        else
        
            // Do something else
        
    

如果您只关心第一次迭代的成功,请在成功时设置一些标志并检查它而不是 loopResult。

请注意,Parallel.For 将在取消令牌时停止运行新的迭代,但会让现有的迭代运行到完成,因此您可能希望也可能不希望检查迭代内的令牌

【讨论】:

检查并行循环内的po.CancellationToken 可能是多余的。 Parallel.For 实现应该监视令牌,并在令牌发出信号时传播OperationCanceledException。如果您想提前退出以便单个长时间运行的操作不会不必要地延迟 Parallel.For 的完成,则要查询的属性可能是 ParallelLoopState.ShouldExitCurrentIteration 目的是演示如何取消长时间运行的循环迭代。检查po.CancellationToken是从how to cancel a parallel loop复制的 我不喜欢文档中的示例。请查看this小提琴。取消令牌时,Parallel.For 会自动取消。

以上是关于真正的并行处理[关闭]的主要内容,如果未能解决你的问题,请参考以下文章

并行处理许多 API 请求与异步/等待 [关闭]

FPGA 的并行度如何?

PHP 中的并行处理 - 你是怎么做的?

还在MapReduce?真正的并行计算引擎——Apache Impala你需要了解这些

异步/IO 和并行

.NET 是不是使用并发性或并行性或两者兼而有之? [关闭]