Spark RPC使用记录(spark-2.2.0)

Posted vv.past

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark RPC使用记录(spark-2.2.0)相关的知识,希望对你有一定的参考价值。

Spark RPC 使用说明

概述
        想通过 spark RPC 实现服务端则须实现
        ThreadSafeRpcEndpoint 或 RpcEndpoint
        一般通过实现前者来实现自己的服务,如同字面意思是线程安全的
        
        一般需要实现4个方法
            onStart                   服务启动时一些内部初始化和启动其他线程服务都在这里处理
            receive                   接收client发过来的请求,但是不需要回复
            receiveAndReply      接受client发过来的请求,并返回response
            onStop                   服务结束时需要做的一些清理动作在这里处理
Server端示例代码
package org.apache.spark

import java.text.SimpleDateFormat
import java.util.concurrent.ScheduledFuture
import java.util.Date
import java.util.concurrent.TimeUnit
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv, ThreadSafeRpcEndpoint}

/**
  * Created by cloud on 18/1/18.
  */
class ZsparkRpcServer(
                     override val rpcEnv: RpcEnv,
                     val conf : SparkConf
                     ) extends ThreadSafeRpcEndpoint with Logging{

  val scheduledThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("echo-thread")
  var scheduledThreadFuture : ScheduledFuture[_] = _

  override def onStart(): Unit = {
    scheduledThreadFuture = scheduledThread.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        val simpleTime = new SimpleDateFormat("yy-MM-dd HH:mm:ss")
        logInfo(simpleTime.format(new Date()))
      }
    },3000L,2000L,TimeUnit.MILLISECONDS)
  }

  override def receive: PartialFunction[Any, Unit] = {
    case ZRequest(command) => logInfo(command.toUpperCase)
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case ZRequest(command) => context.reply(ZResponse(command.reverse))
    case _ => context.reply(ZResponse("ECHO".reverse))
  }


  override def onStop(): Unit = {
    if(scheduledThreadFuture != null){
      scheduledThreadFuture.cancel(true)
    }
    scheduledThread.shutdownNow()
  }

}

object ZsparkRpcServer{
  val SN="z-cloud-echo"
  val EN="z-cloud-echo-ser"

  def main(args : Array[String]): Unit = {
    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SN,Utils.localHostName(),23456,conf,securityManager)
    rpcEnv.setupEndpoint(EN,new ZsparkRpcServer(rpcEnv,conf))
    rpcEnv.awaitTermination()
  }
}

case class ZRequest(command : String)
case class ZResponse(result : String)
Client端示例代码

object ZsparkRpcClient{
  def main(args: Array[String]): Unit = {
    val host=Utils.localHostName()
    val port=23456
    val sparkConf = new SparkConf()
    val securityManager = new SecurityManager(sparkConf)
    val rpcEnv = RpcEnv.create(ZsparkRpcServer.SN,host,host,port,sparkConf,securityManager,true)
    val rpcEnvRef = rpcEnv.setupEndpointRef(RpcAddress(host,port),ZsparkRpcServer.EN)

    /*
    //异步获取response的方式
    val res=rpcEnvRef.ask[ZResponse](ZRequest("cli-echo"))
    res.onComplete {
      case Success(v) => println(v)
      case Failure(e) => println(e)
    }(ThreadUtils.sameThread)
    
    //发送不需要Server端回复的消息
    rpcEnvRef.send(ZRequest("non-response"))

    */
    
    /*
    * 同步获取response的方式
    * */
    for (i <- args){
      rpcEnvRef.send(ZRequest(i))
      val rpcTimeout=new RpcTimeout(FiniteDuration(3000L,TimeUnit.MILLISECONDS),"timeout")
      val res = rpcEnvRef.askSync[ZResponse](ZRequest(i),rpcTimeout)
      println(res.result)
    }

  }
}
启动RPC服务
#可以使用这种spark 封装脚本来执行,也可以自己构建执行环境
#使用spark封装脚本的好处就是处理简单可直接使用spark相关环境管理和配置管理,以及日志管理等等

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

. "${SPARK_HOME}/sbin/spark-config.sh"
. "${SPARK_HOME}/bin/load-spark-env.sh"

exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.rpcDemo 1 "[email protected]"

以上是关于Spark RPC使用记录(spark-2.2.0)的主要内容,如果未能解决你的问题,请参考以下文章

StreamingListener记录(spark-2.2.0)

Spark2.1内部原理剖析与源码阅读程序设计与企业级应用案例

使用 Spark 2.2.0 从 Hive Metastore 2.x 读取 [重复]

Spark环境搭建

无法在 Spark-2.2.0 - Scala-2.11.8 上运行单元测试(scalatest)

Spark 2.2.0 安装与配置