Actor使用模式
现在我们已经了解了可以创建的actor系统的不同类型,那么我们在编写基于actor的应用程序时,可以采用什么样的使用模式,以便避免出现常见错误呢? 下面就让我们看看其中使用模式。
Extra模式
异步编程中最困难的任务之一就是尝试捕获上下文,以便在任务开始时可以准确地表示任务完成时的世界状态。 但是,创建Akka actors的匿名实例是一种非常简单且轻量级的解决方案,用于在处理消息时捕获上下文以在任务成功完成时使用。 它们就像电影演员中的演员一样 - 帮助为正在他们身边工作的主要演员提供真实情境。
问题
一个很好的例子是一个actor顺序处理邮箱中的消息,但是用Future在额外的线程中处理基于这些消息的任务。 这是设计actor的好方法,因为他们不会阻塞响应,可以同时处理更多的消息并提高应用程序的性能。 但是,actor的状态可能随每条消息而改变。
我们来定义这个例子的样板。 这些类将在我们开发过程的每个迭代中重用。 请注意,这些代码都可以在我的GitHub仓库中找到,你可以克隆并测试它们。首先,我们有一条消息告诉actor检索特定ID客户的账户余额:
case class GetCustomerAccountBalances(id: Long)
接下来,我们有返回请求的帐户信息的数据传输对象。 由于客户可能没有任何类型的账户,也可能有多于一种账户类型,所以我们在这种情况下返回Option [List [(Long,BigDecimal)]],其中Long代表 一个账户标识符,BigDecimal代表一个余额:
case class AccountBalances( val checking: Option[List[(Long, BigDecimal)]], val savings: Option[List[(Long, BigDecimal)]], val moneyMarket: Option[List[(Long, BigDecimal)]]) case class CheckingAccountBalances( val balances: Option[List[(Long, BigDecimal)]]) case class SavingsAccountBalances( val balances: Option[List[(Long, BigDecimal)]]) case class MoneyMarketAccountBalances( val balances: Option[List[(Long, BigDecimal)]])
我在本书的前言中承诺,将展示如何通过领域驱动设计将其与Eric Evans的概念联系起来。 看看我为了完成这项工作创建的类。我们可以将整个AccountService视为一个上下文绑定,其中CheckingAccount或SavingsAccount是一个实体。其中表示余额的数字是一个值。CheckingBalances,Saving Balances和mmBalances字段是聚合,而返回类型的AccountBalances是聚合根。 最后,Vaughn Vernon在他出色的“Implementing DomainDriven Design”中指出Akka是事件驱动的上下文绑定的可能实现。 使用Akka实现命令查询职责分离(按照Greg Young的规范)和事件源(使用开源事件源库)也很容易。
最后,我们有代表服务接口的代理ttrait。 就像使用向服务暴露接口而不是类的实现的Java最佳实践一样,我们将在这里遵循这个约定,并定义服务接口,然后可以在我们的测试中将其删除:
trait SavingsAccountsProxy extends Actor trait CheckingAccountsProxy extends Actor trait MoneyMarketAccountsProxy extends Actor
我们举一个actor的例子,这个actor可以作为一个代理从多个数据源获取一个金融服务的客户账户信息。此外,我们假设每个用于储蓄、支票和货币市场账户余额的子系统代理将可选地返回该客户的账户及其余额的清单,并且我们将这些作为依赖关系注入到检索器类中。我们来编写一些基本的Akka actor代码来执行这个任务:
import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import akka.actor._ import akka.pattern.ask import akka.util.Timeout class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor { implicit val timeout: Timeout = 100 milliseconds implicit val ec: ExecutionContext = context.dispatcher def receive = { case GetCustomerAccountBalances(id) => val futSavings = savingsAccounts ? GetCustomerAccountBalances(id) val futChecking = checkingAccounts ? GetCustomerAccountBalances(id) val futMM = moneyMarketAccounts ? GetCustomerAccountBalances(id) val futBalances = for { savings <- futSavings.mapTo[Option[List[(Long, BigDecimal)]]] checking <- futChecking.mapTo[Option[List[(Long, BigDecimal)]]] mm <- futMM.mapTo[Option[List[(Long, BigDecimal)]]] } yield AccountBalances(savings, checking, mm) futBalances map (sender ! _) } }
这段代码非常简洁。AccountBalanceRetriever actor收到一条获取客户的账户余额消息,然后同时触发三个future。 第一个将获得客户的储蓄账户余额,第二个将获得支票账户余额,第三个账户将获得货币市场余额。 并行执行这些任务可以避免按顺序执行检索的昂贵成本。 此外,请注意,虽然future会通过账户ID返回某些账户余额的期权,但如果它们返回None,这for语句并不会短路 - 如果futSaving返回None,for语句继续执行。
然而,有几件事情并不理想。 首先,它使用future向其他actor请求应答,这会为每个在幕后发送的消息创建一个新的PromiseActorRef。 这有点浪费资源。 最好是让我们的AccountBalanceRetriever actor以一种“fire and forget”的方式发送消息,并将结果异步收集到一个actor中。
此外,在这个代码中有一个明显的竞争条件 - 你能找到它吗? 我们在映射操作中引用来自futBalances的结果中的“sender”,这可能与未来完成时的ActorRef不同,因为AccountBalanceRetriever ActorRef现在可能正在处理另一个来自不同发件人的邮件!
避免Ask
让我们首先关注消除在actor中要求回复的需求。 我们可以通过!发送消息,并把应答收集到一个账号的余额包含可选值的清单中。但是我们怎么去做呢?
import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import akka.actor._ class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor { val checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None var originalSender: Option[ActorRef] = None def receive = { case GetCustomerAccountBalances(id) => originalSender = Some(sender) savingsAccounts ! GetCustomerAccountBalances(id) checkingAccounts ! GetCustomerAccountBalances(id) moneyMarketAccounts ! GetCustomerAccountBalances(id) case AccountBalances(cBalances, sBalances, mmBalances) => (checkingBalances, savingsBalances, mmBalances) match { case (Some(c), Some(s), Some(m)) => originalSender.get ! AccountBalances(checkingBalances, savingsBalances, mmBalances) case _ => } } }
这会好一点,但仍然有很多不足之处。 首先,我们创建了在实例级收到的余额集合,这意味着我们无法把响应聚合区分为单个请求以获取帐户余额。 更糟糕的是,我们无法将超时的请求返回原始请求者。 最后,虽然我们已经将原始发件人捕获为可能有值的实例变量(因为在AccountBalanceRetriever启动时没有originalSender),但当我们想要发回数据时,仍无法确定originalSender是否就是我们想要的值 !
捕获上下文
问题在于我们试图从多个来源中检索数据的脱机操作的结果,并将其返回给首先向我们发送消息的任何人。 然而,当这些future完成时,actor可能已经开始处理其邮箱中的其他消息了,此时AccountBalanceRetriever actor中代表“sender”的状态可能是完全不同的actor实例。 那么我们如何解决这个问题呢?
诀窍是为正在处理的每个GetCustomerAccountBalan ces消息创建一个匿名内部actor。 通过这样做,您可以捕捉到future填充时需要的状态。 让我们看看怎么做:
import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import akka.actor._ class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor { val checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None def receive = { case GetCustomerAccountBalances(id) => { context.actorOf(Props(new Actor() { var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None val originalSender = sender def receive = { case CheckingAccountBalances(balances) => checkingBalances = balances isDone case SavingsAccountBalances(balances) => savingsBalances = balances isDone case MoneyMarketAccountBalances(balances) => mmBalances = balances isDone } def isDone = (checkingBalances, savingsBalances, mmBalances) match { case (Some(c), Some(s), Some(m)) => originalSender ! AccountBalances(checkingBalances, savingsBalances, mmBalances) context.stop(self) case _ => } savingsAccounts ! GetCustomerAccountBalances(id) checkingAccounts ! GetCustomerAccountBalances(id) moneyMarketAccounts ! GetCustomerAccountBalances(id) })) } } }
这样就好多了。 我们已经捕获了每个接收的状态,并且只有当三个值都具有值时才将其发回给originalSender。但这里还有两个问题。首先,我们没有定义在超时的时候,如何将原始请求的响应返回给请求他们的人。 其次,我们的originalSender仍然会得到一个错误的值 - “sender”实际上是匿名内部actor的sender值,而不是发送原始GetCustomerAccountBalan ces消息的sender值!
发送超时消息
我们可以发送一条超时消息来处理可能超时的请求,通过允许另一个任务争用超时完成操作的权利。 这是一种非常干净的方式,同时仍然对请求实施超时语义。 如果在超时消息之前,所有三种帐户类型的数据均已在邮箱中排队,则AccountBalan ces类型的正确响应会发送回原始发件人。 但是,如果来自计划任务的超时消息在这三个响应中的任何一个响应之前发生,则超时消息会返回给客户端。
请注意,我仅在特定帐户类型代理没有返回任何数据时才使用“None”来表示。 在找到客户但未找到任何数据的情况下,我会收到Some(List())的响应,这意味着在该账户类型中找不到该客户的任何数据。这样,我可以在语义上区分是否收到回复以及何时未找到数据。
import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import org.jamieallen.effectiveakka.common._ import akka.actor.{ Actor, ActorRef, Props, ActorLogging } import akka.event.LoggingReceive object AccountBalanceRetrieverFinal { case object AccountRetrievalTimeout } class AccountBalanceRetrieverFinal(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor with ActorLogging { import AccountBalanceRetrieverFinal._ def receive = { case GetCustomerAccountBalances(id) => { log.debug(s"Received GetCustomerAccountBalances for ID: $id from $sender") val originalSender = sender context.actorOf(Props(new Actor() { var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None def receive = LoggingReceive { case CheckingAccountBalances(balances) => log.debug(s"Received checking account balances: $balances") checkingBalances = balances collectBalances case SavingsAccountBalances(balances) => log.debug(s"Received savings account balances: $balances") savingsBalances = balances collectBalances case MoneyMarketAccountBalances(balances) => log.debug(s"Received money market account balances: $balances") mmBalances = balances collectBalances case AccountRetrievalTimeout => sendResponseAndShutdown(AccountRetrievalTimeout) } def collectBalances = (checkingBalances, savingsBalances, mmBalances) match { case (Some(c), Some(s), Some(m)) => log.debug(s"Values received for all three account types") timeoutMessager.cancel sendResponseAndShutdown(AccountBalances(checkingBalances, savingsBalances, mmBalances)) case _ => } def sendResponseAndShutdown(response: Any) = { originalSender ! response log.debug("Stopping context capturing actor") context.stop(self) } savingsAccounts ! GetCustomerAccountBalances(id) checkingAccounts ! GetCustomerAccountBalances(id) moneyMarketAccounts ! GetCustomerAccountBalances(id) import context.dispatcher val timeoutMessager = context.system.scheduler. scheduleOnce(250 milliseconds) { self ! AccountRetrievalTimeout } })) } } }
现在我们可以收集我们的结果并检查是否收到预期值,并将它们放入AccountBalances结果中以返回给调用方,同时也可以取消预定任务,以免浪费资源。最后,我们必须记得去停止匿名内部actor,以便在收到每个GetCustomerAc countBalances消息时不会泄露内存,而无论我们是否收到了三个响应或超时消息!
那么我们为什么必须将AccountRetrievalTimeout消息发送给我们自己,放入Extra actor队列中,而不是直接将它发送回我们的scheduleOnce lambda中的原始sender? 计划任务将在另一个线程上运行! 如果我们执行相关工作来清理该线程上的actor,我们就把并发性引入到了actor中。在这个例子中,虽然我们只是告诉actor发送消息后自行停止,但如果您不发送消息给自己,那么会很容易的陷入关闭某个状态并操纵它的陷阱。 还有其他一些调度接口会使某些操作更加明显,例如此处显示的方法调用样式:
val timeoutMessager = context.system.scheduler.scheduleOnce(250 milliseconds, self, AccountRetrievalTimeout)
你必须对此保持警惕。 有时候,可能很容易陷入将并发性引入actor的陷阱,而此时并不应该存在任何并发性。如果你看到自己在actor上使用花括号,那就想想里面发生的事情以及你可能需要关闭的资源。
为什么不用promise?
在本示例的早期版本中,我尝试使用promise来执行此项工作,其中AccountBalances类型的成功结果和超时失败结构都放入promise的future内部。然而,这是非常复杂的,因为我们可以在消息排队时执行相同的基本任务时允许Extra actor队列中的排序。 但是,你也不能从promise返回future的值 - 他们不能被发送给actor,不管actor是不是远程的。 由于位置透明度的优点,这是actor不应该关注的实现细节。
future永远不要再actor之间传递,因为你不能序列化一个线程
如何测试逻辑
现在我们有一些可以工作的代码,需要编写一些测试来证明它的正确性。如果你是TDD的追随者,你可能会对我没有一开始就写测试感到羞愧。我并不坚守什么时候写测试,我只关心测试写入。
我们要做的第一件事是定义在测试中使用并作为依赖关系注入到检索者actor中的测试存根。这些存根可以是非常简单的actor - 当通过特定的客户ID询问其类型的帐户信息时,每个没有故障的测试案例存根将按帐户ID返回可选的余额列表。测试中使用的每个客户的数据都需要放入要查找的map中,如果没有返回数据,则返回Some(List())的值以符合我们的API
import akka.actor.{ Actor, ActorLogging } import akka.event.LoggingReceive class CheckingAccountsProxyStub extends CheckingAccountsProxy with ActorLogging { val accountData = Map[Long, List[(Long, BigDecimal)]]( 1L -> List((3, 15000)), 2L -> List((6, 640000), (7, 1125000), (8, 40000))) def receive = LoggingReceive { case GetCustomerAccountBalances(id: Long) => log.debug(s"Received GetCustomerAccountBalances for ID: $id") accountData.get(id) match { case Some(data) => sender ! CheckingAccountBalances(Some(data)) case None => sender ! CheckingAccountBalances(Some(List())) } } } class SavingsAccountsProxyStub extends SavingsAccountsProxy with ActorLogging { val accountData = Map[Long, List[(Long, BigDecimal)]]( 1L -> (List((1, 150000), (2, 29000))), 2L -> (List((5, 80000)))) def receive = LoggingReceive { case GetCustomerAccountBalances(id: Long) => log.debug(s"Received GetCustomerAccountBalances for ID: $id") accountData.get(id) match { case Some(data) => sender ! SavingsAccountBalances(Some(data)) case None => sender ! SavingsAccountBalances(Some(List())) } } } class MoneyMarketAccountsProxyStub extends MoneyMarketAccountsProxy with ActorLogging { val accountData = Map[Long, List[(Long, BigDecimal)]]( 2L -> List((9, 640000), (10, 1125000), (11, 40000))) def receive = LoggingReceive { case GetCustomerAccountBalances(id: Long) => The Extra Pattern | 17 log.debug(s"Received GetCustomerAccountBalances for ID: $id") accountData.get(id) match { case Some(data) => sender ! MoneyMarketAccountBalances(Some(data)) case None => sender ! MoneyMarketAccountBalances(Some(List())) } } }
在失败情况下(比如超时),存根将模拟长时间运行的阻塞的数据库调用,该调用不会对调用参与者发送响应,从而无法及时完成:
class TimingOutSavingsAccountProxyStub extends SavingsAccountsProxy with ActorLogging { def receive = LoggingReceive { case GetCustomerAccountBalances(id: Long) => log.debug(s"Forcing timeout by not responding!") } }
以下示例显示如何编写测试用例以成功返回AccountBalances。由于本示例使用存根代理来接收帐户信息,因此注入仅测试存根代理会导致发生超时功能,这是微不足道的。
我们还希望确保每个处理的消息的上下文的完整性由我们的检索器维护。 为此,我们依次发送来自不同TestProbe实例的多个消息,并验证不同的值是否正确地返回。
请注意我如何使用within块来验证预期响应的时间。 这是验证您的测试正在执行以满足系统的非功能需求的好方法。使用within块指定执行的最大时间,正如我们在失败情况中看到的那样,我们没有太早或迟到地收到响应。
最后,我们通过在我们的检索器中注入一个超时存根来测试超时条件,并确保超时响应是我们的测试收到的响应:
import akka.testkit.{ TestKit, TestProbe, ImplicitSender } import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } import org.scalatest.WordSpecLike import org.scalatest.matchers.MustMatchers import scala.concurrent.duration._ import org.jamieallen.effectiveakka.common._ import org.jamieallen.effectiveakka.pattern.extra.AccountBalanceRetrieverFinal._ class ExtraFinalSpec extends TestKit(ActorSystem("ExtraTestAS")) with ImplicitSender with WordSpecLike with MustMatchers { "An AccountBalanceRetriever" should { "return a list of account balances" in { 18 | Chapter 2: Patterns of Actor Usage val probe2 = TestProbe() val probe1 = TestProbe() val savingsAccountsProxy = system.actorOf(Props[SavingsAccountsProxyStub], "extra-success-savings") val checkingAccountsProxy = system.actorOf(Props[CheckingAccountsProxyStub], "extra-success-checkings") val moneyMarketAccountsProxy = system.actorOf( Props[MoneyMarketAccountsProxyStub], "extra-success-money-markets") val accountBalanceRetriever = system.actorOf( Props(new AccountBalanceRetrieverFinal(savingsAccountsProxy, checkingAccountsProxy, moneyMarketAccountsProxy)), "extra-retriever") within(300 milliseconds) { probe1.send(accountBalanceRetriever, GetCustomerAccountBalances(1L)) val result = probe1.expectMsgType[AccountBalances] result must equal(AccountBalances( Some(List((3, 15000))), Some(List((1, 150000), (2, 29000))), Some(List()))) } within(300 milliseconds) { probe2.send(accountBalanceRetriever, GetCustomerAccountBalances(2L)) val result = probe2.expectMsgType[AccountBalances] result must equal(AccountBalances( Some(List((6, 640000), (7, 1125000), (8, 40000))), Some(List((5, 80000))), Some(List((9, 640000), (10, 1125000), (11, 40000))))) } } "return a TimeoutException when timeout is exceeded" in { val savingsAccountsProxy = system.actorOf( Props[TimingOutSavingsAccountProxyStub], "extra-timing-out-savings") val checkingAccountsProxy = system.actorOf( Props[CheckingAccountsProxyStub], "extra-timing-out-checkings") val moneyMarketAccountsProxy = system.actorOf( Props[MoneyMarketAccountsProxyStub], "extra-timing-out-money-markets") val accountBalanceRetriever = system.actorOf( Props(new AccountBalanceRetrieverFinal(savingsAccountsProxy, checkingAccountsProxy, moneyMarketAccountsProxy)), "extra-timing-out-retriever") val probe = TestProbe() within(250 milliseconds, 500 milliseconds) { probe.send(accountBalanceRetriever, GetCustomerAccountBalances(1L)) probe.expectMsg(AccountRetrievalTimeout) } The Extra Pattern | 19 } } }
现在我们的测试检查了成功案例,失败导致预期的行为。因为AccountRetrievalTimeout是一个case对象,它是一个“term”,而不是“type”,因此我可以使用expectMsg()方法而不是expectMsgType []
即使使用强大的工具,异步编程也并非易事。 我们总是必须考虑到我们需要的状态以及我们在需要的时候获取它的环境