Spark 独立部署模式

Posted 顧棟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 独立部署模式相关的知识,希望对你有一定的参考价值。

Spark Standalone Mode

http://spark.apache.org/docs/latest/spark-standalone.html


除了在 Mesos 或 YARN 集群管理器上运行之外,Spark 还提供了一种简单的独立部署模式。 您可以手动启动独立集群,通过手动启动主节点和工作节点,或使用我们提供的 启动脚本。 也可以在一台机器上运行这些守护进程进行测试。

Security

默认情况下,Spark 中的安全性是关闭的。 这可能意味着默认情况下您很容易受到攻击。 在运行 Spark 之前,请参阅 Spark Security 和本文档中的特定安全部分。

Installing Spark Standalone to a Cluster

要安装 Spark Standalone 模式,您只需在集群的每个节点上放置一个编译版本的 Spark。 您可以获得每个版本的预构建版本的 Spark 或自己构建

Starting a Cluster Manually

您可以通过执行以下命令来启动独立的主服务器:

./sbin/start-master.sh

一旦启动,master 会为自己打印出一个 spark://HOST:PORT URL,你可以使用它来将 worker 连接到它,或者作为“master”参数传递给 SparkContext。 你也可以在 master 的 web UI 上找到这个 URL,默认是 http://localhost:8080

同样,您可以启动一个或多个 worker 并将它们连接到 master:

./sbin/start-worker.sh <master-spark-URL>

一旦你启动了一个 worker,看看 master 的 web UI(默认是 http://localhost:8080)。 您应该会在那里看到列出的新节点,以及它的 CPU 和内存数量(减去留给操作系统的 1 GB)。

最后,可以将以下配置选项传递给 master 和 worker:

ArgumentMeaning
-h HOST, --host HOST监听的主机名
-i HOST, --ip HOST要侦听的主机名(已弃用,使用 -h 或 --host)
-p PORT, --port PORT服务侦听的端口(默认值:7077 为 master,随机为 worker)
--webui-port PORTWeb UI 的端口(默认:master 8080,worker 8081)
-c CORES, --cores CORES允许 Spark 应用程序在机器上使用的 CPU 内核总数(默认值:全部可用);只对worker
-m MEM, --memory MEM允许 Spark 应用程序在机器上使用的内存总量,格式为 1000M 或 2G(默认值:机器的总 RAM 减去 1 GiB); only on worker
-d DIR, --work-dir DIR用于暂存空间和作业输出日志的目录(默认:SPARK_HOME/work); only on worker
--properties-file FILE要加载的自定义 Spark 属性文件的路径(默认:conf/spark-defaults.conf)

Cluster Launch Scripts

要使用启动脚本启动 Spark 独立集群,您应该在 Spark 目录中创建一个名为 conf/workers 的文件,该文件必须包含您打算启动 Spark workers 的所有机器的主机名,每行一个。 如果 conf/workers 不存在,则启动脚本默认为单台机器 (localhost),这对测试很有用。 请注意,主机通过 ssh 访问每个工作机。 默认情况下,ssh 并行运行并且需要设置无密码(使用私钥)访问。 如果您没有无密码设置,您可以设置环境变量 SPARK_SSH_FOREGROUND 并为每个工作人员连续提供密码。

一旦你设置了这个文件,你就可以使用以下 shell 脚本启动或停止你的集群,基于 Hadoop 的部署脚本,并且在 SPARK_HOME/sbin 中可用:

  • sbin/start-master.sh - 在执行脚本的机器上启动主实例。
  • sbin/start-workers.sh -在 conf/workers 文件中指定的每台机器上启动一个工作实例。
  • sbin/start-worker.sh - 在执行脚本的机器上启动一个工作实例。
  • sbin/start-all.sh - 如上所述启动一个主节点和多个工作节点。
  • sbin/stop-master.sh - 停止通过 sbin/start-master.sh 脚本启动的 master。
  • sbin/stop-worker.sh - 停止执行脚本的机器上的所有工作实例。
  • sbin/stop-workers.sh - 停止在 conf/workers 文件中指定的机器上的所有工作实例。
  • sbin/stop-all.sh - 如上所述停止主节点和工作节点。

请注意,这些脚本必须在您想要运行 Spark master 的机器上执行,而不是您的本地机器。

您可以选择通过在 conf/spark-env.sh 中设置环境变量来进一步配置集群。 从 conf/spark-env.sh.template 开始创建此文件,并将其复制到所有工作机器以使设置生效。 以下设置可用:

环境变量含义
SPARK_MASTER_HOST将 master 绑定到特定的主机名或 IP 地址,例如公共地址。
SPARK_MASTER_PORT在不同的端口上启动主节点(默认:7077)。
SPARK_MASTER_WEBUI_PORT主 Web UI 的端口(默认值:8080)。
SPARK_MASTER_OPTS仅适用于形式为“-Dx=y”(默认值:none)的母版的配置属性。 请参阅下面的可能选项列表。
SPARK_LOCAL_DIRSSpark 中用于“scratch”空间的目录,包括地图输出文件和存储在磁盘上的 RDD。 这应该在您系统中的快速本地磁盘上。 它也可以是不同磁盘上多个目录的逗号分隔列表。
SPARK_WORKER_CORES允许 Spark 应用程序在机器上使用的内核总数 (default: all available cores).
SPARK_WORKER_MEMORY允许 Spark 应用程序在机器上使用的内存总量,例如 1000m2g(默认:总内存减去 1 GiB); 请注意,每个应用程序的个别 内存是使用其spark.executor.memory 属性配置的。
SPARK_WORKER_PORT在特定端口上启动 Spark 工作器 (default: random).
SPARK_WORKER_WEBUI_PORT工作程序 Web UI 的端口 (default: 8081).
SPARK_WORKER_DIR运行应用程序的目录,其中包括日志和暂存空间(默认:SPARK_HOME/work)。
SPARK_WORKER_OPTS仅适用于“-Dx=y”形式的工作器的配置属性(默认值:none)。 请参阅下面的可能选项列表。
SPARK_DAEMON_MEMORY分配给 Spark 主进程和工作进程自身的内存 (default: 1g).
SPARK_DAEMON_JAVA_OPTSSpark 主进程和工作进程自身的 JVM 选项,格式为“-Dx=y”(default: none).
SPARK_DAEMON_CLASSPATHSpark 主进程和工作进程自身的类路径 (default: none).
SPARK_PUBLIC_DNSSpark master 和 worker 的公共 DNS 名称 (default: none).

Note: 启动脚本当前不支持 Windows。 要在 Windows 上运行 Spark 集群,请手动启动 master 和 worker。

SPARK_MASTER_OPTS 支持以下系统属性:

属性名称默认值含义起始版本
spark.deploy.retainedApplications200要显示的已完成应用程序的最大数量。 较旧的应用程序将从 UI 中删除以维持此限制。0.8.0
spark.deploy.retainedDrivers200要显示的已完成驱动程序的最大数量。 较旧的驱动程序将从 UI 中删除以维持此限制。1.1.0
spark.deploy.spreadOuttrue独立集群管理器是应该将应用程序分散到节点上还是尝试将它们整合到尽可能少的节点上。 分散对于 HDFS 中的数据局部性通常更好,但对于计算密集型工作负载,整合更有效。0.6.1
spark.deploy.defaultCores(infinite)如果未设置 spark.cores.max,则在 Spark 的独立模式下为应用程序提供的默认内核数。 如果没有设置,应用程序总是会获得所有可用的核心,除非他们自己配置了 spark.cores.max。 在共享集群上将此设置得较低,以防止用户默认抓取整个集群。0.9.0
spark.deploy.maxExecutorRetries10限制在独立集群管理器删除故障应用程序之前可能发生的背靠背执行器故障的最大数量。 如果应用程序有任何正在运行的执行程序,它将永远不会被删除。 如果应用程序连续遇到超过 spark.deploy.maxExecutorRetries 失败,在这些失败之间没有执行程序成功开始运行,并且应用程序没有正在运行的执行程序,那么独立集群管理器将删除该应用程序并将其标记为失败。 要禁用此自动删除,请将 spark.deploy.maxExecutorRetries 设置为 -11.6.3
spark.worker.timeout60如果没有收到心跳,独立部署主服务器认为工作器丢失的秒数。0.6.2
spark.worker.resource.{resourceName}.amount(none)要在工作器上使用的特定资源的数量。3.0.0
spark.worker.resource.{resourceName}.discoveryScript(none)资源发现脚本的路径,用于在工作器启动时查找特定资源。 并且脚本的输出应该像 ResourceInformation 类一样格式化。3.0.0
spark.worker.resourcesFile(none)资源文件的路径,用于在工作器启动时查找各种资源。 资源文件的内容格式应为[{"id":{"componentName": "spark.worker","resourceName":"gpu"},"addresses":["0","1"," 2"]}]。 如果在资源文件中未找到特定资源,则将使用发现脚本来查找该资源。 如果发现脚本也没有找到资源,则工作器将无法启动。3.0.0

SPARK_WORKER_OPTS 支持以下系统属性:

Property NameDefaultMeaningSince Version
spark.worker.cleanup.enabledfalse启用定期清理工作人员/应用程序目录。 请注意,这仅影响独立模式,因为 YARN 的工作方式不同。 仅清理已停止应用程序的目录。 如果 spark.shuffle.service.db.enabled 为“true”,则应启用此功能1.0.0
spark.worker.cleanup.interval1800 (30 minutes)控制worker清理本地计算机上旧应用程序工作目录的时间间隔(以秒为单位)。1.0.0
spark.worker.cleanup.appDataTtl604800 (7 days, 7 * 24 * 3600)在每个工作程序上保留应用程序工作目录的秒数。 这是一个生存时间,应该取决于您拥有的可用磁盘空间量。 应用程序日志和 jar 文件下载到每个应用程序工作目录。 随着时间的推移,工作目录会迅速填满磁盘空间,尤其是在您非常频繁地运行作业的情况下。1.0.0
spark.shuffle.service.db.enabledtrue将 External Shuffle 服务状态存储在本地磁盘上,这样当外部 shuffle 服务重新启动时,它会自动重新加载当前执行程序的信息。 这仅影响独立模式(纱线始终启用此行为)。 您还应该启用 spark.worker.cleanup.enabled,以确保状态最终得到清理。 将来可能会删除此配置。3.0.0
spark.storage.cleanupFilesAfterExecutorExittrue在 executor 退出后启用清理工作目录的非 shuffle 文件(例如临时 shuffle 块、缓存的 RDD/广播块、溢出文件等)。 请注意,这与 spark.worker.cleanup.enabled 不重叠,因为这可以清除死执行器本地目录中的非 shuffle 文件,而 spark.worker.cleanup.enabled 可以清除所有文件 / 停止和超时应用程序的子目录。 这只影响独立模式,以后可以添加其他集群管理器的支持。2.4.0
spark.worker.ui.compressedLogFileLengthCacheSize100对于压缩日志文件,未压缩文件只能通过解压缩文件来计算。 Spark 缓存压缩日志文件的未压缩文件大小。 此属性控制缓存大小。2.0.2

Resource Allocation and Configuration Overview

请确保已阅读 配置页面 上的自定义资源调度和配置概述部分。本节只讨论 Spark Standalone 资源调度的具体方面。

Spark Standalone 有两部分,第一部分是为 Worker 配置资源,第二部分是特定应用的资源分配。

用户必须配置 Workers 以获得一组可用资源,以便它可以将它们分配给 Executors。 spark.worker.resource.{resourceName}.amount 用于控制 worker 分配的每个资源的数量。用户还必须指定 spark.worker.resourcesFilespark.worker.resource.{resourceName}.discoveryScript 以指定 Worker 如何发现其分配的资源。请参阅上面每种方法的说明,以了解哪种方法最适合您的设置。

第二部分是在 Spark Standalone 上运行应用程序。标准 Spark 资源配置的唯一特殊情况是当您在客户端模式下运行驱动程序时。对于客户端模式下的 Driver,用户可以通过 spark.driver.resourcesFilespark.driver.resource.{resourceName}.discoveryScript 指定它使用的资源。如果驱动程序与其他驱动程序在同一台主机上运行,请确保资源文件或发现脚本只返回与运行在同一节点上的其他驱动程序不冲突的资源。

请注意,用户在提交应用程序时不需要指定发现脚本,因为 Worker 将使用它分配给它的资源启动每个 Executor。

Connecting an Application to the Cluster

要在 Spark 集群上运行应用程序,只需将 master 的 spark://IP:PORT URL 作为 [SparkContext 构造函数](http://spark.apache.org/docs/latest/rdd -programming-guide.html#initializing-spark)。

要对集群运行交互式 Spark shell,请运行以下命令:

./bin/spark-shell --master spark://IP:PORT

您还可以传递一个选项 --total-executor-cores <numCores> 来控制 spark-shell 在集群上使用的内核数量。

Client Properties

Spark 应用程序支持以下特定于独立模式的配置属性:

Property NameDefault ValueMeaningSince Version
spark.standalone.submit.waitAppCompletionfalse在独立集群模式下,控制客户端是否等待退出直到应用程序完成。 如果设置为“true”,客户端进程将保持活动轮询驱动程序的状态。 否则,客户端进程提交后将退出。3.1.0

Launching Spark Applications

spark-submit 脚本 提供了将已编译的 Spark 应用程序提交到集群的最直接方式。对于独立集群,Spark 目前支持两种部署模式。在client 模式下,驱动程序在与提交应用程序的客户端相同的进程中启动。然而,在cluster 模式下,驱动程序从集群内的其中一个工作进程启动,并且客户端进程在完成提交应用程序的职责后立即退出,而无需等待应用程序完成。

如果您的应用程序是通过 Spark submit 启动的,那么应用程序 jar 会自动分发到所有工作节点。对于您的应用程序依赖的任何其他 jar,您应该通过 --jars 标志使用逗号作为分隔符(例如 --jars jar1,jar2)来指定它们。要控制应用程序的配置或执行环境,请参阅 Spark 配置

此外,如果应用程序以非零退出代码退出,独立 cluster 模式支持自动重新启动应用程序。要使用此功能,您可以在启动应用程序时将 --supervise 标志传递给 spark-submit。然后,如果您希望终止重复失败的应用程序,您可以通过以下方式进行:

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

您可以通过位于 http://<master url>:8080 的独立 Master Web UI 找到驱动程序 ID。

Resource Scheduling

独立集群模式目前仅支持跨应用程序的简单 FIFO 调度程序。 但是,为了允许多个并发用户,您可以控制每个应用程序将使用的最大资源数。 默认情况下,它将获取集群中的所有核心,这仅在您一次只运行一个应用程序时才有意义。 您可以通过在 SparkConf 中设置 spark.cores.max 来限制内核数量。 例如:

val conf = new SparkConf()
  .setMaster(...)
  .setAppName(...)
  .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

此外,您可以在集群主进程上配置 spark.deploy.defaultCores,以更改未将 spark.cores.max 设置为小于无限的应用程序的默认值。 通过将以下内容添加到 conf/spark-env.sh 来做到这一点:

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

这对于用户可能没有单独配置最大内核数的共享集群很有用。

Executors Scheduling

分配给每个执行器的内核数量是可配置的。 当显式设置 spark.executor.cores 时,如果 worker 有足够的内核和内存,来自同一个应用程序的多个 executor 可能会在同一个 worker 上启动。 否则,默认情况下,每个 executor 都会获取 worker 上可用的所有内核,在这种情况下,每个应用程序在一次调度迭代期间只能在每个 worker 上启动一个 executor。

Monitoring and Logging

Spark 的独立模式提供了一个基于 Web 的用户界面来监控集群。 master 和每个 worker 都有自己的 Web UI,用于显示集群和作业统计信息。 默认情况下,您可以在端口 8080 上访问 master 的 web UI。可以在配置文件中或通过命令行选项更改端口。

此外,每个作业的详细日志输出也会写入每个工作节点的工作目录(默认为SPARK_HOME/work)。 您将看到每个作业的两个文件,stdoutstderr,所有输出都写入其控制台。

Running Alongside Hadoop

您可以将 Spark 与现有的 Hadoop 集群一起运行,只需在同一台机器上将其作为单独的服务启动即可。 要从 Spark 访问 Hadoop 数据,只需使用 hdfs:// URL(通常为 hdfs://<namenode>:9000/path,但您可以在 Hadoop Namenode 的 Web UI 上找到正确的 URL)。 或者,您可以为 Spark 设置一个单独的集群,并仍然让它通过网络访问 HDFS; 这将比磁盘本地访问慢,但如果您仍然在同一个局域网中运行(例如,您在每个装有 Hadoop 的机架上放置几台 Spark 机器),这可能不是问题。

Configuring Ports for Network Security

一般来说,Spark 集群及其服务并不部署在公共互联网上。 它们通常是私有服务,只能在部署 Spark 的组织的网络内访问。 对 Spark 服务使用的主机和端口的访问应仅限于需要访问服务的原始主机。

这对于使用独立资源管理器的集群尤其重要,因为它们不像其他资源管理器那样支持细粒度的访问控制。

有关要配置的完整端口列表,请参阅 安全页面

High Availability

默认情况下,独立调度集群对 Worker 故障具有弹性(就 Spark 本身而言,通过将其转移到其他 Worker 可以弹性地避免丢失工作)。 但是,调度器使用 Master 来做出调度决策,这(默认情况下)会造成单点故障:如果 Master 崩溃,则无法创建新应用程序。 为了规避这一点,我们有两个高可用性方案,详述如下。

Standby Masters with ZooKeeper

Overview

利用 ZooKeeper 提供领导者选举和一些状态存储,您可以在连接到同一个 ZooKeeper 实例的集群中启动多个 Master。 一个人将被选为“领导者”,其他人将保持待机状态。 如果当前的leader死了,会选举另一个Master,恢复旧Master的状态,然后恢复调度。 整个恢复过程(从第一个领导者宕机开始)应该需要 1 到 2 分钟。 请注意,此延迟仅影响调度应用程序——在主故障转移期间已经运行的应用程序不受影响。

Learn more about getting started with ZooKeeper here.

Configuration

为了启用这种恢复模式,您可以在 spark-env 中通过配置 spark.deploy.recoveryMode 和相关的 spark.deploy.zookeeper.* 配置来设置 SPARK_DAEMON_JAVA_OPTS。 有关这些配置的更多信息,请参阅配置文档

可能的问题:如果您的集群中有多个 Master,但未能正确配置 Master 以使用 ZooKeeper,则 Master 将无法发现彼此并认为它们都是领导者。 这不会导致健康的集群状态(因为所有 Master 将独立调度)。

Details

设置好 ZooKeeper 集群后,启用高可用性很简单。只需在具有相同 ZooKeeper 配置(ZooKeeper URL 和目录)的不同节点上启动多个 Master 进程。可以随时添加和删除master。

为了调度新的应用程序或将 Workers 添加到集群中,他们需要知道当前领导者的 IP 地址。这可以通过简单地传入一个 Master 列表来完成,而你过去只传入一个 Master。例如,您可以启动指向 spark://host1:port1,host2:port2 的 SparkContext。这会导致你的 SparkContext 尝试向两个 Master 注册——如果 host1 出现故障,这个配置仍然是正确的,因为我们会找到新的领导者 host2

“向 Master 注册”和正常操作之间有一个重要的区别。启动时,应用程序或 Worker 需要能够找到并注册到当前的主 Master。但是,一旦它成功注册,它就会“在系统中”(即存储在 ZooKeeper 中)。如果发生故障转移,新的领导者会联系所有之前注册的应用程序和工人,通知他们领导权的变化,所以他们甚至不需要在启动时知道新的 Master 的存在。

由于这个特性,可以随时创建新的 Master,你唯一需要担心的是应用程序和 Workers 可以找到它来注册,以防它成为领导者。注册后,您将得到照顾。

Single-Node Recovery with Local File System

Overview

ZooKeeper 是实现生产级高可用性的最佳方式,但如果您只想在 Master 宕机时重新启动它,FILESYSTEM 模式可以解决这个问题。 当应用程序和 Workers 注册时,它们有足够的状态写入提供的目录,以便在重新启动 Master 进程时可以恢复它们。

Configuration

为了启用此恢复模式,您可以使用以下配置在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS:

System propertyMeaningSince Version
spark.deploy.recoveryMode设置为 FILESYSTEM 以启用单节点恢复模式 (default: NONE).0.8.1
spark.deploy.recoveryDirectorySpark 将存储恢复状态的目录,可从 Master 的角度访问。0.8.1

Details

  • 此解决方案可与 monit 等进程监视器/管理器协同使用,或仅用于通过重启启用手动恢复。
  • 虽然文件系统恢复似乎比根本不进行任何恢复要好,但对于某些开发或实验目的,这种模式可能不是最理想的。 特别是,通过 stop-master.sh 杀死一个 master 并不会清理它的恢复状态,所以每当你启动一个新的 Master 时,它都会进入恢复模式。 如果需要等待所有先前注册的工作人员/客户端超时,这可能会将启动时间增加最多 1 分钟。
  • 虽然它不受官方支持,但您可以挂载一个 NFS 目录作为恢复目录。 如果原来的 Master 节点完全死掉,那么你可以在不同的节点上启动一个 Master,这将正确恢复所有以前注册的 Worker/应用程序(相当于 ZooKeeper 恢复)。 但是,未来的应用程序必须能够找到新的 Master 才能注册。

以上是关于Spark 独立部署模式的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 2.1 独立集群的客户端部署模式下,RDD 或 Spark SQL 数据帧在哪里存储或持久化?

Spark部署模式

Spark安装部署| 运行模式

Spark集群管理器介绍

SparkSpark 的运行环境运行架构提交流程

spark考试