flume 监控目录操作

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume 监控目录操作相关的知识,希望对你有一定的参考价值。

  • flume 监控目录操作

一:flume 监控目录操作

文件需求:

监控某个目录,若目录下面产生成符合条件的文件,flume 就抽取它到hdfs 上,目录 下可能有多中文件,比如当文件以log.tmp 结尾表示正在写,对log.tmp 文件设置size 值,就会变成一个以.log 结尾,则已经是完整文件(往往存在短暂),flume 可以抽取其中的数据,以log.completed 结尾则表示flume已经抽取完成,可以删除。

1.1 创建目录,用于提取数据

mkdir /home/hadoop/datas/spooling
mkdir /home/hadoop/datas/checkpoint
mkdir /home/hadoop/datas/data

1.2 在hdfs 目录上创建存放抽取的数据

hdfs dfs -mkdir /spool

1.3 准备数据,三个以上,两种类型

cd /home/hadoop/datas/spooling/

touch xx.log
touch yy.log
touch zz.log.tmp 

1.4 准备agent 配置文件

cp -p hive-conf.properties test-dir.properties

vim test-dir.properties

# example.conf: A single-node Flume configuration

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir 
a3.sources.r3.spoolDir = /home/hadoop/datas/spooling
a3.sources.r3.ignorePattern = ^(.)*\\.tmp$
# Describe the sink
a3.sinks.k3.type = hdfs  

a3.sinks.k3.hdfs.path = hdfs://namenode01.hadoop.com:8020/spool/%Y%m/%d
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10

# 设置二级目录按小时切割
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour

# 设置文件回滚条件
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollsize = 128000000
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = file 
a3.channels.c3.checkpointDir = /home/hadoop/datas/checkpoint

a3.channels.c3.dataDirs = /home/hadoop/datas/data

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3 

1.5 执行收集命令:

bin/flume-ng agent --conf conf --conf-file conf/test-dir.properties --name a3 

创建文件测试。

以上是关于flume 监控目录操作的主要内容,如果未能解决你的问题,请参考以下文章

Flume实时监控目录sink到hdfs

Flume

Flume

Flume

flume 监控hive日志文件

flume监控分析