当有更多可用时,Spark 仅使用一台工作机器
Posted
技术标签:
【中文标题】当有更多可用时,Spark 仅使用一台工作机器【英文标题】:Spark is only using one worker machine when more are available 【发布时间】:2018-04-01 23:13:43 【问题描述】:我正在尝试通过 Spark 并行化机器学习预测任务。我之前曾在其他任务上成功使用过多次 Spark,并且之前没有遇到过并行化问题。
在这个特定任务中,我的集群有 4 个工作人员。我在具有 4 个分区的 RDD 上调用 mapPartitions。 map 函数从磁盘加载模型(引导脚本分发执行此操作所需的所有内容;我已经验证它存在于每台从机上)并对 RDD 分区中的数据点执行预测。
代码运行,但只使用一个执行器。其他执行程序的日志显示“已调用关闭挂钩”。在代码的不同运行中,它使用不同的机器,但一次只使用一台。
如何让 Spark 同时使用多台机器?
我通过 Zeppelin 笔记本在 Amazon EMR 上使用 PySpark。代码sn-ps如下。
%spark.pyspark
sc.addPyFile("/home/hadoop/MyClassifier.py")
sc.addPyFile("/home/hadoop/ModelLoader.py")
from ModelLoader import ModelLoader
from MyClassifier import MyClassifier
def load_models():
models_path = '/home/hadoop/models'
model_loader = ModelLoader(models_path)
models = model_loader.load_models()
return models
def process_file(file_contents, models):
filename = file_contents[0]
filetext = file_contents[1]
pred = MyClassifier.predict(filetext, models)
return (filename, pred)
def process_partition(file_list):
models = load_models()
for file_contents in file_list:
pred = process_file(file_contents, models)
yield pred
all_contents = sc.wholeTextFiles("s3://some-path", 4)
processed_pages = all_contents.mapPartitions(process_partition)
processedDF = processed_pages.toDF(["filename", "pred"])
processedDF.write.json("s3://some-other-path", mode='overwrite')
如预期的那样有四个任务,但它们都运行在同一个执行器上!
我的集群正在运行,并且可以提供资源管理器中可用的日志。我只是还不知道去哪里找。
【问题讨论】:
您是否正确设置了 Zeppelin 以在 yarn-cluster 模式下工作? AFAR,EMR 中的 Zeppelin 以本地模式启动。 @Zouzias 我以前从来不需要做任何特别的事情来让它正确使用多个工人。我认为模式是正确的。 “master”配置值设置为“yarn-client”。 是"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
在capacity-scheduler
中指定的需要吗?否则 YARN 将不会根据您的工作优化集群使用。
能否请这行 all_contents.getNumPartitions 看看有多少个分区可用
@Achyuth 分区数是 4(或者我传入 WholeTextFiles 的任何数字)
【参考方案1】:
这里要提两点(但不确定它们是否能解决您的问题):
wholeTextFiles
使用扩展CombineFileInputFormat
的WholeTextFileInputFormat
,并且由于CombineFileInputFormat
,它会尝试将多组小文件合并到一个分区中。因此,例如,如果您将分区数设置为 2,您“可能”会得到两个分区,但不能保证,这取决于您正在读取的文件的大小。
wholeTextFiles
的输出是一个 RDD,它在每个记录中包含一个完整的文件(并且每个记录/文件不能被拆分,因此它将以位于单个分区/工作者中而结束)。因此,如果您只读取一个文件,尽管您在示例中将分区设置为 4,但最终会将整个文件放在一个分区中。
【讨论】:
感谢您的回答。我正在阅读大约 2000 个文件,似乎确实有 4 个分区(至少根据 getNumPartitions)【参考方案2】:该进程的分区数与您指定的一样多,但它是以序列化方式进行的。
执行者
该进程可能会启动默认数量的执行程序。这可以在纱线资源管理器中看到。在您的情况下,所有处理均由一位执行者完成。如果 executor 有多个核心,它将使工作并行化。在 emr 中,您必须进行此更改,以便为执行程序提供 1 个以上的核心。
在我们的案例中具体发生的情况是,数据很小,所以所有数据都在一个执行器中读取(即使用一个节点)。如果没有以下属性,执行程序仅使用单核。因此,所有任务都是序列化的。
设置属性
sudo vi /etc/hadoop/conf/capacity-scheduler.xml
如下图所示设置属性
"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
为了使这个属性适用,你必须重新启动纱线
sudo hadoop-yarn-resourcemanager stop
重启纱线
sudo hadoop-yarn-resourcemanager start
提交作业后,请查看 yarn 和 spark-ui
在 Yarn 你会看到更多的执行器核心
【讨论】:
我们可以聊一会儿吗,这个会解决的 chat.stackexchange.com/rooms/67849/debugging-spark-executors我们来聊聊以上是关于当有更多可用时,Spark 仅使用一台工作机器的主要内容,如果未能解决你的问题,请参考以下文章