在 Spark 中组合来自多个目录的日志

Posted

技术标签:

【中文标题】在 Spark 中组合来自多个目录的日志【英文标题】:Combining the logs from multiple directories in Spark 【发布时间】:2017-02-14 11:18:21 【问题描述】:

根据日志文件的创建日期,我有日志文件进入不同的目录。

例如

> /mypath/2017/01/20/... 
.
.
.
> /mypath/2017/02/13/...
> /mypath/2017/02/14/...

我想使用 pyspark 将所有这些日志文件合并到一个 rdd 中,以便我可以对这个主文件进行聚合。

到目前为止,我已经获取了单独的目录,称为 sqlContext 并使用 Union 来加入特定日期的所有日志文件。

DF1 = (sqlContext.read.schema(schema).json("/mypath/2017/02/13")).union(sqlContext.read.schema(schema).json("/mypath/2017/02/14"))

有没有一种简单的方法可以通过指定日期范围内的日志文件来获取主 rdd? (即从 2017/01/20 到 2017/02/14)

我是新手,如果我在任何步骤中有错误,请纠正我。

【问题讨论】:

另外,如果我想在加入所有这些日志(比如 DF1)后基于“类型”列进行过滤。这样做的最佳过程是什么? (我通常使用 DF1.filter())。还有其他有效的方法吗? sqlContext.read.schema(schema).json("/mypath/2017/02/[13-14]‌​")) 不起作用。它说“非法文件模式:索引 4 附近的非法字符范围” 【参考方案1】:

如果您坚持使用 sqlContext,那么一个简单的解决方案就是定义一个方法,该方法将列出输入目录中的所有文件

case class FileWithDate(basePath: String, year: Int, month: Int, day: Int) 
 def path = s"$basePath/$year/$month/$day"


def listFileSources() : List[FileWithDate] = ??? // implement here

如果您想合并来源中的所有数据框,您可以这样做:

// create an empty dataframe with the strucutre for the json
val files = listSources()
val allDFs = files.foldLeft(emptyDF)case (df, f) => df.union(sqlContext.read.schema(schema).json(f.path))

如果您想按日期过滤输入文件,那么这很容易。像这样的

files.filter(_.year == 2016 && (_.month >=2 || _.month <=3))

另一种解决方案是使用年、月、日来扩充您的数据框(放置额外的列),并在新数据框上执行所有业务逻辑

【讨论】:

我认为用最少的 Scala 知识你将能够实现 listFileSources 方法。您应该做的是获取 mypath 文件夹中的所有文件(递归迭代子文件夹)并创建 FileWithDate 类型的对象。这些对象被附加到将由方法返回的列表中。

以上是关于在 Spark 中组合来自多个目录的日志的主要内容,如果未能解决你的问题,请参考以下文章

Logstash部署 kibana部署 ELK组合收集多个日志

Logstash部署 kibana部署 ELK组合收集多个日志

spark日志收集

Spark设置日志级别

ELK 处理来自多个 docker 镜像的多行日志

3.8 Spark 用户日志分析