akka初识Akka 简单介绍

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了akka初识Akka 简单介绍相关的知识,希望对你有一定的参考价值。

1.概述

转载:初识Akka 简单介绍

1.1 Flink为什么要用Akka来代替RPC?

原先的RPC服务存在的问题:

  • 没有带回调的异步调用功能,这也是为什么Flink的多个运行时组件需要poll状态的原因,这导致了不必要的延时。
  • 没有exception forwarding,产生的异常都只能简单地吞噬掉,这使得在运行时产生一些非常难调试的古怪问题
  • 处理器的线程数受到限制,RPC只能处理一定量的并发请求,这迫使你不得不隔离线程池
  • 参数不支持原始数据类型(或者原始数据类型的装箱类型),所有的一切都必须有一个特殊的序列化类
  • 棘手的线程模型,RPC会持续的产生或终止线程

1.2 采用Akka的actor模型带来的好处:

  • Akka解决上述的所有问题,并对外透明
  • supervisor模型允许你对actor做失效检测,它提供一个统一的方式来检测与处理失败(比如心跳丢失、调用失败…)
  • Akka有工具来持久化有状态的actor,一旦失败可以在其他机器上重启他们。这个机制在master fail-over的场景下将会变得非常有用并且很重要。
  • 你可以定义许多call target(actor),在TaskManager上的任务可以直接在JobManager上调用它们的ExecutionVertex,而不是调用JobManager,让其产生一个线程来查看执行状态。
  • actor模型接近于在actor上采用队列模型一个接一个的运行,这使得状态机的并发模型变得简单而又健壮

1.3 Akka简介

Akka是一个用来开发支持并发、容错、扩展性的应用程序框架,基于actor模式的实现。

在actor模型的上下文中,所有的活动实体都被认为是互不依赖的actor,actor之间的互相通信是通过彼此之间发送异步消息来实现的。每个actor都有一个邮箱(Mailbox)来存储接收到的消息,因此每个actor都维护着自己独立的状态。

1.4 Akka几大特性

  • 易于构建并行和分布式应用 (Simple Concurrency & Distribution) Akka在设计时采用了异步通讯和分布式架构,并对上层进行抽象,如Actors、Futures ,STM等。

  • 容错性(Resilient by Design) 系统具备自愈能力,在本地/远程都有监护。

  • 高性能(High Performance) 在单机中每秒可发送50000000个消息。内存占用小,1GB内存中可保存2500000个actors。

  • 弹性,无中心(Elastic — Decentralized) 自适应的负责均衡,路由,分区,配置

  • 伸缩性(Extensible) 可以使用Akka 扩展包进行扩展。

Akka的核心(Akka-actor)是非常小的,可以非常方便地放进你的应用中,提供你需要的异步无锁并行功能。可以以两种不同的方式来使用:

  • 以库的形式:在web应用中使用,放到 WEB-INF/lib 中或者作为一个普通的Jar包放进classpath。
  • 以微内核的形式:你可以将应用放进一个独立的内核。自己有一个main类来初始化Actor系统。

2. Akka简单使用

使用IDEA+Maven构建Akka开发环境。导入Maven依赖


    <properties>
        <slf4j-api.version>1.7.28</slf4j-api.version>
        <fastjson.version>1.2.76</fastjson.version>
        <lombok.version>1.18.16</lombok.version>
    </properties>


    <dependencies>
        <!-- 日志 相关 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>$slf4j-api.version</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>$slf4j-api.version</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>$fastjson.version</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>$lombok.version</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.5</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.5.9</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster-tools_2.11</artifactId>
            <version>2.5.9</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-slf4j_2.11</artifactId>
            <version>2.5.9</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-stream-testkit_2.11</artifactId>
            <version>2.5.9</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.5.9</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster_2.11</artifactId>
            <version>2.5.9</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster-tools_2.11</artifactId>
            <version>2.5.9</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-http-core_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-http-experimental_2.11</artifactId>
            <version>2.4.10</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

Java实现Actor Demo

public class JavaPongActor extends AbstractActor 

    /**
     * AbstractActor 类有一个 receive 方法,其子类必须实现这个方法或是通过构造函数调用该方法。
     *
     * @return 返回的类型是PartialFunction(偏函数), 这个类型来自Scala API。在Java中
     * Akka为我们提供了一个抽象的构造方法类ReceiveBuilder,用于生成 PartialFunction 作为返回值。
     */
    @Override
    public PartialFunction receive() 
        return ReceiveBuilder
                // matchEquals和matchAny用来匹配消息
                .matchEquals("Ping", s ->
                        sender().tell("Pong", ActorRef.noSender()))
                .match(String.class, s ->
                        System.out.println("It's a string: " + s))
                .matchAny(x ->
                        sender().tell(
                                new Status.Failure(new Exception("unknown message")), self()))
                .build();
    

上述Java代码展示了如何使用Akka中对应的Java API,每一个具体的API含义如下:

方法含义
ReceiveAbstractActor 类有一个 receive 方法,其子类必须实现这个方法或是通过构造函数调用该方法。 receive 方法返回的类型是 PartialFunction, 这个类型来自 Scala API。在 Java 中,并没有提供任何原生方法来构造 Scala 的 PartialFunction(偏函数) ,因此 Akka 为我们提供了一个抽象的构造方法类
ReceiveBuilder主要是用Build方法返回PartialFunction
Match类似于Scala中的模式匹配,用于消息的匹配:
match(class, function):描述了对于任何尚未匹配的该类型的示例,应该如何响应。
match(String.class, s -> if(s.equals("Ping")) respondToPing(s);)
match(class, predicate, function):描述了对于 predicate 条件函数为真的某特定类型的消息,应该如何响应。
match(String.class, s -> s.equals("Ping"), s -> respondToPing(s))
matchEquals(object, function):描述了对于和传入的第一个参数相等的消息,应该如何响应。
matchEquals("Ping", s -> respondToPing(s))
matchAny(function):该函数匹配所有尚未匹配的消息。通常来说,最佳实践是返回错误信息,或者至少将错误信息记录到日志,帮助开发过程中的错误调试。
Sender返回所收到的消息的响应,响应的对象既可能是一个 Actor,也可能是来自于 Actor 系统外部的请求。
Tellsender()函数会返回一个 ActorRef。 在上面的例子中, 我们调用了 sender().tell()。tell()是最基本的单向消息传输模式。 第一个参数是我们想要发送至对方信箱的消息。第二个参数则是希望对方 Actor 看到的发送者。
Ask向 Actor 发送一条消息,返回一个 Future。当 Actor 返回响应时,会完成Future。不会向消息发送者的邮箱返回任何消息。
Forward将接收到的消息再发送给另一个 Actor。所有发送至 sender()的响应都会返回给原始消息的发送者。
Pipe用于将 Future 的结果返回给 sender()或另一个 Actor。如果正在使用 Ask 或是处理一个 Future,那么使用 Pipe 可以正确地返回 Future 的结果。

Scala实现Actor Demo

class ScalaPongActor extends Actor 
  override def receive: Receive = 
    case "Ping" => sender() ! "Pong"
    case _ => sender() ! Status.Failure(new Exception("unknown message"))
  

以上代码使用Scala语言实现了一个简单的Actor,其API大部分含义和Java中雷同,有一些在使用上不同如下:

方法含义
Receive在 Actor 中重写基类的 receive 方法。并且返回一个 PartialFunction。要注意的是,receive 方法的返回类型是 Receive。Receive 只不过是定义的一种类型, 表示 scala.PartialFunction[scala.Any, scala.Unit]。
Sender返回所收到的消息的响应,响应的对象既可能是一个 Actor,也可能是来自于 Actor 系统外部的请求。
!在 Scala 中,通过“! ”来调用 tell 方法。在 Scala 中,消息发送者是隐式传入的,因此我们不需要再显式传入消息发送者的引用。在 tell 方法“!”的方法签名中,有一个隐式的 ActorRef 参数。如果在 Actor 外部调用 tell 方法的话,该参数的默认值会设为 noSender。下面就是该方法的签名:
def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit
?“?”在scala代表的是ask。
Failure在收到未知消息时,返回akka.actor.Status.Failure。Actor 本身在任何情况下都不会自己返回 Failure(即使Actor 本身出现错误) 。 因此如果想要将发生的错误通知消息发送者, 那么我们必须要主动发送一个 Failure 给对方。 发送回 Failure 会导致请求方的 Future 被标记为失败。

另外需要注意的是,在Scala中Actor 中有一个隐式的变量 self, Actor 通过 self 得到消息发送者的值。 因此 Actor 中 tell 方法的消息发送者永远是 self。

   implicit final val self = context.self

以上是关于akka初识Akka 简单介绍的主要内容,如果未能解决你的问题,请参考以下文章

使用Akka构建集群

使用Akka构建集群

Akka(41): Http:DBTable-rows streaming - 数据库表行交换

akka设计模式系列-Chain模式

Scala框架Akka学习

AKKA HTTP 简单示例