Dart:如何在异步函数中管理并发
Posted
技术标签:
【中文标题】Dart:如何在异步函数中管理并发【英文标题】:Dart: how to manage concurrency in async function 【发布时间】:2017-06-23 14:06:08 【问题描述】:我真的很喜欢 Dart 中的异步/等待模式。 它允许我编写可读的方法。
但是,有几件事是有问题的,特别是一件,我根本不知道要管理什么。
问题在于,在一个方法中使用 async 和多个 await,我们在方法中引入了并发。 例如如果我有一个方法:
Future<int> foo(int value) async
await foo2();
await foo3();
await foo4();
int ret = foo5(value);
return ret;
嗯,这是一个非常简单的例子。 这里的问题是,对于每个等待,该方法都被放入事件循环中。 没关系,当您理解它时,但这不会阻止您的应用程序在重新调整值之前再次调用该方法。
考虑该方法是否管理类实例共有的数据,而不是方法本身。
所以,我尝试了以下解决方案:
bool isWorking = false;
Future<int> foo(int value) async
if (isWorking) return foo(value);
isWorking = true;
await foo2();
await foo3();
await foo4();
int ret = foo5(value);
isWorking = False;
return ret;
据我了解,调用future方法会立即将其放入事件循环中,所以我认为该方法的并发调用的执行延迟到第一个结束。 但不是这样,程序进入死循环。
谁能给我一个解释和解决这个问题的方法?
编辑: 一般来说,我认为像在其他语言中一样,拥有一个 synchronized 关键字可能会很有趣,其含义是如果第二次调用该方法将等到第一次结束。 比如:
Future<int> foo(int value) async synchronized
编辑 2:
我真的很兴奋,因为我想我找到了解决这个问题的方法,这个问题我解决了很长时间。感谢 Argenti,特别是 Alexandre,他们给了我解决方案。我只是简单地重组了解决方案以便于重用(至少对我而言),并在此处发布了我创建的类以及如何为可能需要它的人使用它的示例(尝试自担风险;-))。 我使用了一个 mixin,因为我觉得它很实用,但是如果你愿意,你可以单独使用 Locker 类。
myClass extends Object with LockManager
Locker locker = LockManager.getLocker();
Future<int> foo(int value) async
_recall()
return foo(value);
if (locker.locked)
return await locker.waitLock();
locker.setFunction(_recall);
locker.lock();
await foo2();
await foo3();
await foo4();
int ret = foo5(value);
locker.unlock();
return ret;
班级是:
import 'dart:async';
class LockManager
static Locker getLocker() => new Locker();
class Locker
Future<Null> _isWorking = null;
Completer<Null> completer;
Function _function;
bool get locked => _isWorking != null;
lock()
completer = new Completer();
_isWorking = completer.future;
unlock()
completer.complete();
_isWorking = null;
waitLock() async
await _isWorking;
return _function();
setFunction(Function fun)
if (_function == null) _function = fun;
我以这种方式构造了代码,以便您可以在类中的多个方法中轻松使用它。在这种情况下,每个方法都需要一个 Locker 实例。 希望对你有用。
【问题讨论】:
其实我也不知道。 【参考方案1】:您可以使用Future
和Completer 而不是布尔值来实现您想要的:
Future<Null> isWorking = null;
Future<int> foo(int value) async
if (isWorking != null)
await isWorking; // wait for future complete
return foo(value);
// lock
var completer = new Completer<Null>();
isWorking = completer.future;
await foo2();
await foo3();
await foo4();
int ret = foo5(value);
// unlock
completer.complete();
isWorking = null;
return ret;
方法第一次调用isWorking
是null
,不进入if
部分并创建isWorking
作为将在方法结束时完成的Future。如果在第一次调用完成 Future isWorking
之前对 foo
进行了其他调用,则此调用进入 if
部分并等待 Future isWorking
完成。这对于可以在第一次调用完成之前完成的所有调用都是相同的。一旦第一个呼叫完成(并且isWorking
设置为null
),等待的呼叫就会收到通知,他们将再次呼叫foo
。其中一个将输入foo
作为第一个呼叫,并且将完成相同的工作流程。
请参阅https://dartpad.dartlang.org/dceafcb4e6349acf770b67c0e816e9a7 以更好地了解工作流程。
【讨论】:
请考虑,在我的真实环境中,我可以有不同的返回值,具体取决于我传递给方法的参数。但是,总的来说,我不明白你的代码是如何工作的,更详细的是 is Working 定义和 await isWorking 语句。能否请您解释一下? sn-p 不会为每次调用返回相同的值。我不明白你的第一句话。 非常有趣。我见过一些包以更结构化的方式完成相同的工作,但您的解决方案简单明了。 如果我们同时(由不同的调用者)多次调用 foo,他们无法检查 isWorking 是否为空并获得锁(竞争条件)? Dart 是单线程的这种情况是不可能发生的。【参考方案2】:答案很好,这只是防止异步操作交错的“互斥锁”的另一种实现。
class AsyncMutex
Future _next = new Future.value(null);
/// Request [operation] to be run exclusively.
///
/// Waits for all previously requested operations to complete,
/// then runs the operation and completes the returned future with the
/// result.
Future<T> run<T>(Future<T> operation())
var completer = new Completer<T>();
_next.whenComplete(()
completer.complete(new Future<T>.sync(operation));
);
return _next = completer.future;
它没有很多功能,但很简短,希望可以理解。
【讨论】:
如果操作抛出 expetion 怎么办 然后返回的future以该异常结束。【参考方案3】:现在是2021年,我们可以使用synchronized 3.0.0
var lock = new Lock();
Future<int> foo(int value) async
int ret;
await lock.synchronized(() async
await foo2();
await foo3();
await foo4();
ret = foo5(value);
return ret;
【讨论】:
你能用应用于问题的插件添加源 sn-p 吗?【参考方案4】:我猜真正需要的是实现并发管理原语(如锁、互斥体和信号量)的 Dart 库。
我最近使用Pool 包来有效地实现互斥锁以防止对资源的“并发”访问。 (这是“扔掉”代码,所以请不要将其视为高质量的解决方案。)
稍微简化示例:
final Pool pool = new Pool(1, timeout: new Duration(...));
Future<Null> foo(thing, ...) async
PoolResource rp = await pool.request();
await foo1();
await foo2();
...
rp.release();
在调用 foo() 中的异步函数之前从池中请求资源可确保当同时调用多个 foo() 时,对 foo1() 和 foo2() 的调用不会被错误地“交错”。
编辑:
似乎有几个包提供互斥锁:https://www.google.com/search?q=dart+pub+mutex。
【讨论】:
嗯,这就是我想要的,我猜。我会试试你建议的包。只有一个问题,第二次调用会发生什么,该方法被冻结直到第一次结束? 是的,它被阻塞等待pool.request()
返回的未来完成,直到rp.release()
在之前对foo()
的调用中调用(或者如果没有先前的致电foo()
)。
我很惊讶异步 SDK 库(或它的 Pub 对应库)不包含某种互斥类。
我还使用了池为 1 的 pool 包。我遇到的问题是它不能重入,因此可能会导致一些死锁。这就是我编写同步包的原因。它使用上面提出的 Alexandre Ardhuin 的 Completer 和使用简单 api(灵感来自 java synchonized api)且无依赖关系的 Zones 可重入【参考方案5】:
异步和并发是两个独立的主题。上面的代码没有什么并发的,都是串行执行的。当 isWorking 为真时,您拥有的代码将继续插入更多 foo(value)
事件 - 它永远不会完成。
您正在寻找的工具是Completer<T>
。有问题的特定代码 sn-p 很难辨别,所以我再举一个例子。假设您有共享数据库连接的代码。打开数据库连接是一个异步操作。当一种方法要求建立数据库连接时,它会等待打开完成。在等待期间,另一种方法要求建立数据库连接。期望的结果是只打开一个数据库连接,并将一个数据库连接返回给两个调用者:
Connection connection;
Completer<Connection> connectionCompleter;
bool get isConnecting => connectionCompleter != null;
Future<Connection> getDatabaseConnection()
if (isConnecting)
return connectionCompleter.future;
connectionCompleter = new Completer<Connection();
connect().then((c)
connection = c;
connectionCompleter.complete(c);
connectionCompleter = null;
);
return connectionCompleter.future;
第一次调用getDatabaseConnection
时,会调用connect
函数,并将完成者的未来返回给调用者。假设在connect
完成之前再次调用getDatabaseConnection
,则返回相同的完成者的未来,但不会再次调用connect
。
当connect
完成时,完成者的事件被添加到事件队列中。这会触发从getDatabaseConnection
返回的两个期货按顺序完成,并且connection
将有效。
注意getDatabaseConnection
甚至不是一个异步方法,它只是返回一个Future
。它可能是异步的,但它只会浪费 CPU 周期并使您的调用堆栈变得丑陋。
【讨论】:
谢谢乔,但你的代码对我不起作用,因为我在实际环境中使用的函数根据不同的参数返回不同的值。在您的情况下,函数的后续调用将根据第一次调用的选择返回数据。我需要一些东西来暂停正在运行的通话。以上是关于Dart:如何在异步函数中管理并发的主要内容,如果未能解决你的问题,请参考以下文章