03.Scala编程实战 Posted 2022-08-13 mediocreworld
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了03.Scala编程实战相关的知识,希望对你有一定的参考价值。
Scala编程实战
1. 课程目标
1.1. 目标:使用Akka实现一个简易版的spark通信框架
2. 项目概述
2.1. 需求
Hivesql----------> select count(*) from user----->整个表只有1条数据
Map 0% reduce 0%
Map 10% reduce 0%
Map 20% reduce 0%
目前大多数的分布式架构底层通信都是通过RPC实现的,RPC框架非常多,比如前我们学过的Hadoop项目的RPC通信框架,但是Hadoop在设计之初就是为了运行长达数小时的批量而设计的,在某些极端的情况下,任务提交的延迟很高,所以Hadoop的RPC显得有些笨重。
Spark 的RPC是通过Akka类库实现的,Akka用Scala语言开发,基于Actor并发模型实现,Akka具有高可靠、高性能、可扩展等特点,使用Akka可以轻松实现分布式RPC功能。
2.2. Akka简介
Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。
Actor模型:在计算机科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。
Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:
(1)、提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发
(2)、提供了异步非阻塞的、高性能的事件驱动编程模型
(3)、超级轻量级事件处理(每GB堆内存几百万Actor)
3. 项目实现
3.1. 实战一:
利用 Akka 的 actor 编程模型,实现 2 个进程间的通信。
3.1.1. 架构图
3.1.2. 重要类介绍
ActorSystem : 在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。
注意:
(1)、ActorSystem是一个进程中的老大,它负责创建和监督actor
(2)、ActorSystem是一个单例对象
(3)、actor负责通信
3.1.3. Actor
在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。
(1)preStart() 方法:该方法在Actor对象构造方法执行后执行,整个Actor生命周期中仅执行一次。
(2)receive() 方法:该方法在Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行。
3.1.4. 具体代码
① Master 类
package cn.itcast.rpc
import akka.actor.Actor, ActorRef, ActorSystem, Props
import com.typesafe.config.ConfigFactory
//todo: 利用akka的actor模型实现2个进程间的通信-----Master端
class Master extends Actor
//构造代码块先被执行
println ("master constructor invoked")
//prestart方法会在构造代码块执行后被调用,并且只被调用一次
override def preStart(): Unit =
println ("preStart method invoked")
//receive方法会在prestart方法执行后被调用,表示不断的接受消息
override def receive: Receive =
case "connect" =>
println ("a client connected")
//master发送注册成功信息给worker
sender ! "success"
object Master
def main(args: Array[String]): Unit =
//master的ip地址
val host=args(0)
//master的port端口
val port=args(1)
//准备配置文件信息
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$ host"
|akka.remote.netty.tcp.port = "$ port"
""".stripMargin
//配置config对象 利用ConfigFactory解析配置文件,获取配置信息
val config=ConfigFactory.parseString (configStr)
// 1、创建ActorSystem,它是整个进程中老大,它负责创建和监督actor,它是单例对象
val masterActorSystem = ActorSystem ("masterActorSystem",config)
// 2、通过ActorSystem来创建master actor
val masterActor: ActorRef = masterActorSystem.actorOf(Props (new Master),"masterActor")
// 3、向master actor发送消息
//masterActor ! "connect"
② Worker 类
package cn.itcast.rpc
import akka.actor.Actor, ActorRef, ActorSelection, ActorSystem, Props
import com.typesafe.config.ConfigFactory
//todo: 利用akka中的actor实现2个进程间的通信-----Worker端
class Worker extends Actor
println ("Worker constructor invoked")
//prestart方法会在构造代码块之后被调用,并且只会被调用一次
override def preStart(): Unit =
println ("preStart method invoked")
//获取master actor的引用
//ActorContext全局变量,可以通过在已经存在的actor中,寻找目标actor
//调用对应actorSelection方法,
// 方法需要一个path路径:1、通信协议、2、master的IP地址、3、master的端口 4、创建master actor老大 5、actor层级
val master: ActorSelection = context .actorSelection("akka.tcp://masterActorSystem@172.16.43.63:8888/user/masterActor")
//向master发送消息
master ! "connect"
//receive方法会在prestart方法执行后被调用,不断的接受消息
override def receive: Receive =
case "connect" =>
println ("a client connected")
case "success" =>
println ("注册成功")
object Worker
def main(args: Array[String]): Unit =
//定义worker的IP地址
val host=args(0)
//定义worker的端口
val port=args(1)
//准备配置文件
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$ host"
|akka.remote.netty.tcp.port = "$ port"
""".stripMargin
//通过configFactory来解析配置信息
val config=ConfigFactory.parseString (configStr)
// 1、创建ActorSystem,它是整个进程中的老大,它负责创建和监督actor
val workerActorSystem = ActorSystem ("workerActorSystem",config)
// 2、通过actorSystem来创建 worker actor
val workerActor: ActorRef = workerActorSystem.actorOf(Props (new Worker),"workerActor")
//向worker actor发送消息
workerActor ! "connect"
3.2. 实战二
使用 Akka 实现一个简易版的 spark 通信框架
3.2.1. 架构图
3.2.2. 具体代码
① Master 类
package cn.itcast.spark
import akka.actor.Actor, ActorRef, ActorSystem, Props
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
//todo: 利用akka实现简易版的spark通信框架-----Master端
class Master extends Actor
//构造代码块先被执行
println ("master constructor invoked")
//定义一个map集合,用于存放worker信息
private val workerMap = new mutable.HashMap[String,WorkerInfo]()
//定义一个list集合,用于存放WorkerInfo信息,方便后期按照worker上的资源进行排序
private val workerList = new ListBuffer[WorkerInfo]
//master定时检查的时间间隔
val CHECK_OUT_TIME_INTERVAL =15000 //15秒
//prestart方法会在构造代码块执行后被调用,并且只被调用一次
override def preStart(): Unit =
println ("preStart method invoked")
//master定时检查超时的worker
//需要手动导入隐式转换
import context .dispatcher
context .system.scheduler.schedule(0 millis,CHECK_OUT_TIME_INTERVAL millis,self ,CheckOutTime)
//receive方法会在prestart方法执行后被调用,表示不断的接受消息
override def receive: Receive =
//master接受worker的注册信息
case RegisterMessage (workerId,memory,cores) =>
//判断当前worker是否已经注册
if (!workerMap .contains(workerId))
//保存信息到map集合中
val workerInfo = new WorkerInfo(workerId,memory,cores)
workerMap .put(workerId,workerInfo)
//保存workerinfo到list集合中
workerList +=workerInfo
//master反馈注册成功给worker
sender ! RegisteredMessage (s"workerId:$ workerId 注册成功")
//master接受worker的心跳信息
case SendHeartBeat (workerId)=>
//判断worker是否已经注册,master只接受已经注册过的worker的心跳信息
if (workerMap .contains(workerId))
//获取workerinfo信息
val workerInfo: WorkerInfo = workerMap (workerId)
//获取当前系统时间
val lastTime: Long = System.currentTimeMillis ()
workerInfo.lastHeartBeatTime =lastTime
case CheckOutTime=>
//过滤出超时的worker 判断逻辑: 获取当前系统时间 - worker上一次心跳时间 >master定时检查的时间间隔
val outTimeWorkers: ListBuffer[WorkerInfo] = workerList .filter(x => System.currentTimeMillis () -x.lastHeartBeatTime > CHECK_OUT_TIME_INTERVAL )
//遍历超时的worker信息,然后移除掉超时的worker
for (workerInfo <- outTimeWorkers)
//获取workerid
val workerId: String = workerInfo.workerId
//从map集合中移除掉超时的worker信息
workerMap .remove(workerId)
//从list集合中移除掉超时的workerInfo信息
workerList -= workerInfo
println ("超时的workerId:" +workerId)
println ("活着的worker总数:" + workerList .size)
//master按照worker内存大小进行降序排列
println (workerList .sortBy(x => x.memory).reverse.toList)
object Master
def main(args: Array[String]): Unit =
//master的ip地址
val host=args(0)
//master的port端口
val port=args(1)
//准备配置文件信息
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$ host"
|akka.remote.netty.tcp.port = "$ port"
""".stripMargin
//配置config对象 利用ConfigFactory解析配置文件,获取配置信息
val config=ConfigFactory.parseString (configStr)
// 1、创建ActorSystem,它是整个进程中老大,它负责创建和监督actor,它是单例对象
val masterActorSystem = ActorSystem ("masterActorSystem",config)
// 2、通过ActorSystem来创建master actor
val masterActor: ActorRef = masterActorSystem.actorOf(Props (new Master),"masterActor")
// 3、向master actor发送消息
//masterActor ! "connect"
② Worker 类
package cn.itcast.spark
import java.util.UUID
import akka.actor.Actor, ActorRef, ActorSelection, ActorSystem, Props
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
//todo: 利用akka实现简易版的spark通信框架-----Worker端
class Worker(val memory:Int,val cores:Int,val masterHost:String,val masterPort:String) extends Actor
println ("Worker constructor invoked")
//定义workerId
private val workerId : String = UUID.randomUUID ().toString
//定义发送心跳的时间间隔
val SEND_HEART_HEAT_INTERVAL =10000 //10秒
//定义全局变量
var master : ActorSelection=_
//prestart方法会在构造代码块之后被调用,并且只会被调用一次
override def preStart(): Unit =
println ("preStart method invoked")
//获取master actor的引用
//ActorContext全局变量,可以通过在已经存在的actor中,寻找目标actor
//调用对应actorSelection方法,
// 方法需要一个path路径:1、通信协议、2、master的IP地址、3、master的端口 4、创建master actor老大 5、actor层级
master = context .actorSelection(s"akka.tcp://masterActorSystem@$ masterHost:$ masterPort/user/masterActor")
//向master发送注册信息,将信息封装在样例类中,主要包含:workerId,memory,cores
master ! RegisterMessage (workerId ,memory,cores)
//receive方法会在prestart方法执行后被调用,不断的接受消息
override def receive: Receive =
//worker接受master的反馈信息
case RegisteredMessage (message) =>
println (message)
//向master定期的发送心跳
//worker先自己给自己发送心跳
//需要手动导入隐式转换
import context .dispatcher
context .system.scheduler.schedule(0 millis,SEND_HEART_HEAT_INTERVAL millis,self ,HeartBeat)
//worker接受心跳
case HeartBeat =>
//这个时候才是真正向master发送心跳
master ! SendHeartBeat (workerId )
object Worker
def main(args: Array[String]): Unit =
//定义worker的IP地址
val host=args(0)
//定义worker的端口
val port=args(1)
//定义worker的内存
val memory=args(2).toInt
//定义worker的核数
val cores=args(3).toInt
//定义master的ip地址
val masterHost=args(4)
//定义master的端口
val masterPort=args(5)
//准备配置文件
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$ host"
|akka.remote.netty.tcp.port = "$ port"
""".stripMargin
//通过configFactory来解析配置信息
val config=ConfigFactory.parseString (configStr)
// 1、创建ActorSystem,它是整个进程中的老大,它负责创建和监督actor
val workerActorSystem = ActorSystem ("workerActorSystem",config)
// 2、通过actorSystem来创建 worker actor
val workerActor: ActorRef = workerActorSystem.actorOf(Props (new Worker(memory,cores,masterHost,masterPort)),"workerActor")
//向worker actor发送消息
workerActor ! "connect"
③ WorkerInfo 类
package cn.itcast.spark
//封装worker信息
class WorkerInfo(val workerId:String,val memory:Int,val cores:Int)
//定义一个变量用于存放worker上一次心跳时间
var lastHeartBeatTime :Long=_
override def toString: String =
s"workerId:$ workerId , memory:$ memory , cores:$ cores"
④ 样例类
package cn.itcast.spark
trait RemoteMessage extends Serializable
//worker向master发送注册信息,由于不在同一进程中,需要实现序列化
case class RegisterMessage(val workerId:String,val memory:Int,val cores:Int) extends RemoteMessage
//master反馈注册成功信息给worker,由于不在同一进程中,也需要实现序列化
case class RegisteredMessage(message:String) extends RemoteMessage
//worker向worker发送心跳 由于在同一进程中,不需要实现序列化
case object HeartBeat
//worker向master发送心跳,由于不在同一进程中,需要实现序列化
case class SendHeartBeat(val workerId:String) extends RemoteMessage
//master自己向自己发送消息,由于在同一进程中,不需要实现序列化
case object CheckOutTime
以上是关于03.Scala编程实战的主要内容,如果未能解决你的问题,请参考以下文章
Shell脚本编程实战
Java并发编程实战 04死锁了怎么办?
Java并发编程实战 04死锁了怎么办?
树莓派Python编程入门与实战 PDF
SHELL编程开发实战Apache虚拟主机
gconf/dconf实战编程利用dconf库读写配置实战以及诸多配套工具演示