akka初识Akka 简单介绍
Posted 九师兄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了akka初识Akka 简单介绍相关的知识,希望对你有一定的参考价值。
1.概述
转载:初识Akka 简单介绍
1.1 Flink为什么要用Akka来代替RPC?
原先的RPC服务存在的问题:
- 没有带回调的异步调用功能,这也是为什么Flink的多个运行时组件需要poll状态的原因,这导致了不必要的延时。
- 没有exception forwarding,产生的异常都只能简单地吞噬掉,这使得在运行时产生一些非常难调试的古怪问题
- 处理器的线程数受到限制,RPC只能处理一定量的并发请求,这迫使你不得不隔离线程池
- 参数不支持原始数据类型(或者原始数据类型的装箱类型),所有的一切都必须有一个特殊的序列化类
- 棘手的线程模型,RPC会持续的产生或终止线程
1.2 采用Akka的actor模型带来的好处:
- Akka解决上述的所有问题,并对外透明
- supervisor模型允许你对actor做失效检测,它提供一个统一的方式来检测与处理失败(比如心跳丢失、调用失败…)
- Akka有工具来持久化有状态的actor,一旦失败可以在其他机器上重启他们。这个机制在master fail-over的场景下将会变得非常有用并且很重要。
- 你可以定义许多call target(actor),在TaskManager上的任务可以直接在JobManager上调用它们的ExecutionVertex,而不是调用JobManager,让其产生一个线程来查看执行状态。
- actor模型接近于在actor上采用队列模型一个接一个的运行,这使得状态机的并发模型变得简单而又健壮
1.3 Akka简介
Akka是一个用来开发支持并发、容错、扩展性的应用程序框架,基于actor模式的实现。
在actor模型的上下文中,所有的活动实体都被认为是互不依赖的actor,actor之间的互相通信是通过彼此之间发送异步消息来实现的。每个actor都有一个邮箱(Mailbox)来存储接收到的消息,因此每个actor都维护着自己独立的状态。
1.4 Akka几大特性
-
易于构建并行和分布式应用 (Simple Concurrency & Distribution) Akka在设计时采用了异步通讯和分布式架构,并对上层进行抽象,如Actors、Futures ,STM等。
-
容错性(Resilient by Design) 系统具备自愈能力,在本地/远程都有监护。
-
高性能(High Performance) 在单机中每秒可发送50000000个消息。内存占用小,1GB内存中可保存2500000个actors。
-
弹性,无中心(Elastic — Decentralized) 自适应的负责均衡,路由,分区,配置
-
伸缩性(Extensible) 可以使用Akka 扩展包进行扩展。
Akka的核心(Akka-actor)是非常小的,可以非常方便地放进你的应用中,提供你需要的异步无锁并行功能。可以以两种不同的方式来使用:
- 以库的形式:在web应用中使用,放到 WEB-INF/lib 中或者作为一个普通的Jar包放进classpath。
- 以微内核的形式:你可以将应用放进一个独立的内核。自己有一个main类来初始化Actor系统。
2. Akka简单使用
使用IDEA+Maven构建Akka开发环境。导入Maven依赖
<properties>
<slf4j-api.version>1.7.28</slf4j-api.version>
<fastjson.version>1.2.76</fastjson.version>
<lombok.version>1.18.16</lombok.version>
</properties>
<dependencies>
<!-- 日志 相关 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>$slf4j-api.version</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>$slf4j-api.version</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>$fastjson.version</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>$lombok.version</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.5</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.9</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.11</artifactId>
<version>2.5.9</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
<version>2.5.9</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-testkit_2.11</artifactId>
<version>2.5.9</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.5.9</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
<version>2.5.9</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.11</artifactId>
<version>2.5.9</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-core_2.11</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-experimental_2.11</artifactId>
<version>2.4.10</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
Java实现Actor Demo
public class JavaPongActor extends AbstractActor
/**
* AbstractActor 类有一个 receive 方法,其子类必须实现这个方法或是通过构造函数调用该方法。
*
* @return 返回的类型是PartialFunction(偏函数), 这个类型来自Scala API。在Java中
* Akka为我们提供了一个抽象的构造方法类ReceiveBuilder,用于生成 PartialFunction 作为返回值。
*/
@Override
public PartialFunction receive()
return ReceiveBuilder
// matchEquals和matchAny用来匹配消息
.matchEquals("Ping", s ->
sender().tell("Pong", ActorRef.noSender()))
.match(String.class, s ->
System.out.println("It's a string: " + s))
.matchAny(x ->
sender().tell(
new Status.Failure(new Exception("unknown message")), self()))
.build();
上述Java代码展示了如何使用Akka中对应的Java API,每一个具体的API含义如下:
方法 | 含义 |
---|---|
Receive | AbstractActor 类有一个 receive 方法,其子类必须实现这个方法或是通过构造函数调用该方法。 receive 方法返回的类型是 PartialFunction, 这个类型来自 Scala API。在 Java 中,并没有提供任何原生方法来构造 Scala 的 PartialFunction(偏函数) ,因此 Akka 为我们提供了一个抽象的构造方法类 |
ReceiveBuilder | 主要是用Build方法返回PartialFunction |
Match | 类似于Scala中的模式匹配,用于消息的匹配: match(class, function):描述了对于任何尚未匹配的该类型的示例,应该如何响应。 match(String.class, s -> if(s.equals("Ping")) respondToPing(s);) match(class, predicate, function):描述了对于 predicate 条件函数为真的某特定类型的消息,应该如何响应。 match(String.class, s -> s.equals("Ping"), s -> respondToPing(s)) matchEquals(object, function):描述了对于和传入的第一个参数相等的消息,应该如何响应。 matchEquals("Ping", s -> respondToPing(s)) matchAny(function):该函数匹配所有尚未匹配的消息。通常来说,最佳实践是返回错误信息,或者至少将错误信息记录到日志,帮助开发过程中的错误调试。 |
Sender | 返回所收到的消息的响应,响应的对象既可能是一个 Actor,也可能是来自于 Actor 系统外部的请求。 |
Tell | sender()函数会返回一个 ActorRef。 在上面的例子中, 我们调用了 sender().tell()。tell()是最基本的单向消息传输模式。 第一个参数是我们想要发送至对方信箱的消息。第二个参数则是希望对方 Actor 看到的发送者。 |
Ask | 向 Actor 发送一条消息,返回一个 Future。当 Actor 返回响应时,会完成Future。不会向消息发送者的邮箱返回任何消息。 |
Forward | 将接收到的消息再发送给另一个 Actor。所有发送至 sender()的响应都会返回给原始消息的发送者。 |
Pipe | 用于将 Future 的结果返回给 sender()或另一个 Actor。如果正在使用 Ask 或是处理一个 Future,那么使用 Pipe 可以正确地返回 Future 的结果。 |
Scala实现Actor Demo
class ScalaPongActor extends Actor
override def receive: Receive =
case "Ping" => sender() ! "Pong"
case _ => sender() ! Status.Failure(new Exception("unknown message"))
以上代码使用Scala语言实现了一个简单的Actor,其API大部分含义和Java中雷同,有一些在使用上不同如下:
方法 | 含义 |
---|---|
Receive | 在 Actor 中重写基类的 receive 方法。并且返回一个 PartialFunction。要注意的是,receive 方法的返回类型是 Receive。Receive 只不过是定义的一种类型, 表示 scala.PartialFunction[scala.Any, scala.Unit]。 |
Sender | 返回所收到的消息的响应,响应的对象既可能是一个 Actor,也可能是来自于 Actor 系统外部的请求。 |
! | 在 Scala 中,通过“! ”来调用 tell 方法。在 Scala 中,消息发送者是隐式传入的,因此我们不需要再显式传入消息发送者的引用。在 tell 方法“!”的方法签名中,有一个隐式的 ActorRef 参数。如果在 Actor 外部调用 tell 方法的话,该参数的默认值会设为 noSender。下面就是该方法的签名:def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit |
? | “?”在scala代表的是ask。 |
Failure | 在收到未知消息时,返回akka.actor.Status.Failure。Actor 本身在任何情况下都不会自己返回 Failure(即使Actor 本身出现错误) 。 因此如果想要将发生的错误通知消息发送者, 那么我们必须要主动发送一个 Failure 给对方。 发送回 Failure 会导致请求方的 Future 被标记为失败。 |
另外需要注意的是,在Scala中Actor 中有一个隐式的变量 self, Actor 通过 self 得到消息发送者的值。 因此 Actor 中 tell 方法的消息发送者永远是 self。
implicit final val self = context.self
以上是关于akka初识Akka 简单介绍的主要内容,如果未能解决你的问题,请参考以下文章