flume 监控目录操作
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume 监控目录操作相关的知识,希望对你有一定的参考价值。
- flume 监控目录操作
一:flume 监控目录操作
文件需求:
监控某个目录,若目录下面产生成符合条件的文件,flume 就抽取它到hdfs 上,目录 下可能有多中文件,比如当文件以log.tmp 结尾表示正在写,对log.tmp 文件设置size 值,就会变成一个以.log 结尾,则已经是完整文件(往往存在短暂),flume 可以抽取其中的数据,以log.completed 结尾则表示flume已经抽取完成,可以删除。
1.1 创建目录,用于提取数据
mkdir /home/hadoop/datas/spooling
mkdir /home/hadoop/datas/checkpoint
mkdir /home/hadoop/datas/data
1.2 在hdfs 目录上创建存放抽取的数据
hdfs dfs -mkdir /spool
1.3 准备数据,三个以上,两种类型
cd /home/hadoop/datas/spooling/
touch xx.log
touch yy.log
touch zz.log.tmp
1.4 准备agent 配置文件
cp -p hive-conf.properties test-dir.properties
vim test-dir.properties
# example.conf: A single-node Flume configuration
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /home/hadoop/datas/spooling
a3.sources.r3.ignorePattern = ^(.)*\\.tmp$
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://namenode01.hadoop.com:8020/spool/%Y%m/%d
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10
# 设置二级目录按小时切割
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
# 设置文件回滚条件
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollsize = 128000000
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = file
a3.channels.c3.checkpointDir = /home/hadoop/datas/checkpoint
a3.channels.c3.dataDirs = /home/hadoop/datas/data
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
1.5 执行收集命令:
bin/flume-ng agent --conf conf --conf-file conf/test-dir.properties --name a3
创建文件测试。
以上是关于flume 监控目录操作的主要内容,如果未能解决你的问题,请参考以下文章