当有更多可用时,Spark 仅使用一台工作机器

Posted

技术标签:

【中文标题】当有更多可用时,Spark 仅使用一台工作机器【英文标题】:Spark is only using one worker machine when more are available 【发布时间】:2018-04-01 23:13:43 【问题描述】:

我正在尝试通过 Spark 并行化机器学习预测任务。我之前曾在其他任务上成功使用过多次 Spark,并且之前没有遇到过并行化问题。

在这个特定任务中,我的集群有 4 个工作人员。我在具有 4 个分区的 RDD 上调用 mapPartitions。 map 函数从磁盘加载模型(引导脚本分发执行此操作所需的所有内容;我已经验证它存在于每台从机上)并对 RDD 分区中的数据点执行预测。

代码运行,但只使用一个执行器。其他执行程序的日志显示“已调用关闭挂钩”。在代码的不同运行中,它使用不同的机器,但一次只使用一台。

如何让 Spark 同时使用多台机器?

我通过 Zeppelin 笔记本在 Amazon EMR 上使用 PySpark。代码sn-ps如下。

%spark.pyspark

sc.addPyFile("/home/hadoop/MyClassifier.py")
sc.addPyFile("/home/hadoop/ModelLoader.py")

from ModelLoader import ModelLoader
from MyClassifier import MyClassifier

def load_models():
    models_path = '/home/hadoop/models'
    model_loader = ModelLoader(models_path)

    models = model_loader.load_models()
    return models

def process_file(file_contents, models):
    filename = file_contents[0]
    filetext = file_contents[1]
    pred = MyClassifier.predict(filetext, models)
    return (filename, pred)

def process_partition(file_list):
    models = load_models()
    for file_contents in file_list:
        pred = process_file(file_contents, models)
        yield pred


all_contents = sc.wholeTextFiles("s3://some-path", 4)
processed_pages = all_contents.mapPartitions(process_partition)
processedDF = processed_pages.toDF(["filename", "pred"])
processedDF.write.json("s3://some-other-path", mode='overwrite')

如预期的那样有四个任务,但它们都运行在同一个执行器上!

我的集群正在运行,并且可以提供资源管理器中可用的日志。我只是还不知道去哪里找。

【问题讨论】:

您是否正确设置了 Zeppelin 以在 yarn-cluster 模式下工作? AFAR,EMR 中的 Zeppelin 以本地模式启动。 @Zouzias 我以前从来不需要做任何特别的事情来让它正确使用多个工人。我认为模式是正确的。 “master”配置值设置为“yarn-client”。 "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"capacity-scheduler 中指定的需要吗?否则 YARN 将不会根据您的工作优化集群使用。 能否请这行 all_contents.getNumPartitions 看看有多少个分区可用 @Achyuth 分区数是 4(或者我传入 WholeTextFiles 的任何数字) 【参考方案1】:

这里要提两点(但不确定它们是否能解决您的问题):

    wholeTextFiles 使用扩展CombineFileInputFormatWholeTextFileInputFormat,并且由于CombineFileInputFormat,它会尝试将多组小文件合并到一个分区中。因此,例如,如果您将分区数设置为 2,您“可能”会得到两个分区,但不能保证,这取决于您正在读取的文件的大小。 wholeTextFiles 的输出是一个 RDD,它在每个记录中包含一个完整的文件(并且每个记录/文件不能被拆分,因此它将以位于单个分区/工作者中而结束)。因此,如果您只读取一个文件,尽管您在示例中将分区设置为 4,但最终会将整个文件放在一个分区中。

【讨论】:

感谢您的回答。我正在阅读大约 2000 个文件,似乎确实有 4 个分区(至少根据 getNumPartitions)【参考方案2】:

该进程的分区数与您指定的一样多,但它是以序列化方式进行的。

执行者

该进程可能会启动默认数量的执行程序。这可以在纱线资源管理器中看到。在您的情况下,所有处理均由一位执行者完成。如果 executor 有多个核心,它将使工作并行化。在 emr 中,您必须进行此更改,以便为执行程序提供 1 个以上的核心。

在我们的案例中具体发生的情况是,数据很小,所以所有数据都在一个执行器中读取(即使用一个节点)。如果没有以下属性,执行程序仅使用单核。因此,所有任务都是序列化的。

设置属性

sudo  vi /etc/hadoop/conf/capacity-scheduler.xml

如下图所示设置属性

"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalcul‌​ator"

为了使这个属性适用,你必须重新启动纱线

 sudo  hadoop-yarn-resourcemanager stop

重启纱线

 sudo  hadoop-yarn-resourcemanager start 

提交作业后,请查看 yarn 和 spark-ui

在 Yarn 你会看到更多的执行器核心

【讨论】:

我们可以聊一会儿吗,这个会解决的 chat.stackexchange.com/rooms/67849/debugging-spark-executors我们来聊聊

以上是关于当有更多可用时,Spark 仅使用一台工作机器的主要内容,如果未能解决你的问题,请参考以下文章

Keepalived高可用配置

Keepalived高可用配置

Nginx双机热备

17.Linux高可用之Keepalived

负载均衡的keeplived服务

Keepalived+Nginx实现高可用(HA)