How to convert Spark log files into one CSV file

Posted: 2018-07-04 19:19:09

Question:

I have a set of Spark application log files, and for each file I want to extract the application name, the submission time, the completion time, and the Accumulables metrics, and append them as one row to a single CSV file, using Spark/Scala.

Edit: I apologize, but a single Spark application log file is too large to post here. Each job repeatedly updates some of the metrics, and I need the final total of every metric, not the intermediate updates. Here is what I have tried so far:

import org.apache.log4j._
import org.apache.spark.sql._


object LogToCSV {
  // CSV header row, written once at the top of Results.csv
  val Logs = "SparkAppName,SubmissionTime,CompletionTime,ExecutorDeserializeCpuTime,ResultSize,ShuffleReadRemoteBytesRead,ShuffleReadFetchWaitTime,MemoryBytesSpilled,ShuffleReadLocalBytesRead,ExecutorDeserializeTime,PeakExecutionMemory,ExecutorCpuTime,ShuffleReadLocalBlocksFetched,JVMGCTime,ShuffleReadRemoteBytesReadToDisk,ShuffleReadRecordsRead,DiskBytesSpilled,ExecutorRunTime,ShuffleReadRemoteBlocksFetched,Result"

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val ss = SparkSession
      .builder
      .appName("SparkSQLDFjoin")
      .master("local[*]")
      .getOrCreate()
    import ss.implicits._

    // write the header row first
    ScalaWriter.Writer.Write(Logs, "Results.csv")

    // wholeTextFiles yields (filePath, fileContent) pairs, one per log file
    val Dir = ss.sparkContext.wholeTextFiles("/home/rudaini/Desktop/Thesis/Results/Results/Tesx/*")
    println(Dir.count())

    // note: this runs in local[*] mode, so writing to Results.csv from foreach works here
    Dir.foreach { case (_, content) =>
      var SparkAppName = ""
      var SubmissionTime: Double = 0
      var CompletionTime: Double = 0
      var ExecutorDeserializeCpuTime: Double = 0
      var ResultSize = ""
      var ShuffleReadRemoteBytesRead = ""
      var ShuffleReadFetchWaitTime = ""
      var MemoryBytesSpilled = ""
      var ShuffleReadLocalBytesRead = ""
      var ExecutorDeserializeTime = ""
      var PeakExecutionMemory = ""
      var ExecutorCpuTime = ""
      var ShuffleReadLocalBlocksFetched = ""
      var JVMGCTime = ""
      var ShuffleReadRemoteBytesReadToDisk = ""
      var ShuffleReadRecordsRead = ""
      var DiskBytesSpilled = ""
      var ExecutorRunTime = ""
      var ShuffleReadRemoteBlocksFetched = ""
      var Result = ""

      // scan every line of the event log and pull fields out by substring offsets
      content.split("\n").foreach { L =>
        if (L.contains("spark.app.name"))
          SparkAppName = L.substring(L.indexOf("app.name") + 11,
            L.indexOf("spark.scheduler") - 3)

        if (L.contains("ApplicationStart"))
          SubmissionTime = L.substring(L.indexOf("Timestamp") + 11,
            L.indexOf(",\"User\":\"")).toDouble

        if (L.contains("ApplicationEnd"))
          CompletionTime = L.substring(L.indexOf("Timestamp") + 11,
            L.indexOf("Timestamp") + 24).toDouble

        // placeholder from an earlier attempt; the offsets are still wrong, and the
        // remaining metrics (ResultSize, ShuffleRead*, ...) need their own extraction
        if (L.contains("SparkSubmit.scala"))
          ExecutorDeserializeCpuTime = L.substring(L.indexOf("app.name") + 11,
            L.indexOf("spark.scheduler")).toDouble
      }

      val LineX = SparkAppName + "," + SubmissionTime + "," + CompletionTime + "," + ExecutorDeserializeCpuTime + "," + ResultSize + "," + ShuffleReadRemoteBytesRead + "," + ShuffleReadFetchWaitTime + "," + MemoryBytesSpilled + "," +
        ShuffleReadLocalBytesRead + "," + ExecutorDeserializeTime + "," + PeakExecutionMemory + "," + ExecutorCpuTime + "," +
        ShuffleReadLocalBlocksFetched + "," + JVMGCTime + "," + ShuffleReadRemoteBytesReadToDisk + "," +
        ShuffleReadRecordsRead + "," + DiskBytesSpilled + "," + ExecutorRunTime + "," + ShuffleReadRemoteBlocksFetched + "," +
        Result

      ScalaWriter.Writer.Write(LineX, "Results.csv")
    }

    ss.stop()
  }
}

I am not finished yet, but with a few more modifications I am getting better results.
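One direction I am still experimenting with (a rough sketch only, not yet verified): every line of a Spark event log is a JSON object, so the fields could be pulled out with json4s (which Spark already depends on) instead of fixed substring offsets. The event and field names below are assumed from the standard event-log format:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Parse one event-log line as JSON and return the timestamp of
// ApplicationStart / ApplicationEnd events, if the line is one of them.
def applicationTimestamp(line: String): Option[(String, Long)] = {
  implicit val formats: Formats = DefaultFormats
  val json = parse(line)
  (json \ "Event").extractOpt[String] match {
    case Some("SparkListenerApplicationStart") =>
      (json \ "Timestamp").extractOpt[Long].map(("start", _))
    case Some("SparkListenerApplicationEnd") =>
      (json \ "Timestamp").extractOpt[Long].map(("end", _))
    case _ => None
  }
}

If I read the event-log format correctly, the final Accumulables values could be taken the same way from the last SparkListenerStageCompleted event of each stage, which would avoid the repeated intermediate updates.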

Comments:

Welcome to ***! Your question is not well defined. Please expand the problem statement and show a short but complete example of what you have tried so far. See this for reference: jonskeet.uk/csharp/complete.html

Answer 1:

I only partially understand your question, so I am answering based on that understanding. If you can structure your question further, I may be able to answer it in more detail.

import java.io.File
import org.apache.spark.sql.DataFrame

// define all dataframes globally
var df1: DataFrame = _
var df2: DataFrame = _
var df3: DataFrame = _

// define main function
// initialize spark session

// creates a list of all files in a directory
def getListOfFiles(dir: String): List[File] = {
  val path = new File(dir)
  if (path.exists && path.isDirectory)
    path.listFiles.filter(_.isFile).toList
  else
    List[File]()
}

val files = getListOfFiles("/path/to/directory/")
for (input <- files) {
  // code to extract log file data (I can help you further if you explain your problem further)
  // load your log file data into a dataframe
  import spark.implicits._

  // appName, submissionTime, completionTime and accumulablesMetrics below are
  // placeholders for the values extracted from the current log file
  if (input == files(0)) {
    df1 = Seq(
      (appName, submissionTime, completionTime, accumulablesMetrics)
    ).toDF("App Name", "Submission Time", "Completion Time", "Accumulables metrics")
  } else {
    df2 = Seq(
      (appName, submissionTime, completionTime, accumulablesMetrics)
    ).toDF("App Name", "Submission Time", "Completion Time", "Accumulables metrics")
    df3 = df1.union(df2) // accumulate the rows collected so far
    df1 = df3
  }
}

// write the combined dataframe to a .csv file
df1.coalesce(1).write
  .option("header", "true")
  .csv("path/to/directory/file_name.csv")

Comments:

Hi Pardeep, your file_name.csv is not a standard CSV... I had a similar problem in this question, and the solution was to use Log4j or a PrintWriter. Maybe you could show a solution here that uses Log4j.
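A minimal sketch of the PrintWriter route mentioned above (the file name and header are placeholders, not from the original answer): append each finished row to one plain CSV file, so the output is a single file rather than a Spark part-file directory.

import java.io.{File, FileWriter, PrintWriter}

// Append one CSV row to a single output file; write the header first if the file is new.
def appendCsvRow(path: String, header: String, row: String): Unit = {
  val file = new File(path)
  val writeHeader = !file.exists()
  val out = new PrintWriter(new FileWriter(file, true)) // true = append mode
  try {
    if (writeHeader) out.println(header)
    out.println(row)
  } finally {
    out.close()
  }
}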
