Flume到MongoDB的日志行为收集
Posted Java大咖说
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume到MongoDB的日志行为收集相关的知识,希望对你有一定的参考价值。
需求概述
某公司需要对玩家行为日志进行统一收集,收集后全部存入MongoDB,同时部分写入Kafka,对接SparkSteaming 做实时计算处理。他们使用的日志收集框架完全自己手动管理 效率低 需要一款可以实现高可用 分布式横向扩展的日志收集框架 这时候Flume来了~
概述
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。
Flume主要由3个重要的组件购成:
Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。
Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。
Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。
对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。
我们实现Java写日志到Flume,Flume最终把日志写到MongoDB。目前官方Sink并没有提供,需要自己手动实现Sink
http://flume.apache.org/FlumeUserGuide.html#custom-sink
环境
Flume-1.8
MongoDB-4.0.3
JDK1.8
线上使用需在flume/lib下添加一下几个jar包
mongodb-driver-3.8.2.jar
mongodb-driver-core-3.8.2.jar
bson-3.8.2.jar
配置说明
Property Name |
Default |
Description |
hostNames |
- |
host1:port1,host2,port2,...the mongodb host and port |
database |
- |
the mongodb database |
collection |
- |
the collection of database |
user |
- |
the username of databse |
password |
- |
the password of database |
batchSize |
100 |
the batchSize of sources |
authentication_enabled |
False |
Whether u need a password and a user |
如果没有密码和用户名 就不需要user password authentication_enabled T
如果有密码,设置authentication_enabled =True
代码实现
https://gitee.com/czy006/MongoDBSink
配置文件
MongoDBFlume.conf
#定义组件名称
a1.sources = r
a1.sinks = s
a1.channels = c
#定义数据入口
a1.sources.r.channels = c
a1.sources.r.type = netcat
a1.sources.r.bind = localhost
a1.sources.r.port = 44444
##定义拦截器
#a1.sources.r.interceptors=i1
#a1.sources.r.interceptors.i1.type=regex_filter
#a1.sources.r.interceptors.i1.regex= ERROR
# 定义数据出口
a1.sinks.s.type = com.gzczy.MongodbSink.MongoSinkSelf
a1.sinks.s.hostNames=192.168.2.99:20000
a1.sinks.s.authentication_enabled=False
a1.sinks.s.database = gslog_test
#a1.sinks.s.password =
#a1.sinks.s.user =
a1.sinks.s.collection = logsearch_info
a1.sinks.s.batchSize = 3
a1.sinks.s.channel = c
# 使用内存管道
a1.channels.c.type = memory
a1.channels.c.capacity = 10000
a1.channels.c.transactionCapacity = 100
启动Flume:
[root@master apache-flume-1.8.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/MongoDBFlume.conf --name a1 -Dflume.root.logger=info,console
启动telnet:
telnet localhost 44444
在telent模拟一条数据:
{"_sys":"gslog","loginId":101725,"_topic":"loginIn","date":"2018-11-06","rmb":9999,"_lv":1,"lv":1,"uuid":2569021610400000017,"_server":"999","_job":4,"vip":999,"money":999999,"_tag":"","exp":0,"time":"2018-11-06 16:11:392"}
检查数据库是否插入成功
后续还需要2个功能
插入指定的集合,无需配置 根据传入的数据(JSON)进行动态插入指定集合(collection)
实现mongodb连接池
如果觉得适用记得给个小小的start~~
以上是关于Flume到MongoDB的日志行为收集的主要内容,如果未能解决你的问题,请参考以下文章