如何在 Spark 中并行创建 RDD / DataFrame?

Posted

技术标签:

【中文标题】如何在 Spark 中并行创建 RDD / DataFrame?【英文标题】:How do you parallelize RDD / DataFrame creation in Spark? 【发布时间】:2015-07-07 23:59:13 【问题描述】:

假设我有一个如下所示的 Spark 作业:

def loadTable1() 
  val table1 = sqlContext.jsonFile(s"s3://textfiledirectory/")
  table1.cache().registerTempTable("table1")
  

def loadTable2() 
  val table2 = sqlContext.jsonFile(s"s3://testfiledirectory2/")
  table2.cache().registerTempTable("table2")
 


def loadAllTables() 
  loadTable1()
  loadTable2()


loadAllTables()

如何并行化此 Spark 作业以便同时创建两个表?

【问题讨论】:

您也已将其发布到user@spark 邮件列表。如果有人想阅读讨论,主题是“在 Spark 中并行创建多个 RDD / DataFrame” 【参考方案1】:

您不需要并行化它。 RDD/DF 创建操作不做任何事情。这些数据结构是惰性的,因此任何实际计算只会在您开始使用它们时发生。当 Spark 计算发生时,它会自动并行化(逐个分区)。 Spark 将在执行者之间分配工作。因此,通过引入进一步的并行性,您通常不会获得任何收益。

【讨论】:

这个答案有点正确,但是由于我们处于 Spark SQL 领域,所以情况会有所不同。数据框创建和注册操作实际上确实涉及一些工作,特别是对于 json 或类似结构,Spark SQL 需要扫描某些行以确定架构,而不是仅仅从标头中读取架构。如果我们查看 Catalog.scala,我们可以看到注册表涉及查找计划,这会强制进行一些分析。我们通过创建一个不存在的 Dataframe 并看到它失败来验证这一点(这意味着评估不是完全惰性的)。 感谢霍尔顿的评论!我不使用 DataFrame,所以我不知道它们的创建过程是什么。尽管如此,与处理相比,创建必须非常便宜——我认为并行化不能获得太多收益。 一个有趣的用例是,如果这是永久表,而不是临时表,例如带有附加模式的镶木地板格式。在这种情况下,它根本不是懒惰的,因为您可能会在 ETL 过程中实现很多表。我正在尝试做类似的事情。无论如何,我都明白每个“加载表”都被并行化了,但是如果每个表都不大,那么 registerTempTable 不会创建很多工作人员,因此并行化很多负载,每个负载都是单独串行(或最小并行)可以更好地利用资源。【参考方案2】:

使用期货!

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

def loadAllTables() 
  Future  loadTable1() 
  Future  loadTable2() 

【讨论】:

【参考方案3】:

您可以使用标准的 scala 线程机制来做到这一点。就个人而言,我想做一个带有路径和表名的对列表,然后对其进行并行映射。您还可以查看期货或标准线程。

【讨论】:

这个答案不适用于在多个节点上运行的 Spark。 嗨,Paul,这确实适用,因为注册表格只能在驱动程序中完成。 不,它没有。请参阅有关懒惰的其他答案。 我在另一个答案中添加了一条评论,解释说虽然 RDD 创建完全是懒惰的,因为我们正在谈论数据帧(尤其是 json 数据帧),所以评估并不是完全懒惰的。您可以通过尝试从一个不存在的文件创建一个 json Dataframe 来验证这一点,并注意到它会立即失败,这与常规 RDD 不同,它是完全惰性的并且在我们对其执行操作之前不会失败。数据帧 + 惰性与常规 RDD 有点不同。 仍然很清楚,在单独的线程中运行它对你没有什么好处

以上是关于如何在 Spark 中并行创建 RDD / DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章

Spark RDD:如何共享数据以进行并行操作

spark习题

spark core之RDD编程

spark知识点_RDD

Spark RDD

Spark之RDD编程