Flume Basics

Posted by 杀智勇双全杀


Overview

The first step of real-time big data processing is real-time data collection. Flume is a tool for collecting real-time data streams; more information is available on the Flume website. The project is very mature, and quite some time has passed since its last release.

Since anything offline (batch) processing can do can also be achieved in real time, while much of what real-time processing can do cannot be done offline at all, real-time processing is expected to gradually replace offline processing.

Features

Data collection

Flume collects data from a read location to a write location, similar to a copy (not a cut, as when Hive builds a table mapping and "moves" the files; the original data stays where it was). Flume copies whatever data needs processing into the big data warehouse.

Distributed real-time data flow

Data from many different sources can be collected in real time and delivered to a variety of destinations.

Data sources: files and network ports. Out of the box, Flume cannot read directly from an ordinary database such as MySQL (but since Flume is written in Java and exposes various interfaces, users can develop custom components, for example using JDBC to read from an RDBMS like MySQL).

Destinations: HDFS, HBase, Hive, Kafka.

Characteristics

Comprehensive: all of the reading and writing components are already packaged; the user only needs to configure where to read from and where to write to in order to collect data.

Extensible: if the built-in functionality does not meet the actual business requirements, Flume provides interfaces for developing custom Java-based components.

Simple to develop with: everything is encapsulated, so you only have to invoke it. The user only writes one configuration file describing where to read, what to read, and where to write.

Distributed: Flume itself is not a distributed program, but a Flume agent can be installed on every node so that each node collects data concurrently, achieving a distributed effect.

Applications

Used in real-time data-stream collection scenarios: collecting data streams from files or from network protocol ports.

The basic components of Flume

The Flume website explains it like this:
A Flume event is defined as a unit of data flow having a byte payload and an optional set of string attributes. A Flume agent is a (JVM) process that hosts the components through which events flow from an external source to the next destination (hop).
A Flume source consumes events delivered to it by an external source like a web server. The external source sends events to Flume in a format that is recognized by the target Flume source. For example, an Avro Flume source can be used to receive Avro events from Avro clients or other Flume agents in the flow that send events from an Avro sink. A similar flow can be defined using a Thrift Flume Source to receive events from a Thrift Sink or a Flume Thrift Rpc Client or Thrift clients written in any language generated from the Flume thrift protocol.

When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it’s consumed by a Flume sink. The file channel is one example – it is backed by the local filesystem. The sink removes the event from the channel and puts it into an external repository like HDFS (via Flume HDFS sink) or forwards it to the Flume source of the next Flume agent (next hop) in the flow. The source and sink within the given agent run asynchronously with the events staged in the channel.

Agent

Each agent is one Flume program, and each agent consists of three parts: a source, a channel, and a sink.

Source

Responsible for reading data. The source dynamically monitors the data source, collects newly arriving data in real time into a stream of Events (a Java wrapper class; each record becomes one Event), and sends every Event to the channel.

Channel

Temporarily buffers data. It caches the Events sent by the source so that the sink can fetch them.

Sink

Responsible for delivering data. The sink actively reads the collected data from the channel and writes it to the destination.

Event

The object used to wrap each record. Every record becomes one Event as it is passed along.

  • headers: a set of key-value attributes and settings; the headers are empty by default
  • body: the data itself is stored in the body

In pseudocode:

Event {
	Map<String, String> headers;
	byte[] body; // the byte stream of one record
}

The Flume development workflow

Write the Flume configuration file

The properties configuration file needs to contain the following:

  • define the agent
  • define the source
  • define the channel
  • define the sink

Specifically:
Defining the agent means giving the agent a name and naming its source, channel, and sink.
Defining the source means specifying from which kind of location and which directory to read, in what way, and what content.
Defining the channel means specifying where events are buffered, and defining the sink means specifying the write destination.
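
Putting these four parts together, a skeleton configuration looks roughly like this. The component names s1/c1/k1 and the placeholder types are illustrative; the complete, runnable examples later in this article follow exactly the same shape:

# define the agent a1 and name its components
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# define the source: where and how to read
a1.sources.s1.type = exec

# define the channel: where events are buffered
a1.channels.c1.type = memory

# define the sink: where to write
a1.sinks.k1.type = logger

# bind the source and the sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1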

Run the Flume agent

The CDH build of Flume 1.6 roughly corresponds to Apache Flume 1.7 in terms of features.

Flume does not involve a server-side daemon here; what we use is flume-ng (the old flume-og architecture was cumbersome, performed poorly, and has been retired).

Usage: bin/flume-ng <command> [options]...
flume-ng agent --conf,-c <conf>  --conf-file,-f <file> --name,-n <name> 

Where:

  • agent: run a Flume agent program
  • --conf,-c: the directory containing Flume's configuration files
  • --conf-file,-f: which agent configuration file to run
  • --name,-n: the name of the agent to run (one configuration file can define several agents, which are distinguished by name)
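
For example, the command used later in this article to run the agent named a1 (the extra -D option prints Flume's own log to the console):

flume-ng agent -c conf/ -f usercase/hive-mem-log.properties -n a1 -Dflume.root.logger=INFO,console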

Testing Flume

Collect Hive's logs, buffer them temporarily in memory, and write them into Flume's own log, printing them on the command line.

Checking the user guide

The official Flume User Guide provides a lot of help here.
There are far too many features to remember them all, but in practice only a handful of them come up regularly.

Exec Source

Collects data by executing a Linux command; it is typically paired with tail -f (which shows the last 10 lines by default and then keeps following new output).

memory channel

Buffers the data in memory.

Development

On node3:

cd /export/server/flume-1.6.0-cdh5.14.0-bin
mkdir usercase
ll ./conf

You should see:

[root@node3 flume-1.6.0-cdh5.14.0-bin]# ll ./conf
总用量 20
-rw-r--r-- 1 root root 1312 11月  8 19:32 core-site.xml
-rw-r--r-- 1 root root 1661 11月  8 19:28 flume-conf.properties.template
-rw-r--r-- 1 root root 1455 11月  8 19:28 flume-env.ps1.template
-rw-r--r-- 1 root root 1624 11月  8 19:28 flume-env.sh
-rw-r--r-- 1 root root 3107 11月  8 19:28 log4j.properties

This flume-conf.properties.template is the sample configuration file.

cp conf/flume-conf.properties.template usercase/hive-mem-log.properties

Take a look:

[root@node3 flume-1.6.0-cdh5.14.0-bin]# cat usercase/hive-mem-log.properties
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

Edit it with vim:

vim usercase/hive-mem-log.properties

Use dd to delete the unneeded lines, then press i and enter the following:

# Sources, channels and sinks are defined per a1,
# in this case called 'a1'

#define the agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#define the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log

#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

#define the sink
a1.sinks.k1.type = logger

#bond
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

After saving the file, still on node3:

flume-ng agent -c conf/ -f usercase/hive-mem-log.properties -n a1 -Dflume.root.logger=INFO,console
[root@node3 flume-1.6.0-cdh5.14.0-bin]# flume-ng agent -c conf/ -f usercase/hive-mem-log.properties -n a1 -Dflume.root.logger=INFO,console
Info: Sourcing environment configuration script /export/server/flume-1.6.0-cdh5.14.0-bin/conf/flume-env.sh
Info: Including Hadoop libraries found via (/export/server/hadoop-2.6.0-cdh5.14.0/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/export/server/hive-1.1.0-cdh5.14.0) for Hive access
+ exec /export/server/jdk1.8.0_241/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/export/server/flume-1.6.0-cdh5.14.0-bin/conf:/export/server/flume-1.6.0-cdh5.14.0-bin/lib/*:/export/server/hadoop-2.6.0-cdh5.14.0/etc/hadoop:/export/server/hadoop-2.6.0-cdh5.14.0/share/hadoop/common/lib/*:/export/server/hadoop-2.6.0-cdh5.14.0/share/hadoop/common/*:/export/server/hadoop-2.6.0-cdh5.14.0/share/hadoop/hdfs:/export/server/hadoop-2.6.0-cdh5.14.0/share/hadoop/hdfs/lib/*:/export/server/hadoop-2.6.0-cdh5.14.0/share/hadoop/hdfs/*:/export/server/hadoop-2.6.0-cdh5.14.0/share/hadoop/yarn/lib/*:/export/server/hadoop-2.6.0-cdh5.14.0/share/hadoop/yarn/*:/export/server/hadoop-2.6.0-cdh5.14.0/share/hadoop/mapreduce/lib/*:/export/server/hadoop-2.6.0-cdh5.14.0/share/hadoop/mapreduce/*:/export/server/hadoop-2.6.0-cdh5.14.0/contrib/capacity-scheduler/*.jar:/export/server/hive-1.1.0-cdh5.14.0/lib/*' -Djava.library.path=:/export/server/hadoop-2.6.0-cdh5.14.0/lib/native org.apache.flume.node.Application -f usercase/hive-mem-log.properties -n a1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/export/server/flume-1.6.0-cdh5.14.0-bin/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/export/server/hadoop-2.6.0-cdh5.14.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-05-07 20:55:04,488 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2021-05-07 20:55:04,495 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:usercase/hive-mem-log.properties
2021-05-07 20:55:04,500 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k1 Agent: a1
2021-05-07 20:55:04,500 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2021-05-07 20:55:04,501 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2021-05-07 20:55:04,520 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a1]
2021-05-07 20:55:04,520 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
2021-05-07 20:55:04,527 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
2021-05-07 20:55:04,531 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel c1
2021-05-07 20:55:04,532 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source s1, type exec
2021-05-07 20:55:04,539 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2021-05-07 20:55:04,542 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel c1 connected to [s1, k1]
2021-05-07 20:55:04,549 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:s1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4d5c851 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2021-05-07 20:55:04,558 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2021-05-07 20:55:04,559 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:159)] Waiting for channel: c1 to start. Sleeping for 500 ms
2021-05-07 20:55:04,611 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2021-05-07 20:55:04,615 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2021-05-07 20:55:05,061 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2021-05-07 20:55:05,066 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source s1
2021-05-07 20:55:05,075 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:168)] Exec source starting with command: tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log
2021-05-07 20:55:05,082 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
2021-05-07 20:55:05,085 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: s1 started
2021-05-07 20:55:09,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 30 32 31 2D 30 35 2D 30 36 20 31 36 3A 35 39 2021-05-06 16:59 }
2021-05-07 20:55:09,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 30 32 31 2D 30 35 2D 30 36 20 31 36 3A 35 39 2021-05-06 16:59 }
2021-05-07 20:55:09,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 30 32 31 2D 30 35 2D 30 36 20 31 36 3A 35 39 2021-05-06 16:59 }
2021-05-07 20:55:09,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4D 61 70 52 65 64 75 63 65 20 54 6F 74 61 6C 20 MapReduce Total  }
2021-05-07 20:55:09,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 45 6E 64 65 64 20 4A 6F 62 20 3D 20 6A 6F 62 5F Ended Job = job_ }
2021-05-07 20:55:09,079 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4D 61 70 52 65 64 75 63 65 20 4A 6F 62 73 20 4C MapReduce Jobs L }
2021-05-07 20:55:09,079 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 53 74 61 67 65 2D 53 74 61 67 65 2D 31 3A 20 4D Stage-Stage-1: M }
2021-05-07 20:55:09,079 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 54 6F 74 61 6C 20 4D 61 70 52 65 64 75 63 65 20 Total MapReduce  }
2021-05-07 20:55:09,079 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4F 4B                                           OK }
2021-05-07 20:55:09,079 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 68 69 63 68 3A 20 6E 6F 20 68 62 61 73 65 20 which: no hbase  }

In this command, -Dflume.root.logger=INFO,console prints Flume's own log to the command line.

The result looks good. Press Ctrl+C to stop.

Common Sources

The official Flume User Guide
documents the configuration of every source.

Exec

Collects data dynamically by executing a Linux command, almost always paired with tail -f. It dynamically monitors and collects the data of a single file.

Taildir

If the files look like this:

/tomcat/logs/2020-01-01.log
             2020-01-02.log
             ……
             2020-11-10.log

Because the Exec source can only monitor a single file, it cannot collect the data of this whole directory. In this situation you need to monitor multiple files dynamically. The Taildir source, which dynamically monitors and collects from multiple files, is supported starting with Apache Flume 1.7 (if you are on 1.5 or 1.6 and hit this problem, you have to compile the feature in yourself).

Metadata

There is a JSON file here, and that file is the metadata:

/export/server/flume-1.6.0-cdh5.14.0-bin/position/taildir_position.json

If the Flume process fails and is restarted, data that has already been collected should not be collected again (that would duplicate it). This JSON file records, for every file Flume monitors, the position up to which it has already been collected. Its contents look roughly like this:

[
{"inode":34599996,"pos":14,"file":"/export/data/flume/bigdata01.txt"},
{"inode":67595704,"pos":19,"file":"/export/data/flume/bigdata/test01.txt"},
{"inode":67805657,"pos":7,"file":"/export/data/flume/bigdata/test02.txt"}
]
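
A minimal Taildir source sketch, following the Apache Flume user guide: the positionFile path is the one shown above, the memory channel mirrors the earlier examples, and the file-group pattern is only illustrative.

a1.sources = s1
a1.channels = c1

a1.channels.c1.type = memory

# Taildir source: monitor multiple files and remember how far each one has been read
a1.sources.s1.type = TAILDIR
a1.sources.s1.positionFile = /export/server/flume-1.6.0-cdh5.14.0-bin/position/taildir_position.json
# one or more file groups; the file name part may be a regular expression
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /tomcat/logs/.*\.log
a1.sources.s1.channels = c1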

Common Channels

file channel

Buffers the data in files on disk: relatively slow reads and writes, large capacity, relatively high safety. Suitable for large data volumes where read/write performance is not critical.

mem channel

Buffers the data in memory: fast reads and writes, small capacity, lower safety. Suitable for high-performance transfer of small data volumes.

Common properties

capacity: the buffer size, i.e. the maximum number of events the channel can hold.

transactionCapacity: the size of each transfer. It limits how many events the source can put into the channel at a time and how many events the sink can take at a time. It is usually about one tenth of capacity and must never exceed capacity.
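
For example, a memory channel with both properties set; the numbers are only illustrative and follow the one-tenth rule of thumb above:

a1.channels.c1.type = memory
# hold at most 10000 events in the channel
a1.channels.c1.capacity = 10000
# move at most 1000 events per source put / sink take transaction
a1.channels.c1.transactionCapacity = 1000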

Common Sinks

The commonly used sinks are the Kafka sink and the HDFS sink.

In many scenarios the data needs an ETL step first, and only the ETL result is loaded into storage. The Hive sink has strict requirements: the table must be a bucketed table and the file format must be ORC. Therefore the HDFS sink is used instead of the Hive sink.

Writing Flume-collected data to HDFS

Flume acts as an HDFS client and writes the data into HDFS.

When configuring the write path, specify the absolute HDFS address:

hdfs://node1:8020/nginx/log

Then manually place the required jar packages into Flume's lib directory.

The other approach is to configure the Hadoop environment variables for Flume and put core-site.xml and hdfs-site.xml into Flume's configuration directory (a sketch of this setup follows the example below). For example, configure the agent like this:

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per a1, 
# in this case called 'a1'


# define the agent's name and the names of its source, channel and sink
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# define s1: where to read from and what to read
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log 

# define c1: where to buffer the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000


# define k1: where to send the data
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:8020/flume/test1


# which channel s1 delivers events to
a1.sources.s1.channels = c1
# which channel k1 takes events from
a1.sinks.k1.channel = c1
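
For the environment-variable approach mentioned above, a rough sketch is shown below. It assumes the flume-ng launcher locates the Hadoop client through HADOOP_HOME; the paths are the ones used in this environment and are only illustrative.

# appended to conf/flume-env.sh so that flume-ng can find the Hadoop client libraries
export HADOOP_HOME=/export/server/hadoop-2.6.0-cdh5.14.0

# copy the cluster configuration files into Flume's conf directory
cp /export/server/hadoop-2.6.0-cdh5.14.0/etc/hadoop/core-site.xml \
   /export/server/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml \
   /export/server/flume-1.6.0-cdh5.14.0-bin/conf/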

Controlling file size

With this configuration, the HDFS web UI on node1 shows that /flume/test1 contains many small files (around 1 KB each), which is clearly unsuitable for HDFS storage (the metadata becomes disproportionately large and space is wasted).

hdfs.rollInterval	30		roll a new file every N seconds
hdfs.rollSize		1024	roll a new file once it reaches N bytes
hdfs.rollCount		10		roll a new file after N events

The middle column is the default value. To disable a rolling rule, set it to 0.

For example, you can configure it like this:

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per a1, 
# in this case called 'a1'


# define the agent's name and the names of its source, channel and sink
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# define s1: where to read from and what to read
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log 

# define c1: where to buffer the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000


# define k1: where to send the data
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:8020/flume/test1
# roll files by time interval; usually disabled
a1.sinks.k1.hdfs.rollInterval = 0
# roll files by size in bytes; in practice usually the byte count for roughly 120-125 MB
a1.sinks.k1.hdfs.rollSize = 10240
# roll files by event count; usually disabled
a1.sinks.k1.hdfs.rollCount = 0

# which channel s1 delivers events to
a1.sources.s1.channels = c1
# which channel k1 takes events from
a1.sinks.k1.channel = c1

Now the HDFS web UI on node1 shows that the files under /flume/test1 are no longer just 1 KB.

Partitioning the output

Partitioned storage is achieved simply by adding a time-stamped directory (escape sequences such as %Y%m%d) to the HDFS path; because these escapes need a timestamp, useLocalTimeStamp is enabled in the example below.

For example, you can configure it like this:

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per a1, 
# in this case called 'a1'


# define the agent's name and the names of its source, channel and sink
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# define s1: where to read from and what to read
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log 

# define c1: where to buffer the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000


# define k1: where to send the data
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:8020/flume/log/daystr=%Y%m%d
# roll files by time interval; usually disabled
a1.sinks.k1.hdfs.rollInterval = 0
# roll files by size in bytes; in practice usually the byte count for roughly 120-125 MB
a1.sinks.k1.hdfs.rollSize = 10240
# roll files by event count; usually disabled
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true


# which channel s1 delivers events to
a1.sources.s1.channels = c1
# which channel k1 takes events from
a1.sinks.k1.channel = c1

Other settings

# prefix for the generated file names
a1.sinks.k1.hdfs.filePrefix = nginx
# suffix for the generated file names
a1.sinks.k1.hdfs.fileSuffix = .log
# type of file written to HDFS: DataStream means a plain (uncompressed) file
a1.sinks.k1.hdfs.fileType = DataStream
