在 Scala 中设计和并行化 Spark 应用程序的最佳方法 [关闭]

Posted

技术标签:

【中文标题】在 Scala 中设计和并行化 Spark 应用程序的最佳方法 [关闭]【英文标题】:Best way to design and parallelize a Spark application in Scala [closed] 【发布时间】:2017-05-04 06:01:00 【问题描述】:

我正在使用 Scala 开发 Spark 应用程序,并且想知道将其并行化并在 Hadoop 集群上运行的最佳方法。我的代码将从 HDFS 文件中读取每一行并对其进行解析并生成多个记录(对于每一行),我将这些记录存储为案例类。我已经在 getElem() 方法中编写了完整的逻辑并按预期工作。

现在,我想计算所有输入记录的逻辑并将响应存储到 HDFS 位置。

请让我知道我将如何处理 spark 并合并为输入生成的所有相应输出记录并写入 HDFS。

        object testing extends Serializable 
      var recordArray=Array[Record]();
       def main(args:Array[String])
      

      val conf = new SparkConf().setAppName("jsonParsing").setMaster("local")
      val sc = new SparkContext(conf)
      val sqlContext= new SQLContext(sc)

      val input=sc.textFile("hdfs://loc/data.txt")
     // input.collect().foreach(println)
      input.map(data=>getElem(parse(data,false),sc,sqlContext))

    
          //method definition
    def getElem(json:JValue)=

         // Parses the json and creates array of datasets for each input record and stores the data in case class
  val x= Record("xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx")
             
  case class Record(summary_key: String, key: String,array_name_position:Int,Parent_Level_1:String,Parent_level_2:String,Parent_Level_3:String,Parent_level_4:String,Parent_level_5:String,
        param_name_position:Integer,Array_name:String,paramname:String,paramvalue:String)
           

【问题讨论】:

代码示例不完整。 recordArray 从未使用过,getElem 未指定返回类型(在您发布的代码中,它仅返回 Unit)。它也有错误的签名,您在地图中将SparkContextSQLContext 传递给它,但在定义中它只接受JValueparse 从未被解释过。您能否发布一个工作示例来说明您的问题并准确指出它不工作的原因? 【参考方案1】:

你说你有工作 case class 并且你有 parsed 来自 hdfsinput text file 并转换为 record case class。 然后使用.toDF().toDS() 转换为dataframedataset 将很容易 现在剩下的任务就是将case class rdddataframedataset写入hdfs将案例类rdd写入hdfs: 只需调用.saveAsTextFile() api,您就可以提供output directory 的路径,就像您从hdfs 读取一样将数据帧或数据集写入hdfs: 你可以使用databricks api .write.format("com.databricks.spark.csv").save() 在那里你可以给输出hdfs dir name 我希望这有帮助 注意:我猜你是通过ide 运行代码。你应该学会spark-submit更多信息可以在here找到

【讨论】:

谢谢。是的,我正在通过 IDE 运行它。我不想在处理完每个输入行后将记录转储到 HDFS,这样会创建许多部分文件。我想处理所有输入记录,然后将输出转储到 HDFS。但是,使用我上面的代码,getElem 方法永远不会被调用 使用上面的代码,它甚至还没有达到可以运行 getElem 的程度,因为它不会编译。解决您的问题,我们无法猜测您实际运行的是什么代码 - 但这肯定不是您的问题 @sun_dare,您必须使用您的解析方法更新您的问题,以便我可以对其进行测试并将完整的答案发布给您。顺便说一句,您可以使用coalesce 来获得单个输出。而且你必须了解transformation and action,这样你才能理解 spark 的工作原理。

以上是关于在 Scala 中设计和并行化 Spark 应用程序的最佳方法 [关闭]的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop 之 Spark 安装配置与示例

spark 并行度

spark习题

并行使用 scala Spark 重命名 HDFS 文件时的序列化问题

rdd.mapPartitions 从 Spark Scala 中的 udf 返回布尔值

Spark基础