Spark in action on Kubernetes - 存储篇

Posted 阿里系统软件技术

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark in action on Kubernetes - 存储篇相关的知识,希望对你有一定的参考价值。

作者 | 阿里云智能事业群技术专家 莫源

前言


在上篇文章中,我们分析了 Spark Operator 内部的机制,今天我们会讨论一个在大数据领域中最重要的话题——存储。


大数据已经无声无息的融入了每个人的生活中。大到旅游买房,小到外卖打车,都可以看到通过大数据提供数据分析、数据推荐、数据决策的使用场景。大数据要想能够更准确地协助决策,需要在数据多维度、数据完备性等方面有较高要求。


可预知的在未来,数据的量级会越来越大,特别是随着 5G 时代的到来,数据的吞吐量级成指数的增长,数据的维度与来源会越来越多,数据的种类也会变得越来越异质化,对大数据平台也带来新的挑战。成本低、存得多、读写快成为大数据存储的三大问题,而今天我们就会针对这三大问题进行探讨。


容器化大数据的计算与存储


计算和存储分离是大数据领域被大家讨论过很多次的问题了,通常我们会通过如下几个角度来看这个问题:

  • 硬件限制:机器的带宽成倍数的增长,但是磁盘的速度基本不变,从而造成数据本地读写优势减弱。

  • 计算成本:计算和存储的量级不匹配,可能造成算力的大量浪费,独立计算资源可以节约成本。

  • 存储成本:集中式存储可以在保证更高 SLA 的前提下降低存储成本,使得自建数据仓库的优势减少。


这三大问题,随着容器时代的到来也变得愈发的凸显。我们知道在 Kubernetes 中,Pod 是运行在底层的资源池上,而 Pod 所需要的存储是通过 PV 或者 PVC 的方式动态分配与挂载的,从某种意义来讲,容器本身的架构就是计算与存储分离的。那么使用了存储与计算分离方式的大数据容器集群会带来哪些变化与优势呢?


  • 成本更低

通常在阿里云上建立一个 Spark 大数据平台的时候,首先会选择 D 系列的机器,在上面搭建 HDFS、Hadoop 等一系列的基础组件,然后再将 Spark 等作业任务通过 Yarn 进行调度,跑在这个集群之上。D系列的内网带宽范围是3Gbps-20Gbps,默认可以绑定(4-28 块) 5.5T 的本地盘。因为在云上,云盘的 IO 和网络的 IO 是共享的,而本地盘的 IO 是独立的,因此 D 系列+本地盘的 IO 性能会比同规格传统机型+云盘的性能更好。

但是在实际生产中,我们会发现存储的数据对着时间变得越来越多,而由于数据具有一定的时效性,造成单位时间的算力与存储的增长是不相匹配的,这个时候就会带来了成本的浪费。那么如果我们使用计算和存储分离的思想,使用外部存储,例如 OSS、Nas 或者 DFS(阿里云 HDFS 产品),会有哪些变化呢?

首先,我们屏蔽由于存储的 IO 差异造成的影响,先都使用远程的 DFS 作为文件存储。然后我们选择了 ecs.ebmhfg5.2xlarge(8C32G6Gbps)和ecs.d1ne.2xlarge (8C32G6Gbps) 两款分别面向计算和大数据场景规格配置相同的热门机型,进行了比对。

ecs.ebmhfg5.2xlarge(8C32G)的测试结果:

Spark in action on Kubernetes - 存储篇


ecs.d1ne.2xlarge (8C32G)的测试结果:

Spark in action on Kubernetes - 存储篇


通过 Hibench 我们可粗略的估算,在假定 IO性能基本一致的场景下,ecs.ebmhfg5.2xlarge会比ecs.d1ne.2xlarge 计算性能高 30% 左右,而成本上 ecs.ebmhfg5.2xlarge 会比 ecs.d1ne.2xlarge 低 25% 左右。

也就是说如果单单只看计算的能力,是可以有更高效、更节省的机型选择的。当存储和计算分离后,我们可以从存储和计算两个维度分开去估算所需的用量,在机型上可以更多的考虑高主频计算能力较强 ECS,而存储上可以使用 OSS 或者 DFS,存储成本也相较本地存储更为低廉。


此外通常 D 系列的机型都是 1:4 的 CPU 内存比,随着大数据作业的场景越来越丰富,1:4 的 CPU 内存比也不完全是最佳的配比,当存储与计算分离后,我们可以根据业务的类型选择合适的计算资源,甚至可以在一个计算资源池中维护多种计算资源,从而提高资源使用率。

数据存储的 SLA 和计算任务的SLA也是完全不同的,存储上是无法忍受宕机或者中断的,但是对于计算任务而言,本身已经被切割为子任务了,单个子任务的异常只需重试即可,那么进一步就可以使用类似竞价实例这种成本更低的资源来作为计算任务运行时环境,实现成本的进一步优化。

此外容器最大的特点就是弹性,通过弹性的能力,容器可以在短时间内获得超远原本自身几十甚至上百倍的计算资源,而计算任务完成后又自动释放掉。目前阿里云容器服务提供 autoscaler 进行节点级别的弹性伸缩,可以做到在 1 分半内伸缩 500 台节点。传统的计算与存储耦合的场景下,存储是阻碍弹性的一大障碍,而存储和计算分离后,就可以针对近乎无状态的计算来实现弹性的能力,实现真正的按需使用、按量消费。


  • 存的更多

使用外部存储后,我们不止存储量级上可以做到近乎无限,而且可以有更多的选择。在本文开头位置,我们已经提到了大数据时代的到来,将引入更多维度、更异质化的数据。而这也对数据存储的方式与类型也带来了更多的挑战。


单纯的 HDFS、Hbase、Kafka 等数据存储与链路将无法满足我们的需求。例如从 IoT 设备采集的数据更倾向于使用时序存储进行离线、从应用上下游的产生的数据更倾向于存放在结构化数据库中,数据的来源与链路会越来越多,大数据平台的底层基础设施与依赖就会越变越多。在云上,阿里云提供了多种类型的存储服务,可以满足各种大数据处理的场景。


除了传统的 HDFS、Hbase、kafka、OSS、Nas、CPFS 等存储,还包含 MNS、TSDB、OAS(冷数据归档)等等。使用存储服务可以让大数据平台更关注在业务的开发,而不是底层基础架构的运维。不但能够存的更多,还可以存的更好、存的更省。


  • 读写更快

从某种角度来讲,读写更快是不可能的,因为独立本地盘可以通过挂足够盘并行的方式进行提升的,但是要注意的问题在于,当我们通过 MR 进行任务切割后,每个子任务的瓶颈是否还是在磁盘 IO 上,大部分情况下答案是否定。


上面我们测试的 ECS 规格内网的带宽已经可以到达 6Gbps,如果全部网络带宽都换算成磁盘的 IO 的话,这个量级的数据吞吐 IO 相比 8C32G 的算力而言是冗余的,所以此处我们提到的读写更快是指在 IO 冗余的前提下提升读写速度的方式。


OSS 是阿里云上提供的对象存储,读取不同单个文件的 IO 是并行的,也就是说如果你的业务场景是大量中小型文件的并行读取,例如在 Spark 中读写目录的方式,那么此时 IO 的读写速度近似是线性增强的。如果依然希望使用 HDFS 的开发者,阿里云也提 HDFS 存储服务,提供了大量存储与查询的优化,和传统的自建的 HDFS 相比有 50% 左右的提升。


Spark in action on Kubernetes - 存储篇


阿里云容器服务的存储方案


阿里云容器服务在多个维度多个层次满足大数据处理中的需求。开发者可以根据不同的业务场景和 IO 的新更能指标要求,选择合适自己的存储方式。


大量小文件存储

OSS 是面向这个场景的最佳使用方式,在容器中可以使用两种方式操作 OSS,一种是将 OSS 挂载为一个文件系统,一种是直接在 Spark 中使用 SDK 来操作。


第一种方案在大数据的场景下是非常不适用的,特别是文件比较多的场景,如果没有类似 SmartFS 的优化手段,会带来很大的时延与不一致性。而使用 SDK 的方式则非常直接简单,只需将相应的 Jar 放在 CLASSPATH 下即可,可以参考如下代码,直接处理  OSS 中的文件内容。


package com.aliyun.emr.example
object OSSSample extends RunLocally {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(
        """Usage: bin/spark-submit --class OSSSample examples-1.0-SNAPSHOT-shaded.jar <inputPath> <numPartition>
          |
          |Arguments:
          |
          |    inputPath        Input OSS object path, like oss://accessKeyId:accessKeySecret@bucket.endpoint/a/b.txt
          |    numPartitions    the number of RDD partitions.
          |
        """
.stripMargin)
      System.exit(1)
    }
    val inputPath = args(0)
    val numPartitions = args(1).toInt
    val ossData = sc.textFile(inputPath, numPartitions)
    println("The top 10 lines are:")
    ossData.top(10).foreach(println)
  }
  override def getAppName: String = "OSS Sample"
}


另外针对 Spark SQL 的场景,阿里云也提供了 https://yq.aliyun.com/articles/593910">oss-select 的方式进行支持,可以通过 SparkSQL 的方式对单文件检索和查询。特别注意:当使用 Spark Operator 的方式进行任务执行是,需要在 Driver Pod 与 Exector Pod 的 CLASSPATH 下预置好相应的 Jar 包。

OSS 的方式主要面向单个文件在百兆之下,文件数目比较多的场景优化较好,数据存储是几种常见存储中最便宜的,支持冷热数据的分离,主要面向读多写少或者不写的场景。


HDFS 文件存储

阿里云上新推出了 DFS 服务,可以像在 Hadoop 分布式文件系统 (Hadoop Distributed File System) 中管理和访问数据。无需对现有大数据分析应用做任何修改,即可使用具备无限容量及性能扩展、单一命名空间、多共享、高可靠和高可用等特性的分布式文件系统。

DFS 服务兼容 HDFS 协议,开发者只需将相应的调用 Jar 包放置在 Driver Pod 与 Exector Pod 的 CLASSPATH 中即可,调用时可以如下的方式进行调用。


/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "dfs://f-5d68cc61ya36.cn-beijing.dfs.aliyuncs.com:10290/logdata/ab.log"
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}


DFS 服务的方式主要是面向高 IO 读写的热数据场景,价格会高于 OSS 存储,但低于 Nas 以及其他结构化存储。对于已经习惯了 HDFS 的开发者而言,是最佳的的方案。在所有的存储方案中,目前 IO 性能最佳,同容量场景,IO 优于本地盘。


常规文件存储

OSS 的方式对于某些场景而言,数据的上传与传输依赖 SDK,操作会略显不便。那么Nas也是一种备选的方案,Nas 的本身的协议是强一致性的,开发者可以像操作本地文件的方式,读写数据。使用方式如下:


1. 首先在容器服务控制台创建 Nas 相关的存储 PV 与 PVC。

Spark in action on Kubernetes - 存储篇


2. 然后在 Spark Operator 的定义中声明所使用的 PodVolumeClain。


apiVersion: "sparkoperator.k8s.io/v1alpha1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"
  restartPolicy:
    type: Never
  volumes:
  - name: pvc-nas
    persistentVolumeClaim:
        claimName: pvc-nas
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.0
    serviceAccount: spark
    volumeMounts:
      - name: "pvc-nas"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.0
    volumeMounts:
      - name: "pvc-nas"
        mountPath: "/tmp"


https://www.alibabacloud.com/help/zh/doc-detail/88940.htm


Nas 存储的方式在 Spark 的场景下用途比较少,主要是因为在 IO 方面与 HDFS 有一定差距,在存储价格方面比OSS 也贵了不少。不过对于需要复用一些 data workflow 的产生结果,且 IO 要求要求不是特别高的场景,Nas 的使用还是非常简单的。


其他存储结构

在 Spark Streaming 的场景中,我们还经常使用例如 mns 或者 kafka,有的时候也会使用 Elasticsearch 与 Hbase 等等。这些在阿里云上面也都有对应的服务支持,开发者可以通过这些云服务的集成与使用,将精力更多的放在数据开发上。


最后


本文主要和大家探讨了当容器遇到大数据后,改如何通过存储与计算分离的方式,降低资源的使用成本,通过不同的场景,选择合适的存储方式,实现云原生的大数据容器化计算。



点击“阅读原文”订阅《CNCF x Alibaba 云原生技术公开课》开课提醒!

以上是关于Spark in action on Kubernetes - 存储篇的主要内容,如果未能解决你的问题,请参考以下文章

Spark is not running in local mode, therefore the checkpoint directory must not be on the local……(代码

配置Spark on YARN集群内存

Spark On YARN内存分配

install spark in Windows

Spark- Action实战

shape into blocks--source code in python based on pySpark