Spark 将任务分发给多个执行器

Posted

技术标签:

【中文标题】Spark 将任务分发给多个执行器【英文标题】:Spark distribute tasks over several executors 【发布时间】:2019-01-08 20:15:53 【问题描述】:

我希望并行运行 SQL 查询,并且能够将并行度控制为 8 个查询。现在,我正在做这段代码。 这个想法是创建 8 个分区并允许执行程序并行运行它们。

  (1 to 8).toSeq.toDF.repartition(8) // 8 partitions
  .rdd.mapPartitions(
  x => 
  val conn = createConnection()
    x.foreach
      s =>  // expect the below query be run concurently
      execute(s"SELECT * FROM myTable WHERE col = $s.get(0)")
      
    
  conn.close()
  x
  ).take(1)

问题是 8 个查询是一一运行的。

我应该如何继续让查询运行 8 x 8?

【问题讨论】:

如果您有足够的资源来并行运行 8 个作业(执行程序中至少有 8 个线程)应该没问题。您也可以尝试使用 udf 执行 sql。 【参考方案1】:

当你这样做时

val df = (1 to 8).toSeq.toDF.repartition(8)

这不会创建 8 个分区,每个分区有 1 条记录。如果您检查此数据框(参见例如https://***.com/a/46032600/1138523),那么您会得到:

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|               0|                0|
|               1|                0|
|               2|                0|
|               3|                0|
|               4|                0|
|               5|                0|
|               6|                4|
|               7|                4|
+----------------+-----------------+

因此您将只有 2 个非空分区,因此您将拥有最大 2 倍并行度(我在这里问过这个问题:How does Round Robin partitioning in Spark work?)

要制作大小相等的分区,最好使用

spark.sparkContext.parallelize((0 to 7), numSlices = 8)

而不是

(1 to 8).toSeq.toDF.repartition(8).rdd

第一个选项为每个分区提供 1 条记录,第二个选项不是因为它使用循环分区

附带说明,当您执行x.foreach 时,将消耗x(迭代器只能遍历一次),因此如果您返回x,您将始终得到一个空迭代器。

所以你的最终代码可能如下所示:

 spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.mapPartitions(
  x => 
  val xL = x.toList  // convert to List
  assert(xL.size==1) // make sure partition has only 1 record

  val conn = createConnection()
    xL.foreach
      s =>  // expect the below query be run concurently
      execute(s"SELECT * FROM myTable WHERE col = $s")
      
    
  conn.close()
  xL.toIterator
  )
 .collect // trigger all queries

除了使用mapPartitions(这是惰性的),您还可以使用foreachPartition,这是非惰性的

由于每个分区只有 1 条记录,因此迭代分区并没有真正的好处,您也可以只使用普通的 foreach

 spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.foreach( s=> 
  val conn = createConnection()
  execute(s"SELECT * FROM myTable WHERE col = $s")   
  conn.close()
)

【讨论】:

你能告诉我使用 foreachPartition 而不是 mapPartitions 有什么好处吗? 仅当我使用 ForeachPartition 时,您的方法一直有效。 @parisni 你确定你试过mapPartitions 后跟collect 吗?如果您不需要返回值,count 也应该可以解决问题 我的意思是 mapPartitions 有效。它只是没有像 foreachPartition 那样分发。可能与惰性方面有关 我使用的是 take(1),也许这很有意义,我应该用 collect 进行测试。无论如何感谢您的帮助。

以上是关于Spark 将任务分发给多个执行器的主要内容,如果未能解决你的问题,请参考以下文章

谈Spark下并行执行多个Job的问题

谈Spark下并行执行多个Job的问题

spark-submit参数

interllj IDEA配置spark

使用Crontab定时执行Spark任务

为啥我只看到分阶段的 200 个任务?