《Scala 语言》Scala 中的 Actor 编程

Posted 飞鱼说编程

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《Scala 语言》Scala 中的 Actor 编程相关的知识,希望对你有一定的参考价值。


1.了解 Scala Actor


注:Scala Actor 是 Scala 2.10.x 版本及以前版本的 Actor。
Scala 在 2.11.x 版本中将 Akka 加入其中,作为其默认的 Actor,老版本的 Actor 已经废弃。


1.1. 概念


Scala 中的 Actor 能够实现并行编程的强大功能,它是基于事件模型的并发机制,Scala 是运用消息 (message) 的发送、接收来实现多线程的。使用 Scala 能够更容易地实现多线程应用的开发。


1.2. 对比传统的 Java 并发编程和 Scala Actor 编程


《Scala 语言》Scala 中的 Actor 编程


对于 Java,我们都知道它的多线程实现需要对共享资源(变量、对象等)使用 synchronized 关键字进行代码块同步、对象锁互斥等等。而且,常常一大块的 try…catch 语句块中加上 wait 方法、notify 方法、notifyAll 方法是让人很头疼的。原因就在于 Java 中多数使用的是可变状态的对象资源,对这些资源进行共享来实现多线程编程的话,控制好资源竞争与防止对象状态被意外修改是非常重要的,而对象状态的不变性也是较难以保证的。 而在 Scala 中,我们可以通过复制不可变状态的资源 (即对象,Scala 中一切都是对象,连函数、方法也是) 的一个副本,再基于 Actor 的消息发送、接收机制进行并行编程。


1.3. Actor 方法执行顺序


(1) 首先调用 start() 方法启动 Actor;

(2) 调用 start() 方法后其 act() 方法会被执行;

(3) 向 Actor 发送消息。


1.4. 发送消息的方式


《Scala 语言》Scala 中的 Actor 编程


2.Actor 实战


2.1. 案例一


import scala.actors.Actor

object MyActor1 extends Actor{
 //定义act方法
 def act(){
   for(i <- 1 to 10){
     println("actor-1 " + i)
     Thread.sleep(2000)
   }
 }
}

object MyActor2 extends Actor{
 //定义act方法
 def act(){
   for(i <- 1 to 10){
     println("actor-2 " + i)
     Thread.sleep(2000)
   }
 }
}

object ActorExample1 extends App{
 //启动Actor
 MyActor1.start()
 MyActor2.start()
}


说明:上面分别调用了两个单例对象的 start() 方法,他们的 act() 方法会被执行,相同与在 Java 中开启了两个线程,线程的 run() 方法会被执行。

注意:这两个 Actor 是并行执行的,act() 方法中的 for 循环执行完成后 actor 程序就退出了。


2.2. 案例二


可以不断地接收消息。


import scala.actors.Actor

class ActorExample2 extends Actor {

 override def act(): Unit = {
   while (true) {
     receive {
       case "start" => {
         println("starting ...")
         Thread.sleep(5000)
         println("started")
       }
       case "stop" => {
         println("stopping ...")
         Thread.sleep(5000)
         println("stopped ...")
       }
     }
   }
 }
}

object ActorExample2 {
 def main(args: Array[String]) {
   val actor = new ActorExample2
   actor.start()
   //发送异步消息,感叹号就相当于是一个方法
   actor ! "start"
   actor ! "stop"
   println("消息发送完成!")
 }
}


说明:在 act() 方法中加入了while (true) 循环,就可以不停的接收消息。

注意:发送 start 消息和 stop 的消息是异步的,但是 Actor 接收到消息执行的过程是同步的按顺序执行。


2.3. 案例三


react 方式会复用线程,比 receive 更高效。


import scala.actors.Actor

class ActorExample3 extends Actor {

 override def act(): Unit = {
   loop {
     react {
       case "start" => {
         println("starting ...")
         Thread.sleep(5000)
         println("started")
       }
       case "stop" => {
         println("stopping ...")
         Thread.sleep(8000)
         println("stopped ...")
       }
     }
   }
 }
}

object ActorExample3 {
 def main(args: Array[String]) {
   val actor = new ActorExample3
   actor.start()
   actor ! "start"
   actor ! "stop"
   println("消息发送完成!")
 }
}


说明: react 如果要反复执行消息处理,react 外层要用 loop,不能用 while。


2.4. 案例四


结合 case class 发送消息。


import scala.actors.Actor

class ActorExample4 extends Actor {

 def act(): Unit = {
   while (true) {
     receive {
       case "start" => println("starting ...")
       case SyncMsg(id, msg) => {
         println(id + ",sync " + msg)
         Thread.sleep(5000)
         sender ! ReplyMsg(3,"finished")
       }
       case AsyncMsg(id, msg) => {
         println(id + ",async " + msg)
         Thread.sleep(5000)
       }
     }
   }
 }
}

object ActorExample4 {
 def main(args: Array[String]) {
   val a = new ActorExample4
   a.start()
   //异步消息
   a ! AsyncMsg(1, "hello actor")
   println("异步消息发送完成")
   //同步消息
   //val content = a.!?(1000, SyncMsg(2, "hello actor"))
   //println(content)
   val reply = a !! SyncMsg(2, "hello actor")
   println(reply.isSet)
   //println("123")
   val c = reply.apply()
   println(reply.isSet)
   println(c)
 }
}
case class SyncMsg(id : Int, msg: String)
case class AsyncMsg(id : Int, msg: String)
case class ReplyMsg(id : Int, msg: String)


2.5. 案例五


用 actor 并发编程写一个单机版的 WorldCount,将多个文件作为输入,计算完成后将多个任务汇总,得到最终的结果。


import java.io.File

import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.io.Source

/**
 * Created by ZX on 2016/4/4.
 */

class Task extends Actor {

 override def act(): Unit = {
   loop {
     react {
       case SubmitTask(fileName) => {
         val contents = Source.fromFile(new File(fileName)).mkString
         val arr = contents.split("\r\n")
         val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length)
         //val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
         sender ! ResultTask(result)
       }
       case StopTask => {
         exit()
       }
     }
   }
 }
}

object WorkCount {
 def main(args: Array[String]) {
   val files = Array("c://words.txt", "c://words.log")

   val replaySet = new mutable.HashSet[Future[Any]]
   val resultList = new mutable.ListBuffer[ResultTask]

   for(f <- files) {
     val t = new Task
     val replay = t.start() !! SubmitTask(f)
     replaySet += replay
   }

   while(replaySet.size > 0){
     val toCumpute = replaySet.filter(_.isSet)
     for(r <- toCumpute){
       val result = r.apply()
       resultList += result.asInstanceOf[ResultTask]
       replaySet.remove(r)
     }
     Thread.sleep(100)
   }
   val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2))
   println(finalResult)
 }
}

case class SubmitTask(fileName: String)
case object StopTask
case class ResultTask(result: Map[String, Int])


本文是在本人在学习 Scala 时的总结归纳和笔记,如果觉得对你有帮助,不要忘了点赞,评论,转发哟!!!

可以点击阅读原文查看博客原文哦!

上一篇:


以上是关于《Scala 语言》Scala 中的 Actor 编程的主要内容,如果未能解决你的问题,请参考以下文章

[scala] akka actor编程

scala当中的Actor并发编程

scala中的Actor

Scala学习之路 Scala的Actor

Scala的actor

第4节 Scala中的actor介绍:1actor概念介绍;2actor执行顺序和发送消息的方式