Flume案例——日志分析采集系统

Posted 码上攻城

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume案例——日志分析采集系统相关的知识,希望对你有一定的参考价值。

概述

大数据平台每天处理业务系统产生的大量日志数据,一般而言,这些系统需要具有以下特征:

  1. 构建业务系统和日志分析系统的桥梁,并将它们之间的关联解耦;

  2. 支持近实时的在线分析系统和类似于 Hadoop 之类的离线分析系统;

  3. 具有高可扩展性。即:当数据量增加时,可以通过增加节点进行水平扩展。

日志分析采集系统模块

  • 数据采集模块:负责从各节点上实时采集数据,选用 Flume-NG 来实现。 Flume-NG 提供了丰富的 Source、Channel、Sink 实现,各种数据源的引入只要变更配置就可实现。

  • 数据接入模块:由于采集数据的速度和数据处理的速度不一定同步(类似于生产者消费者模式),因此添加一个消息中间件来作为缓冲,这里选用 Kafka 来实现。Kafka 适用于对数据管道的吞吐量、可用性要求都很高的解决方案,需要编程实现数据的生产和消费。

  • 流式计算模块:对采集到的数据进行实时分析,选用 Storm 来实现。

  • 数据输出模块:对分析后的结果持久化,可以使用 HDFS、mysql 等。

采用 Flume 作为数据的生产者,将生产的消息数据(日志数据、业务请求数据等)通过 Kafka Sink 发布到 Kafka 中。然后再由流式计算程序 Storm 做实时分析,这时就需要将在 Storm 的 Spout 中读取 Kafka 中的消息,然后交由具体的 Spot 组件去分析处理。同时 Storm 已经自带了一个集成 Kafka 的外部插件程序 storm-kafka。最后将计算结果持久化到 Hadoop 或 MySQL 中。

示例配置

Kafka 集群配置

  • 使用3台机器搭建 Kafka 集群,在每台机器上添加如下配置:

 
   
   
 
  1. # vim /etc/hosts

  2. 192.168.111.238 master

  3. 192.168.111.239 slave1

  4. 192.168.111.240 slave2

  • 在安装 Kafka 集群时,没有使用 Kafka 自带的 Zookeeper,而是独立安装了一个 Zookeeper 集群,也是使用这3台机器,并且保证 Zookeeper 集群正常运行。

  • 在 master 上准备 Kafka 安装文件,执行如下命令:

 
   
   
 
  1. cd /usr/local/

  2. tar xvzf kafka_2.11-1.0.0.tgz

  3. ln -s /usr/local/kafka_2.11-1.0.0 /usr/local/kafka

  • 修改配置文件 /usr/local/kafka/config/server.properties

 
   
   
 
  1. broker.id=0

  2. zookeeper.connect=master:2181,slave1:2181,slave2:2181/kafka

说明 :

默认 Kafka 会使用 ZooKeeper 默认的 / 路径,导致有关 Kafka 的 ZooKeeper 配置就会散落在根路径下面,如果有其他的应用也在使用此 ZooKeeper 集群,查看 ZooKeeper 中的数据会不直观,所以指定一个 chroot 路径,直接在 zookeeper.connect 配置项中指定

  • 需要手动在 ZooKeeper 中创建路径 /kafka,使用如下命令连接到任意一台 ZooKeeper 服务器:

 
   
   
 
  1. cd /usr/local/zookeeper

  2. bin/zkCli.sh

  3. create /kafka ''

  • 将配置好的安装文件同步到其他的 slave1、slave2 节点上:

 
   
   
 
  1. scp -r /usr/local/kafka_2.11-1.0.0/ slave1:/usr/local/

  2. scp -r /usr/local/kafka_2.11-1.0.0/ slave2:/usr/local/

  • 分别修改配置文件/usr/local/kafka/config/server.properties内容如下

 
   
   
 
  1. # 因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值

  2. broker.id=1  # 在slave1修改

  3. broker.id=2  # 在slave2修改

  • 在集群中的 master、slave1、slave2 这三个节点上分别启动 Kafka,分别执行如下命令:

 
   
   
 
  1. bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

  • 通过查看日志,或者检查进程状态,保证 Kafka 集群启动成功。

配置 Flume agent,将 Flume 与 Kafka 整合, 配置文件为 flume-conf.properties

 
   
   
 
  1. agent.sources = logsrc

  2. agent.channels = memchannel

  3. agent.sinks = kafkasink

  4. # configure the source

  5. agent.sources.logsrc.type = exec

  6. agent.sources.logsrc.command = tail -F /data/logs/component_role.log

  7. agent.sources.logsrc.shell = /bin/sh -c

  8. agent.sources.logsrc.batchSize = 50

  9. agent.sources.logsrc.channels = memchannel

  10. # configure the sink

  11. agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink

  12. agent.sinks.kafkasink.brokerList=master:9092, slave1:9092,slave2:9092

  13. agent.sinks.kafkasink.topic=mytopic

  14. agent.sinks.kafkasink.requiredAcks = 1

  15. agent.sinks.kafkasink.batchSize = 20

  16. agent.sinks.kafkasink.channel = memchannel

  17. # configure the channel

  18. agent.channels.memchannel.type = memory

  19. agent.channels.memchannel.capacity = 1000

启动该 Flume 节点

 
   
   
 
  1. cd /usr/local/apache-flume-1.7.0-bin

  2. ./bin/flume-ng agent -n agent -c ./conf -f ./conf/flume-conf.properties -Dflume.monitoring.type=http -Dflume.monitoring.port=10100 -Dflume.root.logger=DEBUG,console

动态追加日志数据,模拟日志输出,进行测试

动态追加日志数据,执行命令向 /data/logs/component_role.log 添加数据:

 
   
   
 
  1. echo  "测试代码" >>  /data/logs/component_role.log

  2. echo  "检测Flume+Kafka数据管道通畅" >>  /data/logs/component_role.log

验证 Kafka 数据接收结果

 
   
   
 
  1. root@ubuntu238:/usr/local/kafka_2.11-1.0.0# ./bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181/kafka  --topic mytopic --from-beginning

  2. Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

  3. 测试代码

  4. 检测Flume+Kafka数据管道通畅

未完,后续增加流式计算及数据输出。



以上是关于Flume案例——日志分析采集系统的主要内容,如果未能解决你的问题,请参考以下文章

一种基于Flume的分布式日志采集分析系统

Flume基础:自定义 Interceptor

三峡大学-复杂数据预处理实训

日志采集框架Flume

Flume+Kafka双剑合璧玩转大数据平台日志采集

大数据运维之某银行大数据采集工具Flume性能优化案例