测试方法Flume+HDFS+Spark日志分析入门篇

Posted 非功能之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了测试方法Flume+HDFS+Spark日志分析入门篇相关的知识,希望对你有一定的参考价值。

IT基础建设的监控离不开日志分析,当系统由于某些BUG挂掉时,通过命令查询错误日志排除,也因此日志分析是经常做的事情。假若系统记录的日志很多,仍采取这种方式,很明显效率不高,那么大数据下的日志分析技术应运而生,到底是怎样的流程呢,今天小编就来介绍Flume+HDFS+Spark架构下的日志分析。

Flume+HDFS+Spark架构

应用日志的处理流程是这样的:先由应用系统产生日志,由其日志框架定义进行处理,如log4jlogback,将日志打印到控制台或输出到指定的文件中,当我们需要对日志文件进行查看时,可以使用vim等工具。

Flume+HDFS+Spark架构的日志处理流程是这样的:将日志采集组件Flume作为Agent部署在Application Server上,用于收集本地日志文件,并将日志转存到HDFS数据平台中,然后由Spark实现日志分析。

由于用到HDFS数据平台,因此需要安装Hadoop,关于FlumeHadoopSpark等的原理和特性,及安装不再详细叙述,本文只简述如何使用Flume+HDFS+Spark构建一套日志采集系统并进行日志分析。注:本文的运行环境为windows 7

Flume安装完毕后,需要修改相关配置文件,使之能够与完成日志采集。

  • log4j日志输出到Flume的日志文件中,配置如下:

log4j.appender.flume= org.apache.flume.clients.log4jappender.Log4jAppender

log4j.appender.flume.Hostname= localhost

log4j.appender.flume.Port= 41414

log4j.appender.flume.UnsafeMode= true

       二、将flume采集的日志输出到HDFS数据平台,需要修改flume.conf中的sink配置,如下:

agent1.sinks. type=hdfs

agent1.sinks. channel=channel1

agent1.sinks. hdfs.path=hdfs://localhost:9000/user/

agent1.sinks. hdfs.fileType=DataStream

agent1.sinks.hdfs.writeFormat=Text

agent1.sinks.hdfs.rollInterval=0

agent1.sinks.hdfs.rollSize=1024000

agent1.sinks.hdfs.rollCount=0

agent1.sinks.hdfs.idleTimeout=60

sinktype设置为hdfs,然后指定输出路径,这里指定输入到HDFS/user目录,将fileType配置为DataStream writeFormat配置为Text,可直接打开生成的文件进行查看。roll开头的参数用来控制滚动日志输出,rollsize按文件大小滚动rollSize=1024000,即1M大小滚动生成一个新的文件接收新的EVENTSidleTimeout设置为60秒,即60秒后如果文件没有被写入数据,则会关闭正在使用的.tmp文件,重命名去掉.tmp,一旦有新的events到达,则会新创建一个新的.tmp文件进行接收。

运行Application,即可在hdfs://localhost:9000/user目录下看到应用输出的日志信息。

Spark日志分析

Apache Spark是一个快速成长的开源集群计算系统,其生态系统构建在Spark内核引擎之上,使得Spark具备快速的内存计算能力,效率更高、运行速度更快。随着框架的日趋丰富,Spark具有越来越强大的功能和极好的易用性,使之能够进行高级数据分析,Spark生态系统如下:

 


       Streaming具备实时流数据的处理能力,Spark SQL语言查询结构化数据,DataFrame将数据保存为行的集合,可快速查询、绘制和过滤数据,MLibSpark的机器学习框架,GraphX是图计算框架,提供结构化数据的图计算能力。Spark支持ScalaJavaPythonR等四种语言的API编程。

Spark提供了交互式的shell命令行,目前支持ScalaPython两种,本文以Scala进行操作。

一、启动Spark shell

Scala 是一门现代的多范式编程语言,志在以简练、优雅及类型安全的方式来表达常用编程模式,它平滑地集成了面向对象和函数语言的特性,运行于Java平台(JVMJava虚拟机),并兼容现有的Java 程序Spark 的主要编程语言,优点是开发效率更高,代码更精简,并且可以通过Spark Shell进行交互式实时查询,方便排查问题。

bin/spark-shell启动Sparkshell

【测试方法】Flume+HDFS+Spark日志分析入门篇

成功后,如下图所示,出现”scala>”提示符:

【测试方法】Flume+HDFS+Spark日志分析入门篇


那么现在就可以开始日志分析了。

  • 基础操作

Spark的主要抽象是分布式的元素集合(distributedcollection of items),称为RDDResilientDistributed Dataset,弹性分布式数据集),它可被分发到集群各个节点上,进行并行操作。RDD可以通过HDFS或任何支持Hadoop的文件系统创建,或者从其他RDD转化而来。

通过HDFS创建一个RDD,代码如下:

scala> val textFile =sc.textFile("hdfs://localhost:9000/user/file.txt")//val scala语法中声明常量的方式,sc是在进入sparkshell 时创建的sparkcontentspark上下文,此编码是将hdfs节点上的文件读取到内存中;Sparkshell读取 HDFS 中的文件,通过 "hdfs://" 前缀指定。

【测试方法】Flume+HDFS+Spark日志分析入门篇


注意:

1、如果使用本地文件系统路径,须保证所有的节点有相同的路径,或者使用共享路径。

2textFile还支持加载目录,即加载目录下所有文件,支持压缩文件、通配符等,textFile(hdfs://localhost:9000/user/)textFile(hdfs://localhost:9000/user/file.gz)textFile(hdfs://localhost:9000/user/*.txt)

3textFile可以控制文件分区的数量,即Spark为每个文件块创建一个分区,分区大小默认值是128M

RDD支持两种类型的操作,actions:在数据集上运行计算后返回值;transformations:转换,从现有数据集创建一个新的数据集。

count()first()属于actions,分别为统计行数和显示第一个Item

scala> textFile.count()//统计文本文件的总行数

【测试方法】Flume+HDFS+Spark日志分析入门篇

scala> textFile.first()//first读取文件的第一个Item,对于文本文件就是第一行内容

【测试方法】Flume+HDFS+Spark日志分析入门篇

Filter属于transformation,通过filter筛选出包括temp关键字的RDD,然后进行计数。当得到一个经过过滤操作后的RDD,可对其进行action操作使相应的数据流向应用程序。

scala>valcountwithguanli=textFile.filter(line=>line.contains(“temp”))

scala>countwithguanli.count()

【测试方法】Flume+HDFS+Spark日志分析入门篇     当然也可以使用actiontransformation链式操作的方式,使代码更为简洁:

textFile.filter(line=>line.contains("tmp")).count()//统计包含 tmp 的行数

【测试方法】Flume+HDFS+Spark日志分析入门篇

RDD actions transformations 可以用在更复杂的计算中,例如通过mapreduce可以找到包含单词最多的行。首先使用map方法将每行映射成一个数,创建一个新的RDD,然后使用reduce方法找出包含单词量最多的行。map()reduce() 中的参数是Scala的函数字面量(functionliterals,也称为闭包closures)。

textFile.map(line=> line.split(" ").size).reduce((a, b) => if (a > b) a elseb)//返回包含单词最多的行

【测试方法】Flume+HDFS+Spark日志分析入门篇


也可以引入其他Java包,如Math.max() 函数,使上述代码更容易理解,此时Math可以作为mapreduce的字面量参数。

scala>import java.lang.Math//引入java

scala textFile.map(line => line.split(" ").size).reduce((a,b)=>Math.mad(a,b))//返回包含单词最多的行

【测试方法】Flume+HDFS+Spark日志分析入门篇


使用Hadoop MapReduce数据流模式,同样可以实现以上功能。

scala>valwordCounts = textFile.flatMap(line => line.split(" ")).map(word=> (word, 1)).reduceByKey((a, b) => a + b) // 实现单词统计

 scala>wordCounts.collect()// 输出单词统计结果

【测试方法】Flume+HDFS+Spark日志分析入门篇


从上面可以看出,输入内容是无序的,Spark支持排序,通过sortByKey方法实现。经过排序,输出数据发生了变化。

scala>wordCounts.sortByKey()//排序

scala>sort.collect()//显示文本内容

Spark 支持在集群范围内将数据集缓存至每一个节点的内存中,可避免数据传输,当数据需要重复访问时此特征优势非常明显。调用cache(),就可以将数据集进行缓存。

scala>countwihtguanli.cache()

小结

以上是小编搭建的大数据日志分析架构,以及使用Spart shell提供的scala交互式编程进行日志分析的入门操作。使用RDD,既可以进行数据转换,也可以进行action操作,这意味着transformation可以改变数据格式、进行数据查询或数据过滤操作,action则可以触发数据改变、抽取数据、收集数据进行计算。

若要分析应用日志,需了解具体的应用需求,根据需求获取或过滤数据集,得到想要的结果,掌握RDD的基本操作后,可以通过更多的实例来熟练Spark,更多内容请参考Spark官网文档。


以上是关于测试方法Flume+HDFS+Spark日志分析入门篇的主要内容,如果未能解决你的问题,请参考以下文章

Spark

flume kafka spark读取日志延迟

使用Flume采集日志数据到HDFS中

flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统

Spark SQL实现日志离线批处理

Spark SQL实现日志离线批处理