How to convert Spark log files into one CSV file
Posted: 2018-07-04 19:19:09

【Question】: I have a set of Spark application log files. For each file I want to extract the application name, submission time, completion time and the Accumulables metrics, and append them as a single row to one CSV file, using Spark/Scala.

Edit: I'm sorry, but a single Spark application log file is too large to post here. Also, some of the metrics are updated repeatedly by every job, and I need the sum of all those updates, not just the last one. Here is what I have tried so far:
import org.apache.log4j._
import org.apache.spark.sql._

object LogToCSV {

  // CSV header row
  val Logs = "SparkAppName,SubmissionTime,CompletionTime,ExecutorDeserializeCpuTime,ResultSize,ShuffleReadRemoteBytesRead,ShuffleReadFetchWaitTime,MemoryBytesSpilled,ShuffleReadLocalBytesRead,ExecutorDeserializeTime,PeakExecutionMemory,ExecutorCpuTime,ShuffleReadLocalBlocksFetched,JVMGCTime,ShuffleReadRemoteBytesReadToDisk,ShuffleReadRecordsRead,DiskBytesSpilled,ExecutorRunTime,ShuffleReadRemoteBlocksFetched,Result"

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val ss = SparkSession
      .builder
      .appName("SparkSQLDFjoin")
      .master("local[*]")
      .getOrCreate()
    import ss.implicits._

    // Write the header line first (ScalaWriter.Writer.Write is my own helper
    // that appends a single line to the given file).
    ScalaWriter.Writer.Write(Logs, "Results.csv")

    // wholeTextFiles yields (path, content) pairs, one per log file.
    val Dir = ss.sparkContext.wholeTextFiles("/home/rudaini/Desktop/Thesis/Results/Results/Tesx/*")
    println(Dir.count())

    Dir.collect().foreach { case (path, content) =>
      var SparkAppName = ""
      var SubmissionTime: Double = 0
      var CompletionTime: Double = 0
      var ExecutorDeserializeCpuTime: Double = 0
      var ResultSize = ""
      var ShuffleReadRemoteBytesRead = ""
      var ShuffleReadFetchWaitTime = ""
      var MemoryBytesSpilled = ""
      var ShuffleReadLocalBytesRead = ""
      var ExecutorDeserializeTime = ""
      var PeakExecutionMemory = ""
      var ExecutorCpuTime = ""
      var ShuffleReadLocalBlocksFetched = ""
      var JVMGCTime = ""
      var ShuffleReadRemoteBytesReadToDisk = ""
      var ShuffleReadRecordsRead = ""
      var DiskBytesSpilled = ""
      var ExecutorRunTime = ""
      var ShuffleReadRemoteBlocksFetched = ""
      var Result = ""

      // Scan the log file line by line and pull values out by string offsets.
      content.split("\n").foreach { L =>
        if (L.contains("spark.app.name"))
          SparkAppName = L.substring(L.indexOf("app.name") + 11,
            L.indexOf("spark.scheduler") - 3)
        if (L.contains("ApplicationStart"))
          SubmissionTime = L.substring(L.indexOf("Timestamp") + 11,
            L.indexOf(",\"User\":\"")).toDouble
        if (L.contains("ApplicationEnd"))
          CompletionTime = L.substring(L.indexOf("Timestamp") + 11,
            L.indexOf("Timestamp") + 24).toDouble
        // TODO: extract the remaining metrics (ExecutorDeserializeCpuTime, ResultSize,
        // the shuffle-read metrics, JVMGCTime, the spill and run-time metrics, ...)
        // the same way, summing the repeated per-job updates instead of keeping
        // only the last one.
      }

      val LineX = SparkAppName + "," + SubmissionTime + "," + CompletionTime + "," + ExecutorDeserializeCpuTime + "," + ResultSize + "," + ShuffleReadRemoteBytesRead + "," + ShuffleReadFetchWaitTime + "," + MemoryBytesSpilled + "," +
        ShuffleReadLocalBytesRead + "," + ExecutorDeserializeTime + "," + PeakExecutionMemory + "," + ExecutorCpuTime + "," +
        ShuffleReadLocalBlocksFetched + "," + JVMGCTime + "," + ShuffleReadRemoteBytesReadToDisk + "," +
        ShuffleReadRecordsRead + "," + DiskBytesSpilled + "," + ExecutorRunTime + "," + ShuffleReadRemoteBlocksFetched + "," +
        Result
      ScalaWriter.Writer.Write(LineX, "Results.csv")
    }

    ss.stop()
  }
}
I'm not finished yet, but I'm getting better results with further modifications.
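(Editor's note on the part that is still missing, summing the repeated metric updates: each line of a Spark event log is a self-contained JSON object, so one possible direction is to let Spark parse the log itself and compute the totals with an aggregation instead of string offsets. The sketch below reuses the ss session and the ScalaWriter helper from above; the path is a placeholder, and the field names such as "App Name", "Timestamp", "Task Metrics" and "Executor Run Time" are assumptions based on the usual event-log layout that should be checked against an actual log file.)

import org.apache.spark.sql.functions._

// Sketch: read one event-log file as newline-delimited JSON (path is a placeholder).
val events = ss.read.json("/path/to/one/event/log")

// Application name and start/end timestamps come from the start and end events.
val appStart = events.filter(col("Event") === "SparkListenerApplicationStart")
val appName = appStart.select("App Name").first().getString(0)
val submissionTime = appStart.select("Timestamp").first().getLong(0)
val completionTime = events.filter(col("Event") === "SparkListenerApplicationEnd")
  .select("Timestamp").first().getLong(0)

// Per-task metrics are nested under "Task Metrics" in SparkListenerTaskEnd events;
// summing them over all tasks gives totals for the whole application.
val totals = events.filter(col("Event") === "SparkListenerTaskEnd")
  .agg(
    sum(col("Task Metrics").getField("Executor Run Time")).as("ExecutorRunTime"),
    sum(col("Task Metrics").getField("Executor CPU Time")).as("ExecutorCpuTime"),
    sum(col("Task Metrics").getField("JVM GC Time")).as("JVMGCTime"),
    sum(col("Task Metrics").getField("Result Size")).as("ResultSize")
  ).first()

val line = Seq(appName, submissionTime, completionTime,
  totals.getLong(0), totals.getLong(1), totals.getLong(2), totals.getLong(3)).mkString(",")
ScalaWriter.Writer.Write(line, "Results.csv")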
【Comments】:
Welcome to Stack Overflow! Your question is not well defined. Please expand the problem statement and show a short but complete example of what you have tried so far. See this for reference: jonskeet.uk/csharp/complete.html

【Answer 1】: I understand your question to some extent and am answering based on that understanding. If you can flesh out your question further, I may be able to answer it in more detail.
import java.io.File
import org.apache.spark.sql.{DataFrame, SparkSession}

// define all dataframes globally
var df1: DataFrame = _
var df2: DataFrame = _
var df3: DataFrame = _

// define main function
// initialize spark session

// creates a list of all files in a directory
def getListOfFiles(dir: String): List[File] = {
  val path = new File(dir)
  if (path.exists && path.isDirectory)
    path.listFiles.filter(_.isFile).toList
  else
    List[File]()
}

val files = getListOfFiles("/path/to/directory/")

for (input <- files) {
  // code to extract log file data (I can help you further if you explain your problem in more detail)
  // load your log file data into a dataframe; the four values below are placeholders
  // for whatever you extract from each log file
  import spark.implicits._
  if (input == files(0)) {
    df1 = Seq(
      (appName, submissionTime, completionTime, accumulablesMetrics)
    ).toDF("App Name", "Submission Time", "Completion Time", "Accumulables metrics")
  } else {
    df2 = Seq(
      (appName, submissionTime, completionTime, accumulablesMetrics)
    ).toDF("App Name", "Submission Time", "Completion Time", "Accumulables metrics")
    // union the new row with what has been collected so far
    df3 = df1.union(df2)
    df1 = df3
  }
}

// write the dataframe out as a .csv file
df1.coalesce(1).write
  .option("header", "true")
  .csv("path/to/directory/file_name.csv")
【Comments】:
Hi Pardeep, your file_name.csv is not a standard CSV... I had a similar problem in this question, and the solution was to use Log4j or a PrintWriter. Perhaps you could show a solution here that uses Log4j.
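(Editor's note: the ScalaWriter.Writer.Write helper used in the question is not shown anywhere in the thread. A minimal append-to-file version based on java.io.PrintWriter, along the lines the comment suggests, could look roughly like this; it is a sketch, not the asker's actual helper.)

import java.io.{FileWriter, PrintWriter}

object Writer {
  // Append a single line to the given file, creating the file if it does not exist.
  def Write(line: String, path: String): Unit = {
    val out = new PrintWriter(new FileWriter(path, true)) // true = append mode
    try out.println(line)
    finally out.close()
  }
}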