使用 polly 并行化任务

Posted

技术标签:

【中文标题】使用 polly 并行化任务【英文标题】:Parallelize tasks using polly 【发布时间】:2016-10-24 15:49:24 【问题描述】:

假设我有一个 List<Foo> 类型的对象 myObj 的列表。

我有一个波利政策:

var policy = Policy.Handle<Exception>().RetryForever();

我想在并行中运行方法,但在它们失败时继续重试。

for (int i = 0; i < myObjs.Count; i++)

  var obj = myObjs[i];
  policy.Execute(() => Task.Factory.StartNew(() => obj.Do(), TaskCreationOptions.LongRunning));

这会被并行调用并重试每个 obj 吗?因此,如果 myObjs[5].do() 失败,是否只会重试,而其他对象只会执行一次?

另外,我是否应该使用接受 Func 的 ExecuteAsync() 方法,而不是示例中所示的 Execute(Action) 方法? Do() 只是一个同步方法,在单独的线程中启动。 实际代码如下所示,其中 each() 只是一个 foreach wrapper()

_consumers.ForEach(c => policy.Execute(() => Task.Factory.StartNew(() => c.Consume(startFromBeg), TaskCreationOptions.LongRunning)));

编辑:

我试过代码:

class Foo    

        private int _i;
        public Foo(int i)
        
            _i = i;
        
        public void Do()
        
            //var rnd = new Random();
            if (_i==2)
            
                Console.WriteLine("err"+_i);

                throw new Exception();
            
            Console.WriteLine(_i);
        

var policy = Policy.Handle<Exception>().Retry(3);
var foos=Enumerable.Range(0, 5).Select(x => new Foo(x)).ToList();
foos.ForEach(c => policy.Execute(() => Task.Factory.StartNew(() => c.Do(), TaskCreationOptions.LongRunning)));

但我得到了结果:

0 1 错误2 3 4 5

我以为它会再重试 2 次,但没有。知道为什么吗?

【问题讨论】:

【参考方案1】:

任何拥有这些任务的人都必须以某种方式等待它们。否则,exceptions will be ignored 和代码将在任务实际完成之前结束。所以是的,您可能应该改用policy.ExecuteAsync()。它看起来像这样:

var tasks = myObjs
    .Select(obj => Task.Factory.StartNew(() => obj.Do(), TaskCreationOptions.LongRunning))
    .ToList();

// sometime later
await Task.WhenAll(tasks);

【讨论】:

是的,现在可以了。我用 Task.WhenAll(tasks).GetAwaiter().GetResult();在我的同步方法中并使用策略的 RetryAsync() 版本。

以上是关于使用 polly 并行化任务的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 中使用 Ray 并行化任务,得到“Aborted (core dumped)”

HPX 是不是提供具有粒度控制的基于任务的并行化迭代功能?

在 Python 中使用 asyncio 并行化 Web 任务

任务是使用 p 线程并行化矩阵乘法并使用英特尔 ISPC 编译器进行矢量化

读Java8函数式编程笔记05_数据并行化

R中的并行化“查找”循环