Scala框架Akka学习

Posted Maverick_曲流觞

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Scala框架Akka学习相关的知识,希望对你有一定的参考价值。

Scala框架Akka

文章目录

Akka概述

Akka是一个用于构建高并发、分布式和弹性的消息驱动应用程序的工具包,支持Java和Scala语言。Akka是基于JVM上的Actor模型实现的,Actor模型是一种并发编程范式,它将系统中的每个实体抽象为一个Actor,Actor之间通过异步消息进行通信和协作。

Akka特点

  • 简化并发和分布式系统的开发,使用Actor和流(Streams)可以让系统更好地利用服务器的资源,以及在多个服务器之间进行负载均衡和自适应路由。
  • 具有弹性设计,基于反应式宣言(The Reactive Manifesto)的原则,Akka允许你编写能够自我修复和在故障面前保持响应的系统。
  • 高性能,单机可达2亿条消息/秒,内存占用小,每GB堆内存可容纳约250万个Actor。
  • 弹性和去中心化,分布式系统没有单点故障,使用集群分片(Cluster Sharding)实现事件溯源(Event Sourcing)和CQRS,使用分布式数据(Distributed Data)实现最终一致性(Eventual Consistency)和CRDTs。
  • 反应式流数据处理,使用背压(Backpressure)机制实现异步非阻塞的流处理,提供完全异步和流式的HTTP服务器和客户端,适合构建微服务,使用Alpakka实现与各种数据源和接收器的流式集成。

Akka与Scala.actors.Actor的关系

Scala.actors.Actor是Scala语言中提供的一个Actor模型的实现,它是基于Java线程模型的,并不是一个真正的轻量级线程。Scala.actors.Actor已经在Scala 2.10版本中被废弃,并被Akka Actor取代。Akka Actor提供了更高效、更健壮、更灵活、更丰富的功能,并且与Scala语言完美集成。

Akka模型介绍

Akka的模型是基于Actor模型的,Actor模型是一种并发编程范式,它将系统中的每个实体抽象为一个Actor,Actor之间通过异步消息进行通信和协作。

Actor模型的优点

  • 避免了显式的锁和线程管理,简化了并发和分布式系统的开发。
  • 实现了封装和隔离,每个Actor都有自己的状态和行为,不会被其他Actor干扰或修改。
  • 保证了顺序性和一致性,每个Actor都按照消息到达的顺序处理消息,不会出现竞态条件或死锁。
  • 支持了弹性和可扩展性,Actor可以动态地创建、销毁、迁移、监督和恢复,可以根据负载和故障进行自适应调整。

Akka模型的核心概念

Akka的模型是在JVM上实现的Actor模型,它提供了一套丰富的API和工具来构建高并发、分布式和弹性的消息驱动应用程序。

Akka的模型包括以下几个核心概念:

  • Actor:一个具有唯一标识(ActorRef)和邮箱(Mailbox)的实体,可以接收和发送消息,以及创建和监督其他Actor。
  • Behavior:一个定义了Actor如何响应不同类型消息的函数,可以在运行时改变。
  • Context:一个提供了Actor运行时信息和操作的对象,例如获取自身或父级或子级的ActorRef,设置定时器,创建适配器等。
  • ActorSystem:一个管理和运行一组Actor的容器,提供了配置、调度、日志、扩展等功能。
  • ActorPath:一个表示Actor在ActorSystem中位置的字符串,可以用来查找或寻址Actor。
  • ActorSelection:一个表示一组符合某种模式的ActorPath的对象,可以用来发送消息给这些Actor。

如何创建Actor

Actor是Akka的基本构建单元,它是一个封装了状态和行为的并发实体,可以与其他Actor通过异步消息进行通信。

添加依赖

<!-- 添加 akka 的 actor 依赖 -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.4.17</version>
</dependency>
<!-- 多进程之间的 Actor 通信 -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-remote_2.12</artifactId>
    <version>2.4.17</version>
</dependency>

在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法

  • preStart (): 在Actor启动后立即调用。如果没有重写postRestart (),则在Actor第一次创建时只执行一次。你可以在这个方法中做一些初始化的工作,比如连接数据库,注册监听器,创建子Actor等。
  • preRestart (): 在处理消息时发生异常时调用。适合做清理工作。之后会调用postStop ()。你可以在这个方法中做一些资源释放的工作,比如关闭数据库连接,取消监听器,停止子Actor等。
  • postRestart (): 重启后的新Actor调用此方法。默认情况下,它会调用preStart ()。你可以在这个方法中做一些恢复状态的工作,比如重新连接数据库,重新注册监听器,重新创建子Actor等。
  • postStop (): 在停止Actor时调用。适合做资源释放工作。你可以在这个方法中做一些和preRestart ()相反的工作,比如关闭数据库连接,取消监听器,停止子Actor等。

akka的架构原理

mailbox负责存储actor收到的消息,dispatcher负责从mailbox取消息,分配线程给actor执行具体的业务逻辑。

sender引用代表最近收到消息的发送actor,通常用于回消息,比如 sender() !xxxx。

创建Actor的步骤

要创建一个Actor,我们需要遵循以下几个步骤:

  1. 定义一个类或对象,继承自akka.actor.Actor特质,实现receive方法,用于处理收到的消息。
  2. 创建一个akka.actor.ActorSystem实例,用于管理和监控Actor。要创建ActorSystem的原因是,它是Akka框架的基础,它提供了创建和管理Actor的能力,以及一些共享的服务和配置。没有ActorSystem,就无法使用Akka的功能。😊
  3. 使用actorOf方法,传入一个akka.actor.Props实例,用于描述如何创建Actor。
  4. 获取一个akka.actor.ActorRef实例,用于与Actor进行通信。

receive方法

receive方法是Actor类的一个抽象方法,它用于定义Actor如何处理收到的消息。

receive方法具有以下特点:

  • receive方法必须在Actor的子类中被重写,否则会抛出一个异常。
  • receive方法接受一个偏函数作为参数,它可以使用case语句来匹配不同类型的消息,并执行相应的逻辑。
  • receive方法可以使用sender()方法来获取消息的发送者的ActorRef,以便回复或转发消息。
  • receive方法可以使用context()方法来获取当前Actor的上下文信息,以便创建子Actor或访问系统服务。
  • receive方法可以使用self()方法来获取当前Actor的ActorRef,以便发送消息给自己或其他Actor。
  • receive方法可以使用become()或unbecome()方法来改变当前Actor的行为,即切换不同的receive函数。

介绍ActorSystem

ActorSystem是Akka的核心组件,它是一个管理和监控Actor的容器,也是创建或查找Actor的入口。
ActorSystem具有以下特点:

  • ActorSystem是一个分层的结构,它包含一个根节点一个用户节点一个系统节点一个临时节点。用户可以在用户节点下创建自定义的Actor,系统节点下是Akka内部的Actor,临时节点下是用于处理Ask模式的临时Actor。
  • ActorSystem是一个重量级的结构,它会分配多个线程,所以一般一个应用只需要创建一个ActorSystem实例。如果需要创建多个ActorSystem,可以使用不同的配置来避免冲突。
  • ActorSystem是一个配置容器,它可以加载配置文件或者代码中的配置,用于设置Actor的部署、调度、远程通信、日志等方面的参数
  • ActorSystem是一个协作集合,它提供了一些共享的服务,如调度器、日志、扩展等,供Actor使用。
    要创建一个ActorSystem,我们需要使用akka.actor.ActorSystem类的apply方法,传入一个系统名称和可选的配置对象。例如:
// 导入Akka相关的包
import akka.actor.ActorSystem

// 创建一个名为hello-system的ActorSystem实例
val system = ActorSystem("hello-system")

要关闭一个ActorSystem,我们需要使用terminate方法,它会返回一个Future对象,表示关闭的结果。例如:

// 关闭ActorSystem实例
val future = system.terminate()

要获取一个ActorSystem的状态,我们可以使用whenTerminated方法,它会返回一个Future对象,表示系统终止时的状态。例如:

// 获取ActorSystem终止时的状态
val future = system.whenTerminated

actorOf方法

actorOf方法是ActorSystem和ActorContext类的一个成员方法,它用于创建一个新的Actor实例,并返回一个ActorRef实例,用于与Actor进行通信。

actorOf方法的参数

actorOf方法有两个重载版本,它们都接受一个Props实例作为参数,用于描述如何创建Actor。

  • def actorOf(props: Props): ActorRef:这个版本只接受一个Props实例作为参数,它会自动生成一个唯一的名称给新创建的Actor,这个名称类似于base64编码的整数计数器,反转并加上“$”前缀,但是这个名称可能会在未来改变。
  • def actorOf(props: Props, name: String): ActorRef:这个版本接受一个Props实例和一个字符串作为参数,它会使用给定的字符串作为新创建的Actor的名称,这个名称必须不为空,不以“$”开头,并且在同一层级中唯一。如果给定的名称已经被使用,会抛出InvalidActorNameException异常。

props

Props实例是一个对象,它用于描述如何创建Actor。

Props实例具有以下特点:

  • Props实例可以使用Props类的构造方法或伴生对象的工厂方法来创建,传入一个Actor的类或对象,以及可选的构造参数。
  • Props实例可以使用withDispatcherwithMailbox方法来指定Actor的调度器或邮箱。
  • Props实例可以使用withDeploy方法来指定Actor的部署配置。
  • Props实例可以使用withFallback方法来合并另一个Props实例,以便覆盖或补充缺失的属性。
  • Props实例是不可变的和可序列化的,它可以在不同的线程和网络节点之间安全地传递。
  • Props实例是用于创建Actor的模板,它会在每次调用actorOf方法时被复制一份,以保证每个Actor都有自己独立的属性。

actorOf方法的返回值

actorOf方法的返回值是一个ActorRef实例,它是一个轻量级的代理对象,用于与Actor进行通信。

ActorRef是一个抽象类,它是一个轻量级的代理对象,用于与Actor进行通信。

ActorRef具有以下特点:

  • ActorRef可以发送消息给Actor,使用!tell方法发送异步消息,使用?ask方法发送同步消息。
  • ActorRef可以监视Actor的生命周期,使用watchunwatch方法添加或移除监视者,当Actor终止时,会向监视者发送Terminated消息。
  • ActorRef可以获取Actor的路径或地址,使用pathtoString方法获取Actor的路径,使用path.address方法获取Actor的地址。
  • ActorRef不可以终止Actor,需要使用stop方法或发送PoisonPill消息来终止Actor。
  • ActorRef是不可变的和可序列化的,它可以在不同的线程和网络节点之间安全地传递。
  • ActorRef是按照路径和UID来比较相等性的,如果一个Actor终止了,再创建一个同名的Actor,那么它们的ActorRef是不相等的

actorOf方法的示例

下面是一个使用actorOf方法创建Actor的简单示例:

// 导入Akka相关的包
import akka.actor.Actor, ActorSystem, Props

// 定义一个打印消息的Actor类
class Printer extends Actor 
  def receive = 
    case msg => println(msg)
  


// 创建一个ActorSystem实例
val system = ActorSystem("example")

// 使用actorOf方法创建一个Printer实例,不指定名称
val printer1 = system.actorOf(Props[Printer])

// 使用actorOf方法创建一个Printer实例,指定名称为"printer2"
val printer2 = system.actorOf(Props[Printer], "printer2")

// 使用ActorRef实例发送消息给Printer
printer1 ! "Hello from printer1"
printer2 ! "Hello from printer2"

运行上面的代码,我们可以看到控制台输出了两条消息:

Hello from printer1
Hello from printer2

例子

下面我们来看一个简单的例子,创建一个名为Greeter的Actor,它可以接收两种消息:Greet和Done。当收到Greet消息时,它会打印出"Hello, world!";当收到Done消息时,它会停止自己。

// 导入Akka相关的包
import akka.actor.Actor, ActorRef, ActorSystem, Props

// 定义Greeter类,继承自Actor特质
class Greeter extends Actor 
  // 实现receive方法
  def receive: Receive = 
    // 当收到Greet消息时
    case Greet =>
      // 打印出"Hello, world!"
      println("Hello, world!")
      // 将Done消息发送给自己
      self ! Done
    // 当收到Done消息时
    case Done =>
      // 停止自己
      context.stop(self)
  

  // 重写postStop方法
  override def postStop(): Unit = 
    // 打印出"Goodbye, world!"
    println("Goodbye, world!")
  


// 定义两种消息的样例类
case object Greet
case object Done

object PostStopDemo 
  def main(args: Array[String]): Unit = 
    // 创建一个ActorSystem实例,命名为hello-system
    val system: ActorSystem = ActorSystem("hello-system")
    // 使用actorOf方法,传入一个Props实例,用于描述如何创建Greeter
    val greeter: ActorRef = system.actorOf(Props[Greeter], "greeter")
    // 获取一个ActorRef实例,用于与Greeter进行通信
    // 将Greet消息发送给Greeter
    greeter ! Greet
  

运行上面的代码,我们可以看到控制台输出了

Hello, world!
Goodbye, world!

然后Greeter停止了自己。

创建Actor的注意事项

在创建Actor时,有一些注意事项需要遵守:

  • 不要直接使用new关键字来创建Actor实例,而要使用actorOf方法来创建ActorRef实例。这样可以保证Actor的封装性和生命周期管理。
  • 不要在非Actor类中持有或传递Actor实例的引用,而要使用ActorRef或ActorSelection来引用Actor。这样可以保证Actor的位置透明性和分布式部署。
  • 不要在Props中使用可变或外部的状态,而要使用不可变或本地的状态。这样可以保证Props的线程安全性和可序列化性。
  • 不要在receive方法中阻塞或执行耗时的操作,而要使用Future或其他Actor来处理异步或并行的任务。这样可以保证Actor的响应性和吞吐量。

遵循这些注意事项,可以帮助我们更好地使用Akka框架,创建高效和健壮的Actor系统。😊

Actor的path路径

什么是Actor的path路径?

Actor的path路径是一个唯一标识Actor在Actor系统中位置的字符串,它由一个锚点和一个元素序列组成。锚点表示Actor系统的地址和协议,元素序列表示从根守护者到指定Actor的名称层次结构。

例如,一个Actor的path路径可能是这样的:

akka://my-system/user/foo/bar

这个路径表示在名为my-system的Actor系统中,有一个名为foo的Actor,它是user守护者的子级,它又有一个名为bar的子级,这个子级就是我们要标识的Actor。

Actor的path路径有什么作用?

Actor的path路径有以下作用:

  • 它可以用来创建或查找Actor的引用,通过actorOf或actorSelection方法。
  • 它可以用来比较两个Actor是否相等,即如果两个Actor有相同的path路径,那么它们就是相等的。
  • 它可以用来显示或记录Actor的信息,方便调试或监控。

Actor的path路径有哪些类型?

根据Actor所在的位置和层次结构,可以分为以下两种类型:

  • 逻辑path路径:它表示Actor在监督层次结构中的位置,即按照创建和监督关系从根守护者向下遍历得到的路径。这个路径与Actor的功能位置完全匹配,因此只要设置了Actor系统的远程处理配置(以及路径的地址组件),它就完全具有确定性。
  • 物理path路径:它表示Actor在部署层次结构中的位置,即按照实际所在的Actor系统和网络节点从根守护者向下遍历得到的路径。这个路径与Actor的物理位置完全匹配,因此可以用来优化网络通信和路由。

由于远程部署和监督机制,逻辑path路径和物理path路径可能会发生偏离,即一个Actor可能在不同的Actor系统或网络节点上运行,而不是与其父级相同。

例子

// 创建一个ActorSystem实例,用于管理和监控Actor
val system = ActorSystem("my-system")

// 使用actorOf方法创建一个名为foo的Actor实例,并返回它的ActorRef实例
val foo = system.actorOf(Props[MyActor], "foo")

// 使用actorOf方法在foo的子级中创建一个名为bar的Actor实例,并返回它的ActorRef实例
val bar = foo.actorOf(Props[MyActor], "bar")

在这个例子中,我们创建了两个Actor,一个名为foo,它是user守护者的子级,它又有一个名为bar的子级。这两个Actor的path路径分别是:

akka://my-system/user/foo
akka://my-system/user/foo/bar

发送消息/接收消息

概述

Akka有四种核心的Actor消息模式:tell、ask、forward和pipe。你的表格中只列出了tell和ask两种方式,还可以补充forward和pipe的说明,如下:

发送异步消息,没有返回值
!?发送同步消息,等待返回值
!!发送异步消息,返回值是Future[Any]
->转发收到的消息给另一个Actor,保持原始发送者不变
>>将Future的结果发送给sender或另一个Actor
  • Tell:!:向Actor发送一条消息,没有返回值。这是一种fire-and-forget的方式,不需要等待或阻塞发送者。这是最常用和最高效的消息模式。
  • Ask:!?:向Actor发送一条消息,返回一个Future。这是一种request/reply的方式,需要等待或阻塞发送者。这是在Actor系统外部与Actor通信时常用的方式。使用Ask时,需要定义一个超时参数,如果没有在规定时间内收到回复,Future会失败。
  • Forward:->:将收到的消息转发给另一个Actor,保持原始发送者不变。这是一种代理或中转的方式,可以将消息传递给更合适的Actor处理。使用Forward时,响应地址就是原始消息的发送者。
  • Pipe:>>:将Future的结果发送给sender或另一个Actor。这是一种异步回复的方式,可以将Future的结果正确地返回给请求者。使用Pipe时,需要提供一个Actor引用作为目标。

这些消息模式可以根据不同的场景和需求进行选择和组合,实现灵活和高效的消息传递。😊

注意

!!是一个过时的符号,已经被!?替代了。!!的含义是发送异步消息,返回值是Future[Any],但是这样会导致类型不安全,所以不建议使用。!?的含义是发送同步消息,返回值是Future[T],可以指定返回值的类型,更安全和方便。😊

例子

! 发送异步消息,没有返回值

// 导入Akka相关的包
import akka.actor.Actor, ActorSystem, Props

// 定义一个Sender类,继承Actor特质,用于发送消息给Receiver
class Sender extends Actor 
  // 重写receive方法,用于处理收到的消息
  def receive = 
    // 如果收到了"start"消息,就向Receiver发送一个"hello"消息
    case "start" => 
      println("Sender: start")
      context.actorSelection("../receiver") ! "hello"
    // 如果收到了Receiver回复的"hi"消息,就向Receiver发送一个"bye"消息
    /*actorSelection(“…/receiver”)是一个方法,它用于根据给定的路径来查找一个或多个Actor的引用。
在这个例子中,actorSelection(“…/receiver”)表示查找当前Actor的父级的子级中名为"receiver"的Actor的引用。如果存在这样的Actor,就可以向它发送消息,如果不存在,就会收到一个DeadLetter消息。
actorSelection方法返回一个ActorSelection对象,它可以向该路径指向的Actor对象发送消息。3 但是,这种方法不是很可靠,因为路径可能会变化或失效。通常,建议使用ActorRef来直接引用Actor,或者使用Identify和ActorIdentity消息来解析ActorSelection为ActorRef。*/
    case "hi" =>
      println("Sender: hi")
      context.actorSelection("../receiver") ! "bye"
    // 如果收到了Receiver回复的"bye"消息,就停止自己和Receiver
    case "bye" =>
      println("Sender: bye")
      context.stop(self)
      context.stop(context.actorSelection("../receiver").anchor)
  


// 定义一个Receiver类,继承Actor特质,用于接收消息从Sender
class Receiver extends Actor 
  // 重写receive方法,用于处理收到的消息
  def receive = 
    // 如果收到了Sender发送的"hello"消息,就向Sender回复一个"hi"消息
    case "hello" =>
      println("Receiver: hello")
      sender ! "hi"
    // 如果收到了Sender发送的"bye"消息,就向Sender回复一个"bye"消息
    case "bye" =>
      println("Receiver: bye")
      sender ! "bye"
  


// 创建一个ActorSystem实例,用于管理和监控Actor
val system = ActorSystem("example")

// 使用actorOf方法创建一个Sender实例和一个Receiver实例,并返回它们的ActorRef实例
val sender = system.actorOf(Props[Sender], "sender")
val receiver = system.actorOf(Props[Receiver], "receiver")

// 向Sender发送一个"start"消息,开始通信
sender ! "start"

运行上面的代码,我们可以看到控制台输出了以下内容:

Sender: start
Receiver: hello
Sender: hi
Receiver: bye
Sender: bye

!? 发送同步消息,等待返回值

// 导入Akka相关的包
import akka.actor.Actor, ActorSystem, Props
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// 定义一个Calculator类,继承Actor特质,用于进行简单的加减乘除运算
class Calculator extends Actor 
  // 重写receive方法,用于处理收到的消息
  def receive = 
    // 如果收到了两个整数和一个运算符,就根据运算符进行相应的计算,并将结果回复给发送者
    case (x: Int, "+", y: Int) => sender ! (x + y)
    case (x: Int, "-", y: Int) => sender ! (x - y)
    case (x: Int, "*", y: Int) => sender ! (x * y)
    case (x: Int, "/", y: Int) => sender ! (x / y)
    // 如果收到了其他类型的消息,就回复一个错误信息
    case _ => sender ! "Invalid message"
  


// 创建一个ActorSystem实例,用于管理和监控Actor
val system = ActorSystem("example")

// 使用actorOf方法创建一个Calculator实例,并返回它的ActorRef实例
val calculator = system.actorOf(Props[Calculator], "calculator")

// 使用!?方法向Calculator发送一个同步消息,即(10, "+", 5),并等待3秒钟的超时时间
implicit val timeout = Timeout(3.seconds)
val result = calculator ? (10, "+", 5)

// 打印出计算结果或超时信息
result.map(println).recover 
  case e: Exception => println(e.getMessage)

运行上面的代码,我们可以看到控制台输出了以下内容:

15

这表示我们成功地向Calculator发送了一个同步消息,并得到了正确的回复。

转发收到的消息给另一个Actor,保持原始发送者不变

import akka.actor.Actor, ActorRef, ActorSystem, Props

object Example 

  // 定义一个Receiver类,继承Actor特质,用于接收和回复消息
  class Receiver extends Actor 
    // 重写receive方法,用于处理收到的消息
    def receive: Receive = 
      // 如果收到了一个字符串,就向原始发送者回复一个"Hi"的消息
      case message: String => sender() ! "Hi"
      // 如果收到了其他类型的消息,就忽略
      case _ =>
    
  

  // 定义一个Forwarder类,继承Actor特质,用于转发消息
  class Forwarder extends Actor 
    // 重写receive方法,用于处理收到的消息
    def receive: Receive = 
      // 如果收到了一个ActorRef和一个字符串,就向该ActorRef转发该字符串,并保持原始发送者不变
      case (target: ActorRef, message: String) => target ! message
      // 如果收到了一个字符串,就打印出来
      case message: String => println(s"Forwarder received: $message")
      // 如果收到了其他类型的消息,就忽略
      case _ =>
    
  

  def main(args: Array[String]): Unit = 
    // 创建一个ActorSystem实例,用于管理和监控Actor,并定义一个隐式值context
    val system = ActorSystem("example")
    //implicit val context: ActorSystem = system

    // 使用actorOf方法创建一个、一个Receiver实例和一个Forwarder实例,并返回它们的ActorRef实例
    val receiver = system.actorOf(Props[Receiver], "receiver")
    val forwarder = system.actorOf(Props(new Forwarder), "forwarder")

    // 向Forwarder发送一个元组消息,即(receiver, "Hello"),表示让Forwarder向Receiver转发一个"Hello"的消息
    forwarder ! (receiver, "Hello")
  

输出

Forwarder received: Hi

Actor定时任务

在Akka中,提供了一个scheduler对象来实现定时调度功能。使用ActorSystem.scheduler.schedule()方法,就可以启动一个定时任务。

schedule()方法的格式

  • 方式一: 采用发送消息的形式实现.

    def schedule(
        initialDelay: FiniteDuration,		// 延迟多久后启动定时任务
        interval: FiniteDuration,			// 每隔多久执行一次
        receiver: ActorRef,					// 给哪个Actor发送消息
        message: Any)						// 要发送的消息
    (implicit executor: ExecutionContext)	// 隐式参数:需要手动导入
    
  • 方式二: 采用自定义方式实现.

    def schedule(
        initialDelay: FiniteDuration,			// 延迟多久后启动定时任务
        interval: FiniteDuration				// 每隔多久执行一次
    )(f:Unit)								// 定期要执行的函数,可以将逻辑写在这里
    (implicit executor: ExecutionContext)		// 隐式参数:需要手动导入
    

注意: 不管使用上述的哪种方式实现定时器, 都需要导入隐式转换和隐式参数, 具体如下:

//导入隐式转换, 用来支持 定时器.
import actorSystem.dispatcher
//导入隐式参数, 用来给定时器设置默认参数.
import scala.concurrent.duration._

例子

需求

  1. 定义一个ReceiverActor, 用来循环接收消息, 并打印接收到的内容.
  2. 创建一个ActorSystem, 用来管理所有用户自定义的Actor.
  3. 关联ActorSystem和ReceiverActor.
  4. 导入隐式转换和隐式参数.
  5. 通过定时器, 定时(间隔1秒)给ReceiverActor发送一句话.
    • 方式一: 采用发送消息的形式实现.
    • 方式二: 采用自定义方式实现.

参考代码

import akka.actor.Actor, ActorSystem, Props

import scala.language.postfixOps

object MainActor 
  //定义一个Actor, 用来循环接收消息, 并打印.
  object ReceiverActor extends Actor 
    override def receive: Receive = 
      case x => println(x)      //不管接收到的是什么, 都打印.
    
  

  def main(args: Array[String]): Unit = 
    //创建一个ActorSystem, 用来管理所有用户自定义的Actor.
    val actorSystem = ActorSystem("actorSystem")
    //关联ActorSystem和ReceiverActor.
    val receiverActor = ac

Scala笔记整理:Actor和AKKA

[TOC]


概述

? Scala的Actor有点类似于Java中的多线程编程。但是不同的是,Scala的Actor提供的模型与多线程有所不同。Scala的Actor尽可能地避免锁和共享状态,从而避免多线程并发时出现资源争用的情况,进而提升多线程编程的性能。

Spark中使用的分布式多线程框架,是Akka,是Scala的一种多线程的类库。Akka也实现了类似Scala Actor的模型,其核心概念同样也是Actor。Scala Actor模型已经在2.1.0的时候还在用,但是在2.1.1的时候已经被遗弃了,Spark开始转换用AKKA来替代Scala Actor,但是Scala Actor概念和原理都还是相同的。所以学习Scala Actor对我们学习AKKA,Spark还是有所帮助的

之所以学习Scala Actor,AKKA是因为在学习Spark源码的时,我们能看懂Spark的源码,因为在底层消息传递机制上大量使用AKKA的传送机制。

scala actor

在使用前,需要先引入maven依赖:

<!--scala actor-->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-actors</artifactId>
    <version>2.10.5</version>
</dependency>

actor单向通信

测试代码如下:

package cn.xpleaf.bigdata.p5.myactor
import scala.actors.Actor
/**
  * 学习scala actor的基本操作
  * 和java中的Runnable Thread几乎一致
  *
  * 第一步:编写一个类,扩展特质trait Actor(scala 的actor)
  * 第二步:复写其中的act方法
  * 第三步:创建该actor的对象,调用该对象的start()方法,启动该线程
  * 第四步:通过scala的操作符"!",发送消息
  * 第五步:结束的话,调用close即可
  *
  * 模拟单向打招呼
  */
object ActorOps {
    def main(args: Array[String]): Unit = {
        val mFActor = new MyFirstActor()
        mFActor.start()
        // 发送消息
        mFActor ! "小美,睡了吗?"
        mFActor ! "我去洗澡了~"
        mFActor ! "呵呵"
    }
}

class MyFirstActor extends Actor {
    override def act(): Unit = {
        while(true) {
            receive {
                case str: String => println(str)
            }
        }
    }
}

输出结果如下:

小美,睡了吗?
我去洗澡了~
呵呵

使用样例类(case class)进行actor消息传递

测试代码如下:

package cn.xpleaf.bigdata.p5.myactor

import scala.actors.Actor

/**
  *
  */
object GreetingActor {
    def main(args: Array[String]): Unit = {
        val ga = new GreetingActor
        ga.start()

        ga ! Greeting("小美")
        ga ! WorkContent("装系统")
    }
}

case class Greeting(name:String)
case class WorkContent(content:String)

class GreetingActor extends Actor {
    override def act(): Unit = {
        while(true) {
            receive {
                case Greeting(name) => println(s"Hello, $name")
                case WorkContent(content) => println(s"Let‘s talk about sth. with $content")
            }
        }
    }
}

输出结果如下:

Hello, 小美
Let‘s talk about sth. with 装系统

actor相互通信

测试代码如下:

package cn.xpleaf.bigdata.p5.myactor

import scala.actors.Actor

/**
  * actor之线程间,互相通信
  *
  * studentActor
  *     向老师问了一个问题
  *
  * teacherActor
  *     向学生做回应
  *
  * 通信的协议:
  * 请求,使用Request(内容)来表示
  * 响应,使用Response(内容)来表示
  */
object _03CommunicationActorOps {
    def main(args: Array[String]): Unit = {
        val teacherActor = new TeacherActor()
        teacherActor.start()
        val studentActor = new StudentActor(teacherActor)
        studentActor.start()

        studentActor ! Request("老李啊,scala学习为什么这么难啊")
    }
}

case class Request(req:String)
case class Response(resp:String)

class StudentActor(teacherActor: TeacherActor) extends Actor {
    override def act(): Unit = {
        while(true) {
            receive {
                case Request(req) => {
                    // 向老师请求相关的问题
                    println("学生向老师说:" + req)
                    teacherActor ! Request(req)
                }
                case Response(resp) => {
                    println(resp)
                    println("高!")
                }
            }
        }
    }
}

class TeacherActor() extends Actor {
    override def act(): Unit = {
        while (true) {
            receive {
                case Request(req) => {  // 接收到学生的请求
                    sender ! Response("这个问题,需要如此搞定~")
                }
            }
        }
    }
}

输出结果如下:

学生向老师说:老李啊,scala学习为什么这么难啊
这个问题,需要如此搞定~
高!

消息的同步和Future

1、Scala在默认情况下,消息都是以异步进行发送的;但是如果发送的消息是同步的,即对方接受后,一定要给自己返回结果,那么可以使用!?的方式发送消息。即:

val response= activeActor !? activeMessage

2、如果要异步发送一个消息,但是在后续要获得消息的返回值,那么可以使用Future。即!!语法,如下:

val futureResponse = activeActor !! activeMessage
val activeReply = future()

AKKA actor

首先需要添加akka的maven依赖:

<!--akka actor-->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.10</artifactId>
    <version>2.3.16</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-remote_2.10</artifactId>
    <version>2.3.16</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-slf4j_2.10</artifactId>
    <version>2.3.16</version>
</dependency>

AKKA消息传递——本地

原理如下:

技术分享图片

_01StudentActorOps

package cn.xpleaf.bigdata.p5.myakka.p1

import akka.actor.{Actor, ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}

import scala.util.Random

/**
  * 基于AKKA Actor的单向通信案例
  * 学生向老师发送请求
  */
object _01StudentActorOps {
    def main(args: Array[String]): Unit = {
        // 第一步:构建Actor操作系统
        val actorSystem = ActorSystem("StudentActorSystem")
        // 第二步:actorSystem创建TeacherActor的代理对象ActorRef
        val teacherActorRef = actorSystem.actorOf(Props[TeacherActor])
        // 第三步:发送消息
        teacherActorRef ! QuoteRequest()

        Thread.sleep(2000)
        // 第四步:关闭
        actorSystem.shutdown()
    }
}

class TeacherActor extends Actor {
    val quotes = List(
        "Moderation is for cowards",
        "Anything worth doing is worth overdoing",
        "The trouble is you think you have time",
        "You never gonna know if you never even try")

    override def receive = {
        case QuoteRequest() => {
            val random = new Random()

            val randomIndex = random.nextInt(quotes.size)
            val randomQuote = quotes(randomIndex)

            val response = QuoteResponse(randomQuote)
            println(response)
        }
    }
}

MessageProtocol

后面akka通信的几个测试程序都会使用到这个object,只在这里给出,后面不再给出。

package cn.xpleaf.bigdata.p5.myakka

/**
  * akka actor通信协议
  */
object MessageProtocol {

    case class QuoteRequest()

    case class QuoteResponse(resp: String)

    case class InitSign()

}

object Start extends Serializable

object Stop extends Serializable

trait Message {
    val id: String
}

case class Shutdown(waitSecs: Int) extends Serializable

case class Heartbeat(id: String, magic: Int) extends Message with Serializable

case class Header(id: String, len: Int, encrypted: Boolean) extends Message with Serializable

case class Packet(id: String, seq: Long, content: String) extends Message with Serializable

测试

输出结果如下:

QuoteResponse(Anything worth doing is worth overdoing)

AKKA请求与响应——本地

原理如下:

技术分享图片

TeacherActor

package cn.xpleaf.bigdata.p5.myakka.p2

import akka.actor.Actor
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}

import scala.util.Random

/**
  * Teacher Actor
  */
class TeacherActor extends Actor {
    val quotes = List(
        "Moderation is for cowards",
        "Anything worth doing is worth overdoing",
        "The trouble is you think you have time",
        "You never gonna know if you never even try")

    override def receive = {
        case QuoteRequest() => {
            val random = new Random()

            val randomIndex = random.nextInt(quotes.size)
            val randomQuote = quotes(randomIndex)

            val response = QuoteResponse(randomQuote)
//            println(response)
            sender ! response
        }
    }
}

StudentActor

package cn.xpleaf.bigdata.p5.myakka.p2

import akka.actor.{Actor, ActorLogging, ActorRef}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{InitSign, QuoteRequest, QuoteResponse}

/**
  * Student Actor
  * 当学生接收到InitSign信号之后,便向老师发送一条Request请求的消息
  */
class StudentActor(teacherActorRef:ActorRef) extends Actor with ActorLogging {
    override def receive = {
        case InitSign => {
            teacherActorRef ! QuoteRequest()
//            println("student send request")
        }
        case QuoteResponse(resp) => {
            log.info(s"$resp")
        }
    }
}

DriverApp

package cn.xpleaf.bigdata.p5.myakka.p2

import akka.actor.{ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.InitSign

object DriverApp {
    def main(args: Array[String]): Unit = {
        val actorSystem = ActorSystem("teacherStudentSystem")
        // 老师的代理对象
        val teacherActorRef = actorSystem.actorOf(Props[TeacherActor], "teacherActor")
        // 学生的代理对象
        val studentActorRef = actorSystem.actorOf(Props[StudentActor](new StudentActor(teacherActorRef)), "studentActor")

        studentActorRef ! InitSign

        Thread.sleep(2000)

        actorSystem.shutdown()
    }
}

测试

输出结果如下:

[INFO] [04/24/2018 10:02:19.932] [teacherStudentSystem-akka.actor.default-dispatcher-2] [akka://teacherStudentSystem/user/studentActor] Anything worth doing is worth overdoing

AKKA请求与响应——远程

application.conf

MyRemoteServerSideActor {
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
    }
  }
}
MyRemoteClientSideActor {
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
  }
}

RemoteActor

package cn.xpleaf.bigdata.p5.myakka.p3

import akka.actor.{Actor, ActorLogging}
import cn.xpleaf.bigdata.p5.myakka.{Header, Shutdown, Start, Stop}

class RemoteActor extends Actor with ActorLogging {
    def receive = {
        case Start => { // 处理Start消息
            log.info("Remote Server Start ==>RECV Start event : " + Start)
        }
        case Stop => { // 处理Stop消息
            log.info("Remote Server Stop ==>RECV Stop event: " + Stop)
        }
        case Shutdown(waitSecs) => { // 处理Shutdown消息
            log.info("Remote Server Shutdown ==>Wait to shutdown: waitSecs=" + waitSecs)
            Thread.sleep(waitSecs)
            log.info("Remote Server Shutdown ==>Shutdown this system.")
            context.system.shutdown // 停止当前ActorSystem系统
        }
        case Header(id, len, encrypted) => log.info("Remote Server => RECV header: " + (id, len, encrypted)) // 处理Header消息
        case _ =>
    }
}

AkkaServerApplication

package cn.xpleaf.bigdata.p5.myakka.p3

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object AkkaServerApplication extends App {
    // 创建名称为remote-system的ActorSystem:从配置文件application.conf中获取该Actor的配置内容
    val system = ActorSystem("remote-system",
        ConfigFactory.load().getConfig("MyRemoteServerSideActor"))

    val log = system.log
    log.info("===>Remote server actor started: " + system)
    // 创建一个名称为remoteActor的Actor,返回一个ActorRef,这里我们不需要使用这个返回值
    system.actorOf(Props[RemoteActor], "remoteActor")

}

ClientActor

package cn.xpleaf.bigdata.p5.myakka.p3

import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, ActorLogging}
import cn.xpleaf.bigdata.p5.myakka.{Header, Start}

class ClientActor extends Actor with ActorLogging {

    // akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
    val path = "akka.tcp://[email protected]:2552/user/remoteActor" // 远程Actor的路径,通过该路径能够获取到远程Actor的一个引用
    val remoteServerRef = context.actorSelection(path) // 获取到远程Actor的一个引用,通过该引用可以向远程Actor发送消息
    @volatile var connected = false
    @volatile var stop = false

    def receive = {
        case Start => { // 发送Start消息表示要与远程Actor进行后续业务逻辑处理的通信,可以指示远程Actor初始化一些满足业务处理的操作或数据
            send(Start)
            if (!connected) {
                connected = true
                log.info("ClientActor==> Actor connected: " + this)
            }
        }
        case Stop => {
            send(Stop)
            stop = true
            connected = false
            log.info("ClientActor=> Stopped")
        }
        case header: Header => {
            log.info("ClientActor=> Header")
            send(header)
        }
        case (seq, result) => log.info("RESULT: seq=" + seq + ", result=" + result) // 用于接收远程Actor处理一个Packet消息的结果
        case m => log.info("Unknown message: " + m)
    }

    private def send(cmd: Serializable): Unit = {
        log.info("Send command to server: " + cmd)
        try {
            remoteServerRef ! cmd // 发送一个消息到远程Actor,消息必须是可序列化的,因为消息对象要经过网络传输
        } catch {
            case e: Exception => {
                connected = false
                log.info("Try to connect by sending Start command...")
                send(Start)
            }
        }
    }

}

AkkaClientApplication

package cn.xpleaf.bigdata.p5.myakka.p3

import akka.actor.{ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.{Header, Start}
import com.typesafe.config.ConfigFactory

object AkkaClientApplication extends App {
    // 通过配置文件application.conf配置创建ActorSystem系统
    val system = ActorSystem("client-system",
        ConfigFactory.load().getConfig("MyRemoteClientSideActor"))
    val log = system.log
    val clientActor = system.actorOf(Props[ClientActor], "clientActor") // 获取到ClientActor的一个引用
    clientActor ! Start // 发送一个Start消息,第一次与远程Actor握手(通过本地ClientActor进行转发)
    Thread.sleep(2000)
    clientActor ! Header("What‘s your name: Can you tell me ", 20, encrypted = false) // 发送一个Header消息到远程Actor(通过本地ClientActor进行转发)
    Thread.sleep(2000)

}

测试

服务端输出结果如下:

[INFO] [04/24/2018 09:39:49.271] [main] [Remoting] Starting remoting
[INFO] [04/24/2018 09:39:49.508] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552]
[INFO] [04/24/2018 09:39:49.509] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552]
[INFO] [04/24/2018 09:39:49.517] [main] [ActorSystem(remote-system)] ===>Remote server actor started: akka://remote-system
[INFO] [04/24/2018 09:46:01.872] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:2552/user/remoteActor] Remote Server Start ==>RECV Start event : [email protected]
[INFO] [04/24/2018 09:46:03.501] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:2552/user/remoteActor] Remote Server => RECV header: (What‘s your name: Can you tell me ,20,false)

客户端输出结果如下:

[INFO] [04/24/2018 09:46:01.274] [main] [Remoting] Starting remoting
[INFO] [04/24/2018 09:46:01.479] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552]
[INFO] [04/24/2018 09:46:01.480] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552]
[INFO] [04/24/2018 09:46:01.493] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2552/user/clientActor] Send command to server: [email protected]
[INFO] [04/24/2018 09:46:01.496] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2552/user/clientActor] ClientActor==> Actor connected: [email protected]
[INFO] [04/24/2018 09:46:03.490] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://[email protected]:2552/user/clientActor] ClientActor=> Header
[INFO] [04/24/2018 09:46:03.491] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://[email protected]:2552/user/clientActor] Send command to server: Header(What‘s your name: Can you tell me ,20,false)

以上是关于Scala框架Akka学习的主要内容,如果未能解决你的问题,请参考以下文章

Scala-Unit7-Scala并发编程模型AKKA

一 Akka学习 - actor

scala学习笔记-Actor(19)

Scala笔记整理:Actor和AKKA

Scala笔记整理:Actor和AKKA

使用akka框架和scala语言编写简单的RPC通信案例