Scala:基础知识04

Posted Xiao Miao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Scala:基础知识04相关的知识,希望对你有一定的参考价值。

Scala基础知识

一、scala高阶函数

函数理解

  • 在FP编程语言中,函数是”头等公民“。

  • 函数的本质都是对象,都是FunctionN的实例。N表示的是函数入参的个数。

  • 可以把函数当成和String、Int对象一样,作为参数传递给方法。

m1(x:Int,y:Int)  //需要两个参数 类型都为Int 调用的时候传入两个Int m1(2,3)

m2(p:(Int) =>Int) //需要一个参数 类型是函数式类型  

1.把函数作为值传递给方法

在这里插入图片描述

2.匿名函数

在这里插入图片描述

3.柯里化函数

定义:将原先接受多个参数的方法转换为多个只有一个参数的参数列表的过程

  • 功能意义:实现分步调用函数 提高代码的灵活性
  • 从现象上看,柯里化有多个参数列表 有多个小括号。

在这里插入图片描述

4.闭包函数

  • 定义:闭包首先是个函数 只不过函数的返回值依赖于声明在函数外部的变量
  • 通俗:在定义函数的时候 对外部自由变量的捕获过程

在这里插入图片描述

二、scala隐式转换

隐式转换的引入

object ImplicitCase01 {
  def main(args: Array[String]): Unit = {
    //有一个方法 可以返回一个含头又含尾的区间 Range
    val a:Int = 0

    println(a to 5)  //中缀调用形式
    println(a.to(5)) //后缀调用形式

    //TODO to方法属于Int类型 ,但是Int的源码中又木有发现 怎么解释这种现象?
    // 直接点击to方法 发现进入的RichInt类  怎么解释?
    // 猜想 :scala背后捣鬼了 不知道在什么时候将Int转换成了RichInt  ---->隐式转换
  }
}

隐式转换

  • 从字面上理解,就是scala编译器背后进行偷偷转换动作。

  • 本质是scala编译器提供的一种代码纠错的功能。在下面的两种情况下,程序代码肯定是报错

    • 1、类调用一个不属于自己的方法 调用其他类的方法
    • 2、调用方法时不给参数
  • 但是,如果有了隐式转换,在编译期间,scala会通过隐式转换让程序可以继续执行 实现纠错。

  • 关键字眼:定义类 定义方法 定义参数 使用implicit

  • 核心用法

    • 隐式参数 – 解决:调用方法时不给参数

在这里插入图片描述隐式方法

  • 实现类型转换
    在这里插入图片描述- 调用不属于自己的方法 调用其他类的方法

/**
 *  todo 隐式转换的方法必须定义在object中 而不是class
 *  主要原因object中方法和属性相当于是静态的 可以直接调用
 */
object DogImplicit {
//定义个隐式转换方法 让dog变成RichDog
  implicit def dogWrapper(dog: Dog) = new RichDog
}


object ImplicitCase02 {
  def main(args: Array[String]): Unit = {
  //todo 这里需要隐式转换
    import com.miao.implicitdemo.DogImplicit.dogWrapper
    val dog = new Dog
    dog.canLearnSkill("钓鱼")

  }
}

class Dog {}

class RichDog {
  def canLearnSkill(skillName: String) = println(s"狗狗学会了${skillName}")

}

隐式类

import java.io.File
import scala.io.Source

object Implicit_Class {
  //隐式类
  implicit class ImpInt(tmp: Int) {
    def add(tmp2: Int) = tmp + tmp2
  }
  implicit class FileEnhance(file: File) {
    def read() = Source.fromFile(file.getPath).mkString
  }

}


object Test {
  import com.miao.implicitdemo.Implicit_Class._
  def main(args: Array[String]) {
    println(1.add(2)) //在当前作用域中寻找 将Int(1) 作为变量的类同时具有add 的方法的类,如有,则执行
    println(new File("D:\\\\Study\\\\idea\\\\Scala\\\\files\\\\1.txt").read()) //在当前作用域中寻找 将File 作为变量的类同时具有read的方法的类,如有,则执行
  }
}


隐式转换的使用注意事项

  • 无歧义原则
    在这里插入图片描述

  • 显示操作先行原则
    在这里插入图片描述
    集中定义原则

  • 通常把隐式转换参数、隐式转换方法集中定义在object中

  • 在具体的工程中,哪里需要隐式转换,哪里手动导入

三、AKKA框架

  • AKKA和actor的关系
    • actor是一种并发编程通信的模型:基于actor的消息发送和接收
    • AKKA是基于actor模型的一个RPC框架,实现进程间、跨网络通信。
  • RPC在大数据领域的应用
    • Hadoop 自己封装了RPC框架 HadoopRPC
    • Spark 1.6- AKKA 1.6+ Netty
    • Flink AKKA
  • 重点:倒不是设计spark软件 练习scala编程语言,掌握理解分布式通信思想。
  • AKKA特点
    • ActorSystem 创建监督管理actor的核心类
    • 意味着在akka中,不能自己new 创建actor,必须通过ActorSystem创建Actor.
  • java程序打成jar包如何执行。
    • java -jar xxx.jar [mainClass] args
  • 案例1:使用AKKA实现两个进程间的通信。
  • 案例2:使用AKKA实现两个进程间的通信:注册 保存注册消息 心跳 心跳超时检查

案例1

  • 相关依赖
<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • Master
package com.miao.akka

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

/**
 * @description: todo 使用akka实现两个进程间的通信 ---master端
 * @author: Xiao Miao
 * @time: 2021-06-12 09:53:22
 */
class Master extends Actor{

  println("Master主构造器执行了...")

  //preStart是初始化方法 在 构造器执行之后 receive执行之前 执行且执行一次
  override def preStart(): Unit = {
    println("Master的初始化方法执行了...")
  }

  //receive方法是akka中actor不断接受消息处理消息的方法  不需要用户再使用while控制
  override def receive: Receive ={
    //接收worker注册信息
    case "hello" => {
      println("a client connected......")
      //返回注册成功信息
      sender ! "ok"
    }
  }
}

object Master{
  def main(args: Array[String]): Unit ={

    //主机ip 端口
    val host =args(0)
    val port =args(1)

    //配置参数
    val configStr: String =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = "$host"
        |akka.remote.netty.tcp.port = "$port"
        |""".stripMargin
    //解析配置参数
    val config: Config = ConfigFactory.parseString(configStr)

    //首先创建ActorSystem
    val masterActorSystem: ActorSystem = ActorSystem.create("masterActorSystem", config)

    //由ActorSystem去创建MasterActor
    val master: ActorRef = masterActorSystem.actorOf(Props(new Master), "masterActor")

    //测试  给自己发个消息
//    master ! "hello"
  }
}
  • Worker
package com.miao.akka

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

/**
 * @description: todo 使用akka实现两个进程间的通信 ---worker端
 * @author: Xiao Miao
 * @time: 2021-06-12 09:53:22
 */
class Worker extends Actor{

  override def preStart(): Unit = {
//    println("如果你需要 你就进行一些初始化的操作")
    //todo 在worker启动的初始化过程中 想法设法拿到master引用 向其发送注册信息
    // 如何在茫茫网络程序中 找到master运行的地方呢?
    // (协议  ip  端口  actorSystem masterActor 层级关系)
    //  akka.tcp://masterActorSystem@127.0.0.1:12321
    // AKKA提供了一个上下文对象context 可以根据URI地址找到需要通信的actor
    val master: ActorSelection = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:12321/user/masterActor")
    //worker向master发送注册信息
    master !  "hello"
  }

  override def receive: Receive = {
    //接收注册成功信息 处理
    case "ok" => println("注册成功")
  }
}

object Worker{
  def main(args: Array[String]): Unit = {

    val host =args(0)
    val port =args(1)

    //1、配置参数
    val configStr:String =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = "$host"
        |akka.remote.netty.tcp.port = "$port"
        |""".stripMargin
    //2、解析配置参数
    val config: Config = ConfigFactory.parseString(configStr)

    //3、创建ActorSystem
    val workerActorSystem: ActorSystem = ActorSystem.create("workerActorSystem", config)

    //4、创建workerActor
    val worker: ActorRef = workerActorSystem.actorOf(Props(new Worker), "workerActor")

//    worker ! "hello"
  }
}

案例2

  • Master
package com.miao.spark

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
//todo 需要进行时间单位的操作时候 导入该包
import scala.concurrent.duration._

/**
 * @description:  todo 使用AKKA实现spark进程间的简易通信 master端
 * @author: Xiao Miao
 * @time: 2021-06-12 09:53:22
 */
class Master extends Actor{

  //定义个集合 保存worker注册信息
  //  <workerID,WorkerInfo>
  private val workerInfoMap = new mutable.HashMap[String, WorkerInfo]()
  //定义个集合 保存worker资源信息  便于后续根据资源大小排序  业务问题
  private val workerInfoList = new ListBuffer[WorkerInfo]

  //preStart是初始化方法 在 构造器执行之后 receive执行之前 执行且执行一次
  override def preStart(): Unit = {
//    println("Master的初始化方法执行了...")
    //todo master启动之后 开始心跳超时检测的功能
    /**
     * schedule(什么时间首次,间隔时间下次,发给谁,发什么) 定时功能
     * schedule(立即0s, 间隔10s, self ,CheckTimeOut)
     */
    import context.dispatcher
    context.system.scheduler.schedule(0 seconds,10 seconds){
      self ! CheckTimeOut
    }
  }

  //receive方法是akka中actor不断接受消息处理消息的方法  不需要用户再使用while控制
  override def receive: Receive ={
    //接收worker注册信息
    case RegisterMessage(workerID,memory,cores) =>{
      //判断worker是否已经注册 如果未注册 保存注册信息
      if(!workerInfoMap.contains(workerID)){
        //构造workerInfo
        val info = new WorkerInfo(workerID, memory, cores)
        //添加至workerInfoMap中
        workerInfoMap.put(workerID,info)
        workerInfoList += info

        //给worker发送注册成功的信息
        println(s"编号ID为:${workerID}的worker,注册成功")
        sender ! RegisterSuccess(s"恭喜你:${workerID},注册成功,记得后续积极心跳哦")
      }
    }

      //接收worker的心跳信息
    case HeartBeat(workerID) =>{
      //判断该worker是否已经注册 如果注册 就把当前时间更新为上次心跳时间
      if(workerInfoMap.contains(workerID)){
        val info: WorkerInfo = workerInfoMap(workerID)
        //获取当前时间
        val nowTime: Long = System.currentTimeMillis()
        //把当前时间更新为该worker上次心跳时间
        info.lastHeartBeatTime = nowTime
      }
    }

      //用于接收自己的信息 进行心跳超时检测
    case CheckTimeOut =>{
      //todo 系统当前时间 - 上次心跳时间 > 7s 超时
      val outTime: ListBuffer[WorkerInfo] = workerInfoList.filter(w => System.currentTimeMillis() - w.lastHeartBeatTime > 7 * 1000)
      //遍历超时 剔除掉
      for(o <- outTime){
        println(s"超时的worker是:${o.workerID}")
        //把超时的worker从集合中剔除
        workerInfoMap.remove(o.workerID)
        workerInfoList -= o
      }

      //打印一些信息
      println(s"当前存活的worker的个数是:${workerInfoList.size}")
      //根据memory的大小倒序进行排序
      println(workerInfoList.sortBy(w  => w.memory).reverse)
      println("-----------------------------------------")
    }
  }
}

object Master{
  def main(args: Array[String]): Unit ={

    //主机ip 端口
    val host =args(0)
    val port =args(1)

    //配置参数
    val configStr: String =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
         |""".stripMargin
    //解析配置参数
    val config: Config = ConfigFactory.parseString(configStr)

    //首先创建ActorSystem
    val masterActorSystem: ActorSystem = ActorSystem.create("masterActorSystem", config)

    //由ActorSystem去创建MasterActor
    val master: ActorRef = masterActorSystem.actorOf(Props(new Master), "masterActor")

  }
}
  • Worker
package com.miao.spark

import java.util.UUID
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
//todo 需要进行时间单位的操作时候 导入该包
import scala.concurrent.duration._

/**
 * @description:  todo 使用AKKA实现spark进程间的简易通信 worker端
 * @author: Xiao Miao
 * @time: 2021-06-12 09:53:22
 */

class Worker(var memory:Int,var cores:Int) extends Actor{

  //workerID
  private val workerID: String = UUID.randomUUID().toString.replaceAll("-", "")

  var master: ActorSelection =_

  override def preStart(): Unit = {
    //todo 在worker启动的初始化过程中 想法设法拿到master引用 向其发送注册信息
    // 如何在茫茫网络程序中 找到master运行的地方呢?
    // (协议  ip  端口  actorSystem masterActor 层级关系)
    //  akka.tcp://masterActorSystem@127.0.0.1:12321
    // AKKA提供了一个上下文对象context 可以根据URI地址找到需要通信的actor
    master = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:12321/user/masterActor")
    //worker向master发送注册信息
    master !  RegisterMessage(workerID, memory, cores)
  }

  override def receive: Receive = {
    //接收注册成功信息 处理
    case RegisterSuccess(msg) =>{
      println(msg)
      //todo worker注册成功之后立即开始首次心跳 然后后续指定间隔时间重复心跳 本需求中约定5s钟心跳一次
      // akka如何实现定时周期重复动作
      /**
       * schedule(什么时间首次,间隔时间下次,发给谁,发什么) 定时功能
       * schedule(立即0s, 间隔5s, master,workerID)
       */
      import context.dispatcher
      context.system.scheduler.schedule(0 seconds,5 seconds){
        //给master发送心跳信息
        master ! HeartBeat(workerID)
      }
    }
  }
}

object Worker{
  def main(args: Array[String]): Unit = {

    val host =args(0)
    val port =args(1)

    val memory =args(2).toInt
    val cores = args(3).toInt

    //1、配置参数
    val configStr:String =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
         |""".stripMargin
    //2、解析配置参数
    val config: Config = ConfigFactory.parseString(configStr)

    //3、创建ActorSystem
    val workerActorSystem: ActorSystem = ActorSystem.create("workerActorSystem", config)

    //4、创建workerActor
    val worker: ActorRef = workerActorSystem.actorOf(Props(new Worker(memory,cores)), "workerActor")

  }
}
  • SparkRemote
package com.miao.spark

/**
 * @description:
 * @author: Xiao Miao
 * @time: 2021-06-12 09:53:22
 */
trait SparkRemote extends Serializable {
    //根据需求集中定义一些共用的方法
}

//定义样例类 用于worker向master发送注册信息
case class RegisterMessage(workerID:String,memory:Int,cores:Int) extends SparkRemote
//定义样例类 用于master给worker发送注册成功信息
case class RegisterSuccess(msg:String) extends SparkRemote
//定义样例类 用于worker给master发送心跳信息
case class HeartBeat(workerID:String) extends SparkRemote
//定义样例对象 用于master自己给自己发送心跳超时检测信息
case object CheckTimeOut
  • WorkerInfo
package com.miao.spark

/**
 * @description:
 * @author: Xiao Miao
 * @time: 2021-06-12 09:53:22
 */
//定义一个普通的scalabean 用于保存每个worker的注册信息 及其它的资源信息
class WorkerInfo(var workerID:String,var memory:Int,var cores:Int) {
  //记录上次心跳时间
  var lastHeartBeatTime:Long =_
  //重写对象的toString方法

  override def toString = s"WorkerInfo($workerID, $memory, $cores)"
}

以上是关于Scala:基础知识04的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Scala 中使用 java.String.format?

Spark基础学习笔记04:Scala简介与安装

Scala语言专题

为什么Scala是可扩展的?

初学scala4——trait混入

Scala附加列表