Spark Shuffle Service and Client

Posted by 大冰的小屋


BlockTransferService

Spark is deployed in a distributed fashion, and each task ultimately runs on a different machine node. A map task's output is stored directly in the storage system of the machine where the map task ran, while the reduce task that consumes it very likely runs on a different machine, so the map task's intermediate output must be downloaded remotely. ShuffleClient is not only the client that uploads shuffle files to other Executors or downloads them locally; it also provides the shuffle service that other Executors access.
The ShuffleClient is created in BlockManager as follows:

  // Client to read other executors' shuffle files. This is either an external service, or just the
  // standard BlockTransferService to directly connect to other Executors.
  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
      securityManager.isSaslEncryptionEnabled())
  } else {
    blockTransferService
  }

As the code above shows, when the external shuffle service is enabled, a new ExternalShuffleClient is created; otherwise the default is the BlockTransferService itself. A BlockTransferService can only serve requests after its init method has been called. BlockTransferService is an abstract class whose concrete subclass is NettyBlockTransferService; its initialization steps are:
1. Create the RpcHandler (actually its subclass NettyBlockRpcServer);
2. Create the TransportContext;
3. Create the RPC client factory, TransportClientFactory;
4. Create the Netty server, TransportServer; its port can be changed via the spark.blockManager.port property.

  override def init(blockDataManager: BlockDataManager): Unit = {
    val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
    var serverBootstrap: Option[TransportServerBootstrap] = None
    var clientBootstrap: Option[TransportClientBootstrap] = None
    if (authEnabled) {
      serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager))
      clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager,
        securityManager.isSaslEncryptionEnabled()))
    }
    transportContext = new TransportContext(transportConf, rpcHandler)
    clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
    server = createServer(serverBootstrap.toList)
    appId = conf.getAppId
    logInfo("Server created on " + server.getPort)
  }

1 The Block RPC Service: NettyBlockRpcServer

When the map task and the reduce task run on different nodes, the reduce task must download the map task's intermediate output from the remote node, so NettyBlockRpcServer provides the OpenBlocks operation for downloading block files. In some cases, for fault tolerance, block data must be replicated to other nodes, so NettyBlockRpcServer also provides the UploadBlock RPC service for uploading block files.

/**
 * Serves requests to open blocks by simply registering one chunk per block requested.
 * Handles opening and uploading arbitrary BlockManager blocks.
 *
 * Opened blocks are registered with the "one-for-one" strategy, meaning each Transport-layer Chunk
 * is equivalent to one Spark-level shuffle block.
 */
class NettyBlockRpcServer(
    appId: String,
    serializer: Serializer,
    blockManager: BlockDataManager)
  extends RpcHandler with Logging {

  private val streamManager = new OneForOneStreamManager()

  override def receive(
      client: TransportClient,
      rpcMessage: ByteBuffer,
      responseContext: RpcResponseCallback): Unit = {
    val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
    logTrace(s"Received request: $message")

    message match {
      case openBlocks: OpenBlocks =>
        val blocks: Seq[ManagedBuffer] =
          openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
        val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
        logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
        responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)

      case uploadBlock: UploadBlock =>
        // StorageLevel is serialized as bytes using our JavaSerializer.
        val level: StorageLevel =
          serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
        val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
        blockManager.putBlockData(BlockId(uploadBlock.blockId), data, level)
        responseContext.onSuccess(ByteBuffer.allocate(0))
    }
  }

  override def getStreamManager(): StreamManager = streamManager
}
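
The "one-for-one" registration strategy described in the class comment can be sketched without Netty: each registered stream holds the ordered buffers of the requested blocks, and chunk index i of a stream corresponds to exactly one block. All names below are illustrative, not Spark's actual OneForOneStreamManager API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Toy sketch of the one-for-one strategy: one transport-level chunk per
// Spark-level block. Not thread-safe; for illustration only.
public class OneForOneSketch {
    private final AtomicLong nextStreamId = new AtomicLong(0);
    private final Map<Long, List<byte[]>> streams = new HashMap<>();

    // Register all requested blocks under a fresh stream id (cf. OpenBlocks handling).
    public long registerStream(List<byte[]> blocks) {
        long id = nextStreamId.getAndIncrement();
        streams.put(id, blocks);
        return id;
    }

    // Chunk index i of a stream is exactly block i of the original request.
    public byte[] getChunk(long streamId, int chunkIndex) {
        return streams.get(streamId).get(chunkIndex);
    }
}
```

The StreamHandle returned to the client carries the stream id and the chunk count, which is all the client needs to fetch each chunk by index.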

2 Building the Transport Context: TransportContext

TransportContext maintains the transport context; it can create both the Netty server (TransportServer) and the Netty client factory (TransportClientFactory). Its constructor:

  // TransportConf: mainly controls the number of client and server threads for the
  //   Netty-based shuffle I/O
  // RpcHandler: handles the server-side RPC for downloading or uploading blocks once a
  //   client request arrives; here it is NettyBlockRpcServer
  // MessageEncoder: encodes outgoing messages before they are written to the wire, so the
  //   receiver can recover message boundaries without loss or misparsing
  // MessageDecoder: parses the raw ByteBuf received from the wire back into messages
  // closeIdleConnections: whether to close idle connections (by default they are not closed)
  public TransportContext(
      TransportConf conf,
      RpcHandler rpcHandler,
      boolean closeIdleConnections) {
    this.conf = conf;
    this.rpcHandler = rpcHandler;
    this.encoder = new MessageEncoder();
    this.decoder = new MessageDecoder();
    this.closeIdleConnections = closeIdleConnections;
  }
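
To see why the encoder/decoder pair matters, recall that TCP delivers a byte stream with no message boundaries. A common framing technique, shown here in a simplified length-prefixed form (Spark's actual frame layout is richer), is for the encoder to prefix each message with its length and for the decoder to emit a message only once the full frame has arrived:

```java
import java.nio.ByteBuffer;

// Minimal length-prefixed framing sketch; illustrative names, not Spark's API.
public class FrameCodec {
    // Encode: 4-byte length prefix followed by the payload.
    public static ByteBuffer encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Decode: return the payload only if a complete frame is available,
    // otherwise null so the caller waits for more bytes.
    public static byte[] decode(ByteBuffer buf) {
        if (buf.remaining() < 4) return null;
        buf.mark();
        int len = buf.getInt();
        if (buf.remaining() < len) { buf.reset(); return null; }
        byte[] payload = new byte[len];
        buf.get(payload);
        return payload;
    }
}
```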

3 The RPC Client Factory: TransportClientFactory

TransportClientFactory is the factory class that creates the Netty client, TransportClient. TransportContext's createClientFactory method creates the TransportClientFactory:

  /**
   * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
   * a new Client. Bootstraps will be executed synchronously, and must run successfully in order
   * to create a Client.
   */
  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
    return new TransportClientFactory(this, bootstraps);
  }

The main fields of TransportClientFactory include:
1. context: the TransportContext that created this factory
2. conf: the TransportConf
3. clientBootstraps: the list of bootstraps to run on each newly created client
4. connectionPool: a cache of client connections, keyed by remote address
5. numConnectionsPerPeer: the number of connections used to fetch data between a pair of nodes; configurable via SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY (spark.shuffle.io.numConnectionsPerPeer), default 1
6. socketChannelClass: the class used when a client channel is created; configurable via spark.shuffle.io.mode (nio or epoll; the default is nio, i.e. NioSocketChannel, and epoll is Linux-only)
7. workerGroup: by Netty convention a client has only a worker group, so only workerGroup is created here (NioEventLoopGroup in NIO mode, EpollEventLoopGroup in EPOLL mode)
8. pooledAllocator: a pooled ByteBuf allocator with thread-local caching disabled

  public TransportClientFactory(
      TransportContext context,
      List<TransportClientBootstrap> clientBootstraps) {
    this.context = Preconditions.checkNotNull(context);
    this.conf = context.getConf();
    this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
    this.connectionPool = new ConcurrentHashMap<SocketAddress, ClientPool>();
    this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
    this.rand = new Random();

    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
    // TODO: Make thread pool name configurable.
    this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");
    this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
  }
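
The interplay of connectionPool and numConnectionsPerPeer can be sketched as a per-peer array of lazily created client slots with random selection. This is a simplified illustration (no per-slot locking, unlike the real TransportClientFactory), and the class and method names are ours:

```java
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Sketch: keep up to numConnectionsPerPeer client slots per remote address,
// pick one at random, and connect lazily on first use of a slot.
public class PeerPoolSketch<C> {
    private final int numConnectionsPerPeer;
    private final Random rand = new Random();
    private final ConcurrentHashMap<String, Object[]> pool = new ConcurrentHashMap<>();

    public PeerPoolSketch(int numConnectionsPerPeer) {
        this.numConnectionsPerPeer = numConnectionsPerPeer;
    }

    @SuppressWarnings("unchecked")
    public C getClient(String address, Supplier<C> connect) {
        Object[] slots = pool.computeIfAbsent(address, a -> new Object[numConnectionsPerPeer]);
        int i = rand.nextInt(numConnectionsPerPeer);
        if (slots[i] == null) slots[i] = connect.get();  // lazily open this slot
        return (C) slots[i];
    }
}
```

Using more than one connection per peer spreads large fetches across sockets; with the default of 1, repeated fetches to the same peer reuse a single cached connection.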

4 The Netty Server: TransportServer

TransportServer is the Netty-based server that provides the RPC service. It is created as follows; ultimately, TransportContext's createServer method creates the TransportServer.

  /** Creates and binds the TransportServer, possibly trying multiple ports. */
  private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
    def startService(port: Int): (TransportServer, Int) = {
      val server = transportContext.createServer(port, bootstraps.asJava)
      (server, server.getPort)
    }

    val portToTry = conf.getInt("spark.blockManager.port", 0)
    Utils.startServiceOnPort(portToTry, startService, conf, getClass.getName)._1
  }
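
The behavior delegated to Utils.startServiceOnPort, trying successive ports when binding fails, can be sketched roughly as follows. This is an illustrative simplification with names of our own choosing, not Spark's actual implementation:

```java
import java.util.function.IntUnaryOperator;

// Sketch of retry-on-bind-failure: try the configured port, then successive
// ports, up to maxRetries. Port 0 means "let the OS pick", so it is retried as-is.
public class PortRetry {
    // startService returns the bound port, or throws on bind failure.
    public static int startOnPort(int startPort, int maxRetries, IntUnaryOperator startService) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            int port = (startPort == 0) ? 0 : startPort + attempt;
            try {
                return startService.applyAsInt(port);
            } catch (RuntimeException e) {
                if (attempt == maxRetries) throw e;  // give up after the last retry
            }
        }
        throw new IllegalStateException("unreachable");
    }
}
```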

5 Fetching Remote Shuffle Files

NettyBlockTransferService's fetchBlocks method fetches shuffle files from a remote node, using the Netty service created inside NettyBlockTransferService:

  override def fetchBlocks(
      host: String,
      port: Int,
      execId: String,
      blockIds: Array[String],
      listener: BlockFetchingListener): Unit = {
    logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
    try {
      val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
        override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
          val client = clientFactory.createClient(host, port)
          new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
        }
      }

      val maxRetries = transportConf.maxIORetries()
      if (maxRetries > 0) {
        // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
        // a bug in this code. We should remove the if statement once we're sure of the stability.
        new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
      } else {
        blockFetchStarter.createAndStart(blockIds, listener)
      }
    } catch {
      case e: Exception =>
        logError("Exception while beginning fetchBlocks", e)
        blockIds.foreach(listener.onBlockFetchFailure(_, e))
    }
  }
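
The retry behavior delegated to RetryingBlockFetcher can be reduced to a sketch like the one below. The real fetcher also waits between retries and re-requests only the blocks that have not yet succeeded, which this simplified loop omits; the names are ours:

```java
import java.util.function.Consumer;

// Sketch: attempt the fetch, retry up to maxRetries on failure, and report
// the last error to the failure callback if all attempts fail.
public class RetrySketch {
    public interface FetchStarter { void createAndStart(String[] blockIds) throws Exception; }

    public static void fetchWithRetries(String[] blockIds, int maxRetries,
                                        FetchStarter starter, Consumer<Exception> onFailure) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                starter.createAndStart(blockIds);
                return;  // success
            } catch (Exception e) {
                if (attempt == maxRetries) onFailure.accept(e);  // out of retries
            }
        }
    }
}
```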

6 Uploading Shuffle Files

NettyBlockTransferService's uploadBlock method uploads shuffle files to a remote Executor, likewise using the Netty service created inside NettyBlockTransferService.

  override def uploadBlock(
      hostname: String,
      port: Int,
      execId: String,
      blockId: BlockId,
      blockData: ManagedBuffer,
      level: StorageLevel): Future[Unit] = {
    val result = Promise[Unit]()
    val client = clientFactory.createClient(hostname, port)

    // StorageLevel is serialized as bytes using our JavaSerializer. Everything else is encoded
    // using our binary protocol.
    val levelBytes = serializer.newInstance().serialize(level).array()

    // Convert or copy nio buffer into array in order to serialize it.
    val nioBuffer = blockData.nioByteBuffer()
    val array = if (nioBuffer.hasArray) {
      nioBuffer.array()
    } else {
      val data = new Array[Byte](nioBuffer.remaining())
      nioBuffer.get(data)
      data
    }

    client.sendRpc(new UploadBlock(appId, execId, blockId.toString, levelBytes, array).toByteBuffer,
      new RpcResponseCallback {
        override def onSuccess(response: ByteBuffer): Unit = {
          logTrace(s"Successfully uploaded block $blockId")
          result.success((): Unit)
        }
        override def onFailure(e: Throwable): Unit = {
          logError(s"Error while uploading block $blockId", e)
          result.failure(e)
        }
      })

    result.future
  }
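
The buffer-to-array step above can be isolated as a small sketch (the class and method names are ours): a heap buffer exposes its backing array directly, while a direct buffer's remaining bytes must be copied out before serialization:

```java
import java.nio.ByteBuffer;

// Mirrors the hasArray branch in uploadBlock: zero-copy for heap buffers,
// an explicit copy of the remaining bytes for direct buffers.
public class BufferToArray {
    public static byte[] toArray(ByteBuffer buf) {
        if (buf.hasArray()) {
            return buf.array();  // heap buffer: use the backing array
        } else {
            byte[] data = new byte[buf.remaining()];
            buf.get(data);       // direct buffer: copy the readable bytes out
            return data;
        }
    }
}
```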
