Flume到MongoDB的日志行为收集

Posted Java大咖说

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume到MongoDB的日志行为收集相关的知识,希望对你有一定的参考价值。

需求概述

某公司需要对玩家行为日志进行统一收集,收集后全部存入MongoDB,同时部分写入Kafka,对接SparkSteaming 做实时计算处理。他们使用的日志收集框架完全自己手动管理 效率低 需要一款可以实现高可用 分布式横向扩展的日志收集框架 这时候Flume来了~


概述

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

Flume主要由3个重要的组件购成:

  • Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。

  • Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。

  • Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。

对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。


我们实现Java写日志到Flume,Flume最终把日志写到MongoDB。目前官方Sink并没有提供,需要自己手动实现Sink

http://flume.apache.org/FlumeUserGuide.html#custom-sink


环境

  • Flume-1.8

  • MongoDB-4.0.3

  • JDK1.8

线上使用需在flume/lib下添加一下几个jar包

  • mongodb-driver-3.8.2.jar

  • mongodb-driver-core-3.8.2.jar

  • bson-3.8.2.jar


配置说明

Property Name

Default

Description

hostNames

-

host1:port1,host2,port2,...the mongodb host and port

database

-

the mongodb database

collection

-

the collection of database

user

-

the username of databse

password

-

the password of database

batchSize

100

the batchSize of sources

authentication_enabled

False

Whether u need a password and a user

如果没有密码和用户名 就不需要user password authentication_enabled T

如果有密码,设置authentication_enabled =True


代码实现


https://gitee.com/czy006/MongoDBSink


配置文件


MongoDBFlume.conf

#定义组件名称

a1.sources = r

a1.sinks = s

a1.channels = c


#定义数据入口

a1.sources.r.channels = c

a1.sources.r.type = netcat

a1.sources.r.bind = localhost

a1.sources.r.port = 44444


##定义拦截器

#a1.sources.r.interceptors=i1

#a1.sources.r.interceptors.i1.type=regex_filter

#a1.sources.r.interceptors.i1.regex= ERROR

  

# 定义数据出口 

a1.sinks.s.type = com.gzczy.MongodbSink.MongoSinkSelf

a1.sinks.s.hostNames=192.168.2.99:20000

a1.sinks.s.authentication_enabled=False

a1.sinks.s.database = gslog_test

#a1.sinks.s.password = 

#a1.sinks.s.user     = 

a1.sinks.s.collection = logsearch_info

a1.sinks.s.batchSize = 3

a1.sinks.s.channel = c

  

# 使用内存管道  

a1.channels.c.type = memory

a1.channels.c.capacity = 10000

a1.channels.c.transactionCapacity = 100


启动Flume:

[root@master apache-flume-1.8.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/MongoDBFlume.conf --name a1 -Dflume.root.logger=info,console


启动telnet:

telnet localhost 44444


在telent模拟一条数据:

{"_sys":"gslog","loginId":101725,"_topic":"loginIn","date":"2018-11-06","rmb":9999,"_lv":1,"lv":1,"uuid":2569021610400000017,"_server":"999","_job":4,"vip":999,"money":999999,"_tag":"","exp":0,"time":"2018-11-06 16:11:392"}


检查数据库是否插入成功



后续还需要2个功能

  • 插入指定的集合,无需配置 根据传入的数据(JSON)进行动态插入指定集合(collection)

  • 实现mongodb连接池


如果觉得适用记得给个小小的start~~

以上是关于Flume到MongoDB的日志行为收集的主要内容,如果未能解决你的问题,请参考以下文章

Flume日志收集

Flume 实战,将多台机器日志直接收集到 Kafka

Flume分布式日志收集系统

日志收集系统Flume及其应用

flume收集tomcat日志保存到本地目录

日志系统之Flume日志收集