flume读取日志文件并存储到HDFS

Posted soy-technology

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume读取日志文件并存储到HDFS相关的知识,希望对你有一定的参考价值。

配置hadoop环境

配置flume环境

配置flume文件

D:Softapache-flume-1.8.0-binconf 

将 flume-conf.properties.template 重新命名为  hdfs.properties

# 组装 agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# 配置source:从目录中读取文件
a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = E:log2s
# 包括所有日志文件
a1.sources.s1.includePattern=^.*$
# 忽略当前正在写入的日志文件
a1.sources.s1.ignorePattern=^.*log$
a1.sources.s1.deletePolicy=never
a1.sources.s1.fileHeader = true
## 增加时间header
a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type=timestamp

# 配置channel:缓存到文件中
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# 配置sink:保存到hdfs中
a1.sinks.k1.channel=c1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://127.0.0.1:9000/flume/accesslog/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=logs
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.batchSize=100
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.minBlockReplicas=1
flume启动命令

flume-ng agent --conf conf --conf-file ../conf/hdfs.properties --name a1


编写日志java程序

public class App 
{
  protected static final Logger logger = Logger.getLogger(App.class);


  public static void main( String[] args )
  {
    while (true) {
    logger.info("hello world:"+ String.valueOf(new Date().getTime()));
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      e.printStackTrace();
      }
    }
  }
}

 

 

log4j配置

### set log levels ###
log4j.rootLogger=INFO, stdout, file

### stdout ###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Threshold=INFO
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n

### file ###
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
# 日志路径
log4j.appender.file.file=E:/log2s/log.log
log4j.appender.file.Threshold=INFO
log4j.appender.file.Append=true
# 每分钟生成1个新文件
log4j.appender.file.DatePattern=‘.‘yyyy-MM-dd-HH-mm
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n


启动java程序生成日志

 技术图片

 

 

flume执行结果

07/24 17:19:27 INFO node.Application: Starting Channel c1
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
07/24 17:19:27 INFO node.Application: Starting Sink k1
07/24 17:19:27 INFO node.Application: Starting Source s1
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
07/24 17:19:27 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: E:log2s
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
07/24 17:19:28 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
07/24 17:19:28 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file E:log2slog.log.2018-07-24-16-46 to E:log2slog.log.2018-07-24-16-46.COMPLETED
07/24 17:19:28 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
07/24 17:19:28 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file E:log2slog.log.2018-07-24-16-47 to E:log2slog.log.2018-07-24-16-47.COMPLETED
07/24 17:19:28 INFO hdfs.HDFSSequenceFile: writeFormat = Text, UseRawLocalFileSystem = false
07/24 17:19:28 INFO hdfs.BucketWriter: Creating hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532423968027.tmp
07/24 17:19:39 INFO hdfs.BucketWriter: Closing hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532423968027.tmp
07/24 17:19:39 INFO hdfs.BucketWriter: Renaming hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532423968027.tmp to hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532423968027
07/24 17:19:39 INFO hdfs.HDFSEventSink: Writer callback called.
07/24 17:19:59 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
07/24 17:19:59 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file E:log2slog.log.2018-07-24-16-48 to E:log2slog.log.2018-07-24-16-48.COMPLETED
07/24 17:20:00 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
07/24 17:20:00 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file E:log2slog.log.2018-07-24-17-19 to E:log2slog.log.2018-07-24-17-19.COMPLETED
07/24 17:20:02 INFO hdfs.HDFSSequenceFile: writeFormat = Text, UseRawLocalFileSystem = false
07/24 17:20:02 INFO hdfs.BucketWriter: Creating hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424002903.tmp
07/24 17:20:13 INFO hdfs.BucketWriter: Closing hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424002903.tmp
07/24 17:20:13 INFO hdfs.BucketWriter: Renaming hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424002903.tmp to hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424002903
07/24 17:20:13 INFO hdfs.HDFSEventSink: Writer callback called.
07/24 17:21:00 INFO hdfs.HDFSSequenceFile: writeFormat = Text, UseRawLocalFileSystem = false
07/24 17:21:00 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
07/24 17:21:00 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file E:log2slog.log.2018-07-24-17-20 to E:log2slog.log.2018-07-24-17-20.COMPLETED
07/24 17:21:00 INFO hdfs.BucketWriter: Creating hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424060382.tmp
07/24 17:21:10 INFO hdfs.BucketWriter: Closing hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424060382.tmp
07/24 17:21:10 INFO hdfs.BucketWriter: Renaming hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424060382.tmp to hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424060382
07/24 17:21:10 INFO hdfs.HDFSEventSink: Writer callback called.


HDFS目录

技术图片

 

以上是关于flume读取日志文件并存储到HDFS的主要内容,如果未能解决你的问题,请参考以下文章

在写入时使用 Flume 将日志文件摄取到 HDFS

flume 监控hive日志文件

flume简介与监听文件目录并sink至hdfs实战

1.6 定义agent 读取日志存入hdfs

使用flume sink hdfs小文件优化以及HDFS小文件问题分析和解决

通过 Apache Flume 将日志文件从本地文件系统移动到 HDFS 时出错