为啥只有一个 spark 作业只使用一个执行器运行?
Posted
技术标签:
【中文标题】为啥只有一个 spark 作业只使用一个执行器运行?【英文标题】:Why is only one of spark jobs running using only one executor?为什么只有一个 spark 作业只使用一个执行器运行? 【发布时间】:2016-06-15 11:46:14 【问题描述】:我的 Spark 集群有 1 个 master 和 2 个 worker。应用程序从 s3 读取 csv 文件到 DataFrames,将它们注册为临时表并使用 sqlContext 运行 sql 查询以创建新的 DataFrames。然后将这些 DF 存储到 mysql DB。这些作业都在多个节点上运行。
但是当我将这些表从 DB 读回 DataFrames,将它们注册为临时表并运行 sqlContext 查询时,所有处理都仅由一个节点完成。这可能是什么原因造成的?
这是我的代码示例:
DataFrame a = sqlContext.read().format("com.databricks.spark.csv").options(options)
.load("s3://s3bucket/a/part*");
DataFrame b = sqlContext.read().format("com.databricks.spark.csv").options(options)
.load("s3://s3bucket/b/part*");
a.registerTempTable("a");
b.registerTempTable("b");
DataFrame c = sqlContext.sql("SELECT a.name, b.name from a join b on a.id = b.a_id");
c.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "c", prop);
// other jobs are similar
Map<String, String> dOptions = new HashMap<String, String>();
dOptions.put("driver", MYSQL_DRIVER);
dOptions.put("url", MYSQL_CONNECTION_URL);
dOptions.put("dbtable", "(select * from c) AS c");
rC= sqlContext.read().format("jdbc").options(dOptions).load();
rC.cache();
dOptions.put("dbtable", "(select * from d) AS d");
rD= sqlContext.read().format("jdbc").options(dOptions).load();
rD.cache();
dOptions.put("dbtable", "(select * from f) AS f");
rF= sqlContext.read().format("jdbc").options(dOptions).load();
rF.cache();
rC.registerTempTable("rC");
rD.registerTempTable("rD");
rF.registerTempTable("rF");
DataFrame result = sqlContext.sql("SELECT rC.name, rD.name, rF.date from rC join rD on rC.name = rD.name join rF on rC.date = rF.date");
result.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "result_table", prop);
【问题讨论】:
你用什么来提交你的工作?当您的意思是“一个节点”时,您的意思是您只能在主 UI 上看到一个工作人员吗? @Hawknight 我正在使用 spark-submit 提交作业。这是完整的命令:“spark-submit --class MyClass --deploy-mode cluster s3://bucket/file.jar”。我是通过 Ganglia 和 Spark UI 进行监控的。两者都认识工人,我看到一些工作确实是并行执行的。但是,在我上面发布的工作期间,并行化停止并且某些任务(在阶段内)仅由一个工作节点执行。这是来自 Ganglia UI 的屏幕截图。 pokit.org/get/img/2a5bcd853b97aad2bc9e86a90c9b2733.png 仅根据代码,很难判断为什么只有一名工人会在某些阶段工作。您能否尝试准确指出问题出现在哪个阶段和哪个任务? @Hawknight 这是我从 SparkUI 收集的信息。作业 18:阶段 17/25,任务(对于所有阶段 1036/2438),任务 198/200(阶段中)和任务 199/200 这只是第一次发生,我可以更新下一个停止的任务,当他们有。以下是一些可能对您有所帮助的屏幕截图。 pokit.org/get/img/ba03e44c1e5d2c3bfa98f4f0db1f8021.png pokit.org/get/img/89611aae56ba2cd6f1cf2aa4cfc2bfc6.png pokit.org/get/img/689ec7cbf496f0876aa3ea32d3649131.png 如果您发现此信息不相关,请具体说明您需要什么,我会提供。 好的,现在清楚一点了。您能否准确找到代码中的哪个jdbc
阶段有问题?是c
DataFrame 的第一个还是result
DataFrame 的最后一个?您的 SparkConf 是特定于该作业还是与在两个节点上执行的其他作业全局共享?
【参考方案1】:
您能与我们分享您的 SparkConf() 对象吗?
SparkConf() 对象包含 Spark 应用程序的配置。它用于将各种 Spark 参数设置为键值对,例如:
-主人
-执行者的数量
-执行器核心数
-分配的堆内存
-其他..
【讨论】:
这是我的 SparkConf 对象:SparkConf conf = new SparkConf().setAppName("org.spark.SchemaTransformer").setMaster("yarn-cluster");执行环境:spark.master = yarn executor.cores = 4 executor.memory = 5120M 我认为你错过了一些论点。尝试添加以下方法。类似于:SparkConf().set("master","yarn")\ .set("spark.submit.deployMode" ,"cluster") .set("spark.executor.instances","8")\ 。 set("spark.executor.cores","4")\ .set("spark.executor.memory","5120M")\ .set("spark.driver.memory","5120M")\ .set( "spark.yarn.memoryOverhead","10000M")\ .set("spark.yarn.driver.memoryOverhead","10000M")【参考方案2】:如果您未指定 partitionColumn, lowerBound, upperBound, numPartitions 或 predicates,Spark 将使用单个执行程序并创建单个非空分区。所有数据都将使用单个事务处理
【讨论】:
以上是关于为啥只有一个 spark 作业只使用一个执行器运行?的主要内容,如果未能解决你的问题,请参考以下文章
为啥 spark.read.parquet() 运行 2 个作业?
为啥 Spark 每个执行器只使用一个核心?它如何决定使用分区数量以外的核心?