为啥只有一个 spark 作业只使用一个执行器运行?

Posted

技术标签:

【中文标题】为啥只有一个 spark 作业只使用一个执行器运行?【英文标题】:Why is only one of spark jobs running using only one executor?为什么只有一个 spark 作业只使用一个执行器运行? 【发布时间】:2016-06-15 11:46:14 【问题描述】:

我的 Spark 集群有 1 个 master 和 2 个 worker。应用程序从 s3 读取 csv 文件到 DataFrames,将它们注册为临时表并使用 sqlContext 运行 sql 查询以创建新的 DataFrames。然后将这些 DF 存储到 mysql DB。这些作业都在多个节点上运行。

但是当我将这些表从 DB 读回 DataFrames,将它们注册为临时表并运行 sqlContext 查询时,所有处理都仅由一个节点完成。这可能是什么原因造成的?

这是我的代码示例:

 DataFrame a = sqlContext.read().format("com.databricks.spark.csv").options(options)
                .load("s3://s3bucket/a/part*");
 DataFrame b = sqlContext.read().format("com.databricks.spark.csv").options(options)
                .load("s3://s3bucket/b/part*");

a.registerTempTable("a");
b.registerTempTable("b");

DataFrame c = sqlContext.sql("SELECT  a.name, b.name from   a join b on  a.id = b.a_id");

c.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "c", prop);

// other jobs are similar 

Map<String, String> dOptions = new HashMap<String, String>();
dOptions.put("driver", MYSQL_DRIVER);
dOptions.put("url", MYSQL_CONNECTION_URL);

dOptions.put("dbtable", "(select * from c) AS c");
rC= sqlContext.read().format("jdbc").options(dOptions).load();
rC.cache();

 dOptions.put("dbtable", "(select * from d) AS d");
 rD= sqlContext.read().format("jdbc").options(dOptions).load();
 rD.cache();

 dOptions.put("dbtable", "(select * from f) AS f");
 rF= sqlContext.read().format("jdbc").options(dOptions).load();
 rF.cache();

 rC.registerTempTable("rC");
 rD.registerTempTable("rD");
 rF.registerTempTable("rF");

DataFrame result = sqlContext.sql("SELECT  rC.name, rD.name, rF.date  from rC join rD on rC.name = rD.name join rF on rC.date = rF.date");

result.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "result_table", prop);

【问题讨论】:

你用什么来提交你的工作?当您的意思是“一个节点”时,您的意思是您只能在主 UI 上看到一个工作人员吗? @Hawknight 我正在使用 spark-submit 提交作业。这是完整的命令:“spark-submit --class MyClass --deploy-mode cluster s3://bucket/file.jar”。我是通过 Ganglia 和 Spark UI 进行监控的。两者都认识工人,我看到一些工作确实是并行执行的。但是,在我上面发布的工作期间,并行化停止并且某些任务(在阶段内)仅由一个工作节点执行。这是来自 Ganglia UI 的屏幕截图。 pokit.org/get/img/2a5bcd853b97aad2bc9e86a90c9b2733.png 仅根据代码,很难判断为什么只有一名工人会在某些阶段工作。您能否尝试准确指出问题出现在哪个阶段和哪个任务? @Hawknight 这是我从 SparkUI 收集的信息。作业 18:阶段 17/25,任务(对于所有阶段 1036/2438),任务 198/200(阶段中)和任务 199/200 这只是第一次发生,我可以更新下一个停止的任务,当他们有。以下是一些可能对您有所帮助的屏幕截图。 pokit.org/get/img/ba03e44c1e5d2c3bfa98f4f0db1f8021.png pokit.org/get/img/89611aae56ba2cd6f1cf2aa4cfc2bfc6.png pokit.org/get/img/689ec7cbf496f0876aa3ea32d3649131.png 如果您发现此信息不相关,请具体说明您需要什么,我会提供。 好的,现在清楚一点了。您能否准确找到代码中的哪个jdbc 阶段有问题?是c DataFrame 的第一个还是result DataFrame 的最后一个?您的 SparkConf 是特定于该作业还是与在两个节点上执行的其他作业全局共享? 【参考方案1】:

您能与我们分享您的 SparkConf() 对象吗?

SparkConf() 对象包含 Spark 应用程序的配置。它用于将各种 Spark 参数设置为键值对,例如:

-主人

-执行者的数量

-执行器核心数

-分配的堆内存

-其他..

【讨论】:

这是我的 SparkConf 对象:SparkConf conf = new SparkConf().setAppName("org.spark.SchemaTransformer").setMaster("yarn-cluster");执行环境:spark.master = yarn executor.cores = 4 executor.memory = 5120M 我认为你错过了一些论点。尝试添加以下方法。类似于:SparkConf().set("master","yarn")\ .set("spark.submit.deployMode" ,"cluster") .set("spark.executor.instances","8")\ 。 set("spark.executor.cores","4")\ .set("spark.executor.memory","5120M")\ .set("spark.driver.memory","5120M")\ .set( "spark.yarn.memoryOverhead","10000M")\ .set("spark.yarn.driver.memoryOverhead","10000M")【参考方案2】:

如果您未指定 partitionColumn, lowerBound, upperBound, numPartitions 或 predicates,Spark 将使用单个执行程序并创建单个非空分区。所有数据都将使用单个事务处理

【讨论】:

以上是关于为啥只有一个 spark 作业只使用一个执行器运行?的主要内容,如果未能解决你的问题,请参考以下文章

为啥运行不成功 spark

为啥 spark.read.parquet() 运行 2 个作业?

为啥 Spark 每个执行器只使用一个核心?它如何决定使用分区数量以外的核心?

如何使火花同时运行作业中的所有任务?

为啥不在空的 Spark 集群上强制执行preferredLocations?

如何在单个 Spark 作业中调用多个 writeStream 操作?