Akka(10): 分布式运算:集群-Cluster

Posted 雪川大虫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Akka(10): 分布式运算:集群-Cluster相关的知识,希望对你有一定的参考价值。

   Akka-Cluster可以在一部物理机或一组网络连接的服务器上搭建部署。用Akka开发同一版本的分布式程序可以在任何硬件环境中运行,这样我们就可以确定以Akka分布式程序作为标准的编程方式了。

   在上面两篇讨论里我们介绍了Akka-Remoting。Akka-Remoting其实是一种ActorSystem之间Actor对Actor点对点的沟通协议。通过Akka-Remoting来实现一个ActorSystem中的一个Actor与另一个Actorsystem中的另一个Actor之间的沟通。在Remoting功能之后,Akka又发展了集群Cluster功能。Akka-Cluster是基于Akka-Remoting之上的新一代分布式运算环境,所以Remoting已经成为了Akka-Cluster的内部支持功能,在生产环境中的分布式运算应该尽量使用Akka-Cluster。当然,人们仍然可以在学习和测试环境中使用Akka-Remoting来了解Akka的分布式运算机制和原理。Remoting和Cluster的明显分别之一就是真正实现了Actor的位置透明化。让编程人员可以更轻松自然的实现分布式编程。当然,更重要的是相对Akka-Remoting而言,Akka-Cluster提供了一整套更安全、更高效的分布式运算环境。

简单来说Akka-Cluster将多个JVM连接整合起来,实现消息地址的透明化和统一化使用管理,集成一体化的消息驱动系统。最终目的是能够把一个大型程序分割成多个子程序,然后部署到很多JVM上去实现程序的分布式并行运算。更重要的是:Cluster的构建过程与Actor编程没有牵连,当Cluster把多个ActorSystem集合成一个统一系统后,我们可以用在单一ActorSystem里编程的习惯方式编写分布式运算程序。由于在单一机器上就可以配置多个节点形成一个集群,我们开发的分布式程序可以在单机或多机群上运行,不同的只是如何部署和配置集群环境。

我们首先来认识一些有关Akka-Cluster的基础概念:

Node:集群节点,也可以说是代表一个独立的ActorSystem,用hostname:port来表示。一部物理机器上可以构建多个集群节点Node,这时它们有着相同的hostname和不同的port,在不同机器上的Node则可以使用不同的hostname和相同的port。

Cluster:由多个节点Node作为集群成员通过一种集群组织协议形成集群的一个整体。

Leader:集群中的某个成员节点Node。由Akka自动在集群成员中选定,负责集群成员生命周期状态的具体转换操作。

Seed-Node:由一或多个集群中的节点组成。一个节点在加入集群之前先向所有用户指定的Seed-Node发出联系消息,然后向最先答复的Seed-Node发出加入集群请求。Seed-Node的主要作用是为申请加入集群的节点提供具体的联络地址,毕竟申请加入的节点需要一个具体的地址来发送申请加入消息,从这个方面来说:Seed-Node可以是集群中任何已知地址的节点。

Node-Lifecycle-State:一个节点的生命周期里包括以下几个状态转换:

Joining->Up,Leaving->Exiting,Exiting->Removed,Unreachable->Up,Unreachable->Down,Down->Removed

另外,Akka-Cluster通过交流心跳信号(heart-beat signal)方式可以监测任何节点是否处于无法联络Unreachable状态。

Membership:集群成员组织是通过Gossip沟通协议把多个节点组织起来形成的一个集群整体。

Membership-State: 集群状态,是一个集群内所有节点共享的数据结构,用于存放群内所有节点状态。集群状态是一种CRDT数据结构,提供安全便捷的数据合并操作,方便逐步累加型数据合并更新。

Gossip-Protocal:是Node之间的交流协议。集群内的节点分邻里相互通过Gossip交流更新集群状态数据,逐步扩散交流覆盖整个集群所有节点并形成完整的统一集群状态数据。

Gossip-Convergence:集群统一状态。当Gossip交流覆盖了集群中所有节点,即所有节点都获得统一的集群状态,就达到集群统一状态Convergence。

Failure-Detector fd:所有节点都具备心跳信号交流功能。集群中某个节点可能被多个节点用heartbeat检测在线是否Reachable/Unreachable。如果集群中任何一个节点处于Unreachable状态则整个集群无法达至Convergence状态。

Leader-Actions:当集群达到Convergence后系统自动选定一个Leader节点进行以上描述的节点状态转换操作。如果集群内有节点处于Unreachable状态,无法达到集群Convergence,则无法满足任何节点状态转换请求。

在Akka-Cluster中一个节点加入集群是自动的,只要在配置文件里设置一个Seed-Node清单,否则就必须在Actor程序里用Cluster.join或Cluster.joinSeedNodes方法加人:

 /**
   * Try to join this cluster node with the node specified by ‘address‘.
   * A ‘Join(selfAddress)‘ command is sent to the node to join.
   *
   * An actor system can only join a cluster once. Additional attempts will be ignored.
   * When it has successfully joined it must be restarted to be able to join another
   * cluster or to join the same cluster again.
   *
   * The name of the [[akka.actor.ActorSystem]] must be the same for all members of a
   * cluster.
   */
  def join(address: Address): Unit =
    clusterCore ! ClusterUserAction.JoinTo(fillLocal(address))

  /**
   * Join the specified seed nodes without defining them in config.
   * Especially useful from tests when Addresses are unknown before startup time.
   *
   * An actor system can only join a cluster once. Additional attempts will be ignored.
   * When it has successfully joined it must be restarted to be able to join another
   * cluster or to join the same cluster again.
   */
  def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit =
    clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes.toVector.map(fillLocal))

集群节点Leave和Down实现方法如下: 

/**
    * Send command to issue state transition to LEAVING for the node specified by ‘address‘.
   * The member will go through the status changes [[MemberStatus]] `Leaving` (not published to
   * subscribers) followed by [[MemberStatus]] `Exiting` and finally [[MemberStatus]] `Removed`.
   *
   * Note that this command can be issued to any member in the cluster, not necessarily the
   * one that is leaving. The cluster extension, but not the actor system or JVM, of the
   * leaving member will be shutdown after the leader has changed status of the member to
   * Exiting. Thereafter the member will be removed from the cluster. Normally this is
   * handled automatically, but in case of network failures during this process it might
   * still be necessary to set the node’s status to Down in order to complete the removal.
   */
  def leave(address: Address): Unit =
    clusterCore ! ClusterUserAction.Leave(fillLocal(address))

  /**
   * Send command to DOWN the node specified by ‘address‘.
   *
   * When a member is considered by the failure detector to be unreachable the leader is not
   * allowed to perform its duties, such as changing status of new joining members to ‘Up‘.
   * The status of the unreachable member must be changed to ‘Down‘, which can be done with
   * this method.
   */
  def down(address: Address): Unit =
    clusterCore ! ClusterUserAction.Down(fillLocal(address))

Akka-Cluster的集群节点状态转换可以作为事件在Akka的EventBus上发布:

  /**
   * Marker interface for membership events.
   * Published when the state change is first seen on a node.
   * The state change was performed by the leader when there was
   * convergence on the leader node, i.e. all members had seen previous
   * state.
   */
  sealed trait MemberEvent extends ClusterDomainEvent {
    def member: Member
  }

  /**
   * Member status changed to Joining.
   */
  final case class MemberJoined(member: Member) extends MemberEvent {
    if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member)
  }

  /**
   * Member status changed to WeaklyUp.
   * A joining member can be moved to `WeaklyUp` if convergence
   * cannot be reached, i.e. there are unreachable nodes.
   * It will be moved to `Up` when convergence is reached.
   */
  final case class MemberWeaklyUp(member: Member) extends MemberEvent {
    if (member.status != WeaklyUp) throw new IllegalArgumentException("Expected WeaklyUp status, got: " + member)
  }

  /**
   * Member status changed to Up.
   */
  final case class MemberUp(member: Member) extends MemberEvent {
    if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member)
  }

  /**
   * Member status changed to Leaving.
   */
  final case class MemberLeft(member: Member) extends MemberEvent {
    if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member)
  }

  /**
   * Member status changed to `MemberStatus.Exiting` and will be removed
   * when all members have seen the `Exiting` status.
   */
  final case class MemberExited(member: Member) extends MemberEvent {
    if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member)
  }

  /**
   * Member completely removed from the cluster.
   * When `previousStatus` is `MemberStatus.Down` the node was removed
   * after being detected as unreachable and downed.
   * When `previousStatus` is `MemberStatus.Exiting` the node was removed
   * after graceful leaving and exiting.
   */
  final case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent {
    if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
  }
  /**
   * Marker interface to facilitate subscription of
   * both [[UnreachableMember]] and [[ReachableMember]].
   */
  sealed trait ReachabilityEvent extends ClusterDomainEvent {
    def member: Member
  }

  /**
   * A member is considered as unreachable by the failure detector.
   */
  final case class UnreachableMember(member: Member) extends ReachabilityEvent

  /**
   * A member is considered as reachable by the failure detector
   * after having been unreachable.
   * @see [[UnreachableMember]]
   */
  final case class ReachableMember(member: Member) extends ReachabilityEvent

集群的当前状态值是存放在下面CurrentClusterState结构里的: 

/**
   * Current snapshot state of the cluster. Sent to new subscriber.
   */
  final case class CurrentClusterState(
    members:       immutable.SortedSet[Member]  = immutable.SortedSet.empty,
    unreachable:   Set[Member]                  = Set.empty,
    seenBy:        Set[Address]                 = Set.empty,
    leader:        Option[Address]              = None,
    roleLeaderMap: Map[String, Option[Address]] = Map.empty) {

    /**
     * Java API: get current member list.
     */
    def getMembers: java.lang.Iterable[Member] = {
      import scala.collection.JavaConverters._
      members.asJava
    }

    /**
     * Java API: get current unreachable set.
     */
    def getUnreachable: java.util.Set[Member] =
      scala.collection.JavaConverters.setAsJavaSetConverter(unreachable).asJava

    /**
     * Java API: get current “seen-by” set.
     */
    def getSeenBy: java.util.Set[Address] =
      scala.collection.JavaConverters.setAsJavaSetConverter(seenBy).asJava

    /**
     * Java API: get address of current leader, or null if none
     */
    def getLeader: Address = leader orNull

    /**
     * All node roles in the cluster
     */
    def allRoles: Set[String] = roleLeaderMap.keySet

    /**
     * Java API: All node roles in the cluster
     */
    def getAllRoles: java.util.Set[String] =
      scala.collection.JavaConverters.setAsJavaSetConverter(allRoles).asJava

    /**
     * get address of current leader, if any, within the role set
     */
    def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None)

    /**
     * Java API: get address of current leader within the role set,
     * or null if no node with that role
     */
    def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull
  }

用户可以监听这些事件的发生:

cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
  classOf[MemberEvent], classOf[UnreachableMember])

另外,我们还可以用callback方式在状态转换前后调用一些运算来进行准备处理和事后处理:

 /**
   * The supplied thunk will be run, once, when current cluster member is `Up`.
   * Typically used together with configuration option `akka.cluster.min-nr-of-members`
   * to defer some action, such as starting actors, until the cluster has reached
   * a certain size.
   */
  def registerOnMemberUp[T](code: ? T): Unit =
    registerOnMemberUp(new Runnable { def run() = code })

  /**
   * Java API: The supplied callback will be run, once, when current cluster member is `Up`.
   * Typically used together with configuration option `akka.cluster.min-nr-of-members`
   * to defer some action, such as starting actors, until the cluster has reached
   * a certain size.
   */
  def registerOnMemberUp(callback: Runnable): Unit =
    clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback)

  /**
   * The supplied thunk will be run, once, when current cluster member is `Removed`.
   * If the cluster has already been shutdown the thunk will run on the caller thread immediately.
   * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.terminate()`.
   */
  def registerOnMemberRemoved[T](code: ? T): Unit =
    registerOnMemberRemoved(new Runnable { override def run(): Unit = code })

  /**
   * Java API: The supplied thunk will be run, once, when current cluster member is `Removed`.
   * If the cluster has already been shutdown the thunk will run on the caller thread immediately.
   * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.terminate()`.
   */
  def registerOnMemberRemoved(callback: Runnable): Unit = {
    if (_isTerminated.get())
      callback.run()
    else
      clusterDaemons ! InternalClusterAction.AddOnMemberRemovedListener(callback)
  }

下面我们就用个例子来示范Akka-Cluster的运作过程:

首先需要Akka-Cluster的dependency:build.sbt

name := "cluster-states-demo"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= {
  val akkaVersion = "2.5.3"
  Seq(
    "com.typesafe.akka"       %%  "akka-actor"   % akkaVersion,
    "com.typesafe.akka"       %%  "akka-cluster"   % akkaVersion
  )
}

然后是基本的配置:cluster.conf

akka {
  actor {
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551"]
  }
}

下面是一个集群状态转换事件的监听Actor:

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._

class EventLisener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self,initialStateMode = InitialStateAsEvents
    ,classOf[MemberEvent],classOf[UnreachableMember])  //订阅集群状态转换信息
    super.preStart()
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)    //取消订阅
    super.postStop()
  }

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info("Member is Joining: {}", member.address)
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case MemberLeft(member) =>
      log.info("Member is Leaving: {}", member.address)
    case MemberExited(member) =>
      log.info("Member is Exiting: {}", member.address)
    case MemberRemoved(member, previousStatus) =>
      log.info(
        "Member is Removed: {} after {}",
        member.address, previousStatus)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
    case _: MemberEvent => // ignore
  }

}

下面是EventListener使用测试代码:

object ClusterEventsDemo {
  def main(args: Array[String]): Unit = {
    //重设port,seed-node-address
    val port =
      if (args.isEmpty) "0"
      else args(0)

    val addr =
      if (args.length < 2) "2551"
      else args(1)


    val seednodeSetting = "akka.cluster.seed-nodes = ["+
    "\"akka.tcp://[email protected]:"+
      s"${addr}"+"\"]"



    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = ${port}")
      .withFallback(ConfigFactory.parseString(seednodeSetting))
      .withFallback(ConfigFactory.load("cluster.conf"))

    val clusterSystem = ActorSystem(name="clusterSystem",config=config)
    val eventListener = clusterSystem.actorOf(Props[EventLisener],"eventListener")

    println("actor system started!")
    scala.io.StdIn.readLine()

    val cluster = Cluster(clusterSystem)
    cluster.leave(cluster.selfAddress)

    scala.io.StdIn.readLine()


    clusterSystem.terminate()


  }
}

我们在多个terminal上用sbt来测试运行:

1、run "2551" "2551"   //这是个seed-node

[INFO] [06/26/2017 21:25:46.743] [clusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Node [akka.tcp://[email protected]:2551] is JOINING, roles []
[INFO] [06/26/2017 21:25:46.751] [clusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Leader is moving node [akka.tcp://[email protected]:2551] to [Up]
[INFO] [06/26/2017 21:25:46.755] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://[email protected]:2551/user/eventListener] Member is Up: akka.tcp://[email protected]:2551

2、run "0" "2551"      //port=0代表由系统自动选择端口

[INFO] [06/26/2017 21:26:57.467] [run-main-1e] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://[email protected]:52459] - Started up successfully
actor system started!
[INFO] [06/26/2017 21:26:57.735] [clusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://[email protected]:52459] - Welcome from [akka.tcp://[email protected]:2551]
[INFO] [06/26/2017 21:26:57.751] [clusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:52459/user/eventListener] Member is Up: akka.tcp://[email protected]:2551
[INFO] [06/26/2017 21:26:57.752] [clusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://[email protected]:52459/user/eventListener] Member is Joining: akka.tcp://[email protected]:52459
[INFO] [06/26/2017 21:26:57.809] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://[email protected]:52459/user/eventListener] Member is Up: akka.tcp://[email protected]:52459

3、run "0" "2551"      //port=0代表由系统自动选择端口

[INFO] [06/26/2017 21:28:22.577] [run-main-1] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://[email protected]:52467] - Started up successfully
actor system started!
[INFO] [06/26/2017 21:28:22.736] [clusterSystem-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://[email protected]:52467] - Welcome from [akka.tcp://[email protected]:2551]
[INFO] [06/26/2017 21:28:22.747] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://[email protected]:52467/user/eventListener] Member is Up: akka.tcp://[email protected]:2551
[INFO] [06/26/2017 21:28:22.749] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://[email protected]:52467/user/eventListener] Member is Up: akka.tcp://[email protected]:52459
[INFO] [06/26/2017 21:28:22.749] [clusterSystem-akka.actor.default-dispatcher-16] [akka.tcp://[email protected]:52467/user/eventListener] Member is Joining: akka.tcp://[email protected]:52467
[INFO] [06/26/2017 21:28:24.611] [clusterSystem-akka.actor.default-dispatcher-22] [akka.tcp://[email protected]:52467/user/eventListener] Member is Up: akka.tcp://[email protected]:52467

在terminal2运算cluster.leave(cluster.selfAddress):

[INFO] [06/26/2017 22:40:47.614] [clusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2551/user/eventListener] Member is Leaving: akka.tcp://[email protected]:53986
[INFO] [06/26/2017 22:40:48.032] [clusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Leader is moving node [akka.tcp://[email protected]:53986] to [Exiting]
[INFO] [06/26/2017 22:40:48.032] [clusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://[email protected]:2551/user/eventListener] Member is Exiting: akka.tcp://[email protected]:53986
[INFO] [06/26/2017 22:40:48.047] [clusterSystem-akka.actor.default-dispatcher-21] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Exiting confirmed [akka.tcp://[email protected]:53986]
[INFO] [06/26/2017 22:40:49.033] [clusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://clusterSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Leader is removing confirmed Exiting node [akka.tcp://[email protected]:53986]
[INFO] [06/26/2017 22:40:49.033] [clusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://[email protected]:2551/user/eventListener] Member is Removed: akka.tcp://[email protected]:53986 after Exiting

下面就是本次示范的源代码:

resources/cluster.conf

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551"]
  }
}

ClusterEventsDemo.scala

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster._
import com.typesafe.config.ConfigFactory

class EventLisener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self,initialStateMode = InitialStateAsEvents
    ,classOf[MemberEvent],classOf[UnreachableMember])  //订阅集群状态转换信息
    super.preStart()
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)    //取消订阅
    super.postStop()
  }

  override def receive: Receive = {
    case MemberJoined(member) =>
      log.info("Member is Joining: {}", member.address)
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case MemberLeft(member) =>
      log.info("Member is Leaving: {}", member.address)
    case MemberExited(member) =>
      log.info("Member is Exiting: {}", member.address)
    case MemberRemoved(member, previousStatus) =>
      log.info(
        "Member is Removed: {} after {}",
        member.address, previousStatus)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
    case _: MemberEvent => // ignore
  }

}

object ClusterEventsDemo {
  def main(args: Array[String]): Unit = {
    //重设port,seed-node-address
    val port =
      if (args.isEmpty) "0"
      else args(0)

    val addr =
      if (args.length < 2) "2551"
      else args(1)


    val seednodeSetting = "akka.cluster.seed-nodes = ["+
    "\"akka.tcp://[email protected]:"+
      s"${addr}"+"\"]"



    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = ${port}")
      .withFallback(ConfigFactory.parseString(seednodeSetting))
      .withFallback(ConfigFactory.load("cluster.conf"))

    val clusterSystem = ActorSystem(name="clusterSystem",config=config)
    val eventListener = clusterSystem.actorOf(Props[EventLisener],"eventListener")

    println("actor system started!")
    scala.io.StdIn.readLine()

    val cluster = Cluster(clusterSystem)
    cluster.leave(cluster.selfAddress)

    scala.io.StdIn.readLine()


    clusterSystem.terminate()


  }
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

以上是关于Akka(10): 分布式运算:集群-Cluster的主要内容,如果未能解决你的问题,请参考以下文章

Akka-CQRS- 基于akka-cluster的读写分离框架,构建gRPC移动应用后端架构

Akka 集群单例Cluster Singleton

Akka源码分析-Cluster-DistributedData

SDP:Streaming-Data-Processor - Data Processing with Akka-Stream

Akka源码分析-Cluster-ActorSystem

Scala实现Akka的并发与分布式