Flume学习系列----实战Spooling到HDFS

Posted Java不睡觉

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume学习系列----实战Spooling到HDFS相关的知识,希望对你有一定的参考价值。

前言:为了呼应上篇文章总结部分提出的点。这一篇文章决定来个实战Demo。需求就是使用Spooling Directory Source监听一个文件夹下面新增的文件,然后上传到HDFS中,这需要大家先安装好Hadoop。(ps:我的环境是伪分布式,我有个问题请教各位大佬,我namenode,datanode都能通过jps查看。而且也可以使用hadoop client,为什么localhost:50070还是访问不了。网上搜都是重新格式化namenode,但是还是不行。有知道的大佬请帮一下我,先谢)。


一、Spooling Directory Source介绍

    Spooling Directory Source通过监听某个目录下的新增文件,并将文件的内容读取出来,实现日志信息的收集。实际使用中会结合log4j进行使用。被传输结束的文件会修改后缀名,添加.completed后缀(可以自定义)。
    这里需要注意两点:

  • 放入监控目录后,文件不能被修改,否则抛异常;

  • 监控目录下不能有子目录;

二、log4j配置

    我使用log4j的DailyRollingFileAppender去每分钟生成一个日志到配置的目录下,代码如下:

#输出信息到文件
log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
#这个是生成日志文件的目录及文件名
log4j.appender.file.File = /Users/jsj/eclipse-workspace/log4j/src/main/java/testlog.log
log4j.appender.file.Append = true#每分钟产生一个日志文件
#当前的文件名是testlog.log,前面分钟产生的文件是这种命名形式testlog.log.2018-08-20-18-16。
log4j.appender.file.DatePattern = '.'yyyy-MM-dd-HH-mm
log4j.appender.file.layout = org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern = [%-5p]   %-d{yyyy-MM-dd HH:mm:ss}    %m%n

注意这里日志生成的目录不要和Spooling Directory监控的目录一样。因为产生日志时会不断Append到当前这一分钟的日志文件,我们刚刚还说Spooling Directory监控目录下的文件不能修改。那怎么办呢?


Flume学习系列(二)----实战Spooling到HDFS


  • 可以通过shell脚本定时把文件通过mv 或 cp命令弄到Spooling Directory的监控文件夹下。
        But,为了降低学习曲线,防止大家看到shell望而却步,我决定不采用这种方式。只是简单的先让程序生成一些文件,然后直接通过shell一次性把所有文件放进去。

三、Java生成日志

    日志的内容(不含log4j中的配置)为:
0a58f82b-ff6f-4feb-abe2-7c6ac9a0c24d####ERH####qhp####6677062
格式为:用户ID--县号--镇号--收入

public class Main {   
    public static void main(String[] args) throws Exception {        Thread thread = new Thread(new GenerateRecord());        thread.start();    } }
class GenerateRecord extends Thread {    private final Logger log = Logger.getLogger(GenerateRecord.class);    public void run() {        while (true) {            // 随机产生一个用户uuid            UUID userId = UUID.randomUUID();            System.out.println(userId.toString().length());            // 产生一个随机的用户总资产            int num = (int) (Math.random() * 10000000) + 100000;            // 产生一个随意的县名            StringBuilder sb = new StringBuilder();            for (int i = 0; i < 3; i++) {                char a = (char) (Math.random() * (90 - 65) + 65);                sb.append(a);            }            String county = sb.toString();            // 产生一个随机的镇名            StringBuilder sb1 = new StringBuilder();            for (int i = 0; i < 3; i++) {                char a = (char) (Math.random() * (122 - 97) + 97);                sb1.append(a);            }            String town = sb1.toString();            // 生成日志            log.info(userId + "####" + county + "####" + town + "####" + num);            // 停1秒钟            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    } }

在几分钟后停掉程序,在终端输入cd /Users/jsj/eclipse-workspace/log4j/src/main/java/ 查看生成的文件 ls -1 ,如下:

Flume学习系列(二)----实战Spooling到HDFS


四、Flume配置

    这部分是大家关注的重点。(不懂的配置可以去官网看)


Flume学习系列(二)----实战Spooling到HDFS


在flume安装目录的conf/flume.conf下加入如下代码:

# my application flume configuration
#agent2是我们给agent起的名字
agent2.sources=source2
agent2.sinks=sink2
agent2.channels=channel2

#Spooling Directory
#set source2
#设置type为spooldir,这个值是flume给定的alias
agent2.sources.source2.type=spooldir
#设置监控目录,注意和前面log4j的目录不同
agent2.sources.source2.spoolDir=/Users/jsj/eclipse-workspace/logs

agent2.sources.source2.channels=channel2
agent2.sources.source2.fileHeader = false#set sink2
agent2.sinks.sink2.type=hdfs
agent2.sinks.sink2.hdfs.path=hdfs://localhost:9000/flumeagent2.sinks.sink2.hdfs.fileType=DataStream
agent2.sinks.sink2.hdfs.writeFormat=TEXT
agent2.sinks.sink2.hdfs.rollInterval=60agent2.sinks.sink2.channel=channel2
#设置存储到HDFS后文件的前缀
agent2.sinks.sink2.hdfs.filePrefix=%Y-%m-%d

#set channel2
#设置内存通道
agent2.channels.channel2.type=memory
agent2.channels.channel2.capacity=10000agent2.channels.channel2.transactionCapacity=1000agent2.channels.channel2.keep-alive=30

    可以,现在启动flume-ng。首先进到 bin 目录下,输入./flume-ng agent -c ../conf -f ../conf/flume.conf -Dflume.root.logger=INFO,console -n agent2
    启动成功如下图:

Flume学习系列(二)----实战Spooling到HDFS


接下来开另一个终端,将刚才生成的日志文件拷贝到Spooling Directory监控目录下。输入如下命令:
cp /Users/jsj/eclipse-workspace/log4j/src/main/java/testlog.log* /Users/jsj/eclipse-workspace/logs
此时flume的终端会嗖嗖嗖的刷日志,我截下来几条,主要是打开文件,对正在处理的文件改名为.tmp后缀,上传到HDFS后把HDFS上文件的.tmp删掉,本地的监控目录下文件加.COMPLETED后缀。

Flume学习系列(二)----实战Spooling到HDFS



Flume学习系列(二)----实战Spooling到HDFS


这时候我们去HDFS上检查一下:新开个终端输入hadoop fs -ls /flume。发现生成了比我们文件数多的多的文件,原来只有11个,现在有62个文件。

Flume学习系列(二)----实战Spooling到HDFS


为什么呢?你是不是


Flume学习系列(二)----实战Spooling到HDFS

    

有问题去官网看文档啊。发现在HDFS SINK下有这样一个配置。



    意思是说,这个属性你不配默认是10条记录一个文件。那我们看下HDFS上的文件是不是10条记录。输入hadoop fs -cat /flume/2018-08-20.1534766919112查看下文件内容,没错就是10条。


    这下真相大白了。这又体现了看官方文档的重要性。剩下的事情就交给你们了。


五、总结

    本文接Flume学习系列(一),实现了一个具体的Demo,练习了flume的配置文件的编写,以及查找问题的方法。


以上是关于Flume学习系列----实战Spooling到HDFS的主要内容,如果未能解决你的问题,请参考以下文章

实战系列Flume + kafka + HDFS构建日志采集系统

Flume 理论

flume

Flume学习系列---- Channel Selector与Sink Processors

Flume从入门到实战

Flume从入门到实战