sqlContext.read...load() 和 sqlContext.write...save() 代码在 Spark 集群上运行在哪里?

Posted

技术标签:

【中文标题】sqlContext.read...load() 和 sqlContext.write...save() 代码在 Spark 集群上运行在哪里?【英文标题】:Where does sqlContext.read...load() and sqlContext.write...save() code runs on Spark Cluster? 【发布时间】:2017-07-11 06:53:23 【问题描述】:

我正在使用 Spark Dataframe API 从 NFS 共享加载/读取文件,然后将该文件的数据保存/写入 HDFS。

我有一个包含一个主节点和两个工作节点的三节点 Spark 集群。我的 Spark 集群使用 YARN 作为集群管理器,因此两个 Worker 节点是 YARN NodeManager 节点,主节点是 Yarn ResourceManager 节点。

我有一个远程位置,例如 /data/files,它安装到所有三个 YARN/SPARK 节点,因为它是 [/data/files],其中存在我想要读取的所有 csv 文件 [多个]从并最终写入 HDFS。

我在我的主节点上运行以下代码

import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext

object TestMoreThan1CSV2DF 
  private val source: String = "file:///data/files/"
  private val destination = "hdfs://<myHostIP>:8020/raw/"
  private val fileFormat : String = "com.databricks.spark.csv"

  def main(args:Array[String]):Unit=
    val conf = new SparkConf().setAppName("TestMoreThan1CSV2DF").setMaster("local")
    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))

    for(file<-fileArray)
//  reading csv file from shared location and taking whole data in a dataframe
    var df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)

//      variable for holding destination location : HDFS Location
    var finalDestination: String = destination+file.getName

//  saving data into HDFS
    writeDF2HDFS(df,fileFormat,"true",finalDestination) /// saved using default number of partition = 1
    
  

 def loadCSV2DF(sqlContext : SQLContext, fileFormat: String, header : String, inferSchema: String, source: String) : DataFrame = 
   try
       sqlContext.read.format(fileFormat)
                       .option("header", header) // Use first line of all files as header
                       .option("inferSchema", inferSchema) // Automatically infer data types
                       .load(source)
   
   catch
     case ex: OnboardingException => 
            throw ex;
        
   
 

 def writeDF2HDFS(df: DataFrame, fileFormat: String, header: String, destination: String, partitions: Integer = 1)
   try
       df.repartition(partitions).write.format(fileFormat).option("header",header).save(destination)
   
   catch
     Case ez : OnboardingException => 
            throw ez;
        
   
 

此代码读取共享位置的所有 csv 文件 /data/files/ 并将它们中的每一个写入HDFS。前任: /data/files/f1.csv 将作为 /raw/f1.csv/part-xxxxx 加载到 HDFS 文件

运行此代码时,我无法辨认:

1) 整个代码在哪里运行?它在驱动程序上运行吗?或使用 两个工人?

2) load() 和 save() API 是否在工作节点上运行? 它并行工作?如果是,那么两名工人如何跟踪 它读过或写过的while部分?

3) 现在我是 在“for”循环中顺序读取每个文件并处理每个文件 它们按顺序排列,是否有可能使其成为多线程 应用程序,其中每个文件分配给一个线程执行 端到端并行读写。磁盘 IO 是否会受到任何约束 这样做的时候?

任何快速响应/参考/指针将不胜感激。

问候, 布佩什

【问题讨论】:

【参考方案1】:

从另一个线程复制的非常好的解释用于我的查询: differentiate driver code and work code in Apache Spark

在这里也复制其中的一部分: 由转换创建的闭包内发生的所有事情都发生在工作人员身上。这意味着如果在 map(...)、filter(...)、mapPartitions(...)、groupBy*(...)、aggregateBy*(...) 内部传递了某些内容,则会在工作人员上执行。它包括从持久存储或远程源读取数据。

count、reduce(...)、fold(...) 等操作通常在驱动程序和工作程序上执行。繁重的工作由工作人员并行执行,一些最后的步骤,例如减少从工作人员收到的输出,在驱动程序上按顺序执行。

其他一切,例如触发动作或转换都发生在驱动程序上。特别是它意味着需要访问 SparkContext 的每个操作。

就我的疑问而言: 1) 是的 main() 方法的一部分在驱动程序上运行,但转换发生在

2) load() 和 save() 在工作人员上运行,因为我们可以看到加载创建数据帧 [存储在分区中的内存中] 并保存在 hdfs 中创建部分 xxxx 文件,这表明工作人员正在这样做

3) 仍在努力实现这一目标,一旦完成就会回答这个问题。

谢谢

【讨论】:

【参考方案2】:

很好的实验!!。

1) 你的代码总是在worker上运行。驱动程序只是为了管理工人。

2) 是 load() 和 save() API 在工作节点上运行。他们按顺序工作。

3) 使用多线程应用程序:我还没有尝试过。祝你好运“去试试!!”。但是你为什么要把自己置于复杂的境地!! SPARK 知道如何处理这种情况。

【讨论】:

嗨@Achyuta nanda sahoo,感谢您的回复。我的假设是所有转换代码都在工作人员上运行,而其余代码在驱动程序上运行。最后,动作在工人或司机身上运行。我仍然不清楚我的理解是否正确。如果 load() 和 save() 在工作节点上工作,那么它如何跟踪文件的哪个部分被哪个工作人员读/写???如果你喜欢这个问题,那么请给它一个点,以便大多数人可以像你一样找到它并提供他们有价值的 cmets。 主要动作总是从驱动节点发生。主要动作意味着从所有节点获取聚合数据。主驱动程序就像根节点一样。在树形结构图中,根节点是驱动节点,其余节点是工作节点。主驱动程序尝试创建分区/索引。分区是指从一个文件或多个文件中对数据集群进行分组。并将每个集群发送到每个节点。就是这样。

以上是关于sqlContext.read...load() 和 sqlContext.write...save() 代码在 Spark 集群上运行在哪里?的主要内容,如果未能解决你的问题,请参考以下文章

loadsave方法spark sql的几种数据源

从 spark RDD 中删除空字符串

在 pyspark Dataframe 上创建新的模式或列名

在 spark 中获取 parquet 表目录的源文件

EMR 5.28 无法在 s3 上加载镶木地板文件