scala Actor -03
Posted 蜗牛不爱海绵宝宝
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了scala Actor -03相关的知识,希望对你有一定的参考价值。
1.对于上一篇讲解的scala的一些补充
val files = Array[String]("a.txt","b.txt","c.txt")
for(f <- files){xxxx}
目标一:熟悉Scala Actor并发编程
目标二:为学习Akka做准备
注:我们现在学的Scala Actor是scala 2.10.x版本及以前版本的Actor。
Scala在2.11.x版本中将Akka加入其中,作为其默认的Actor,
老版本的Actor已经废弃
2.概念
Scala中的Actor能够实现并行编程的强大功能,它是基于事件模型的
并发机制,
Scala是运用消息(message)的发送、接收来实现多线程的。
使用Scala能够更容易地实现多线程应用的开发
3.Actor方法执行顺序
1.首先调用start()方法执行Actor
2.调用start()方法后其act()方法会被执行
3.向Actor发送消息
4.wordCount的Actor的计算方法,虽然现在不用,但是思路还是有用的
package main.cn.wj.test import scala.actors.{Actor, Future} import scala.collection.immutable.HashSet import scala.io.Source import scala.collection.mutable.ListBuffer /** * Created by WJ on 2016/12/22. */ class Task extends Actor{ override def act(): Unit = { loop{ react{ case SubmitTask(filename) =>{ val result = Source.fromFile(filename).getLines().flatMap(_.split(" ")).map((_,1)).toList.groupBy(_._1).mapValues(_.size) sender ! ResultTask(result) } case StopTask =>{ exit() } } } } } case class SubmitTask(filename:String) case class ResultTask (result:Map[String,Int]) case object StopTask object ActorWordCount { def main(args: Array[String]): Unit = { var replySet = new HashSet[Future[Any]]() val resultList = new ListBuffer[ResultTask] val files = Array[String]("E://Test/words.log", "E://Test/words.txt") for (f <- files) { val actor = new Task val reply = actor.start() !! SubmitTask(f) //<reply 等同于Future> replySet += reply } while(replySet.size > 0 ){ val toCompute = replySet.filter(_.isSet) for(f <- toCompute) { val result = f.apply().asInstanceOf[ResultTask] resultList += result replySet -= f } Thread.sleep(100) } // reduce功能 ,汇总 //List val fr = resultList.flatMap(_.result).groupBy((_._1)).mapValues(_.foldLeft(0)(_+_._2)) println(fr) } }
5.看了上面的关于多线程相关的知识点,看看我们的线程池的代码
package main.cn.wj.test import java.util.concurrent.{Executor, Executors} /** * Created by WJ on 2016/12/22. */ object ThreadDemo { def main(args: Array[String]): Unit = { val pool = Executors.newFixedThreadPool(5); for (i <- 1 to 10){ pool.execute(new Runnable { override def run(): Unit = { println(Thread.currentThread().getName) Thread.sleep(1000) } }) } } }
以上是关于scala Actor -03的主要内容,如果未能解决你的问题,请参考以下文章