kafka怎么收集到flume的日志

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka怎么收集到flume的日志相关的知识,希望对你有一定的参考价值。

参考技术A 采集层 主要可以使用Flume, Kafka两种技术。
Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.
Kafka:Kafka是一个可持久化的分布式的消息队列。
Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。

正如你们所知Flume内置很多的source和sink组件。然而,Kafka明显有一个更小的生产消费者生态系统,并且Kafka的社区支持不好。希望将来这种情况会得到改善,但是目前:使用Kafka意味着你准备好了编写你自己的生产者和消费者代码。如果已经存在的Flume Sources和Sinks满足你的需求,并且你更喜欢不需要任何开发的系统,请使用Flume。

Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。

Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。

Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。
Flume和Kafka可以结合起来使用。通常会使用Flume + Kafka的方式。其实如果为了利用Flume已有的写HDFS功能,也可以使用Kafka + Flume的方式。

Flume到MongoDB的日志行为收集

需求概述

某公司需要对玩家行为日志进行统一收集,收集后全部存入MongoDB,同时部分写入Kafka,对接SparkSteaming 做实时计算处理。他们使用的日志收集框架完全自己手动管理 效率低 需要一款可以实现高可用 分布式横向扩展的日志收集框架 这时候Flume来了~


概述

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

Flume主要由3个重要的组件购成:

  • Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。

  • Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。

  • Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。

对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。


我们实现Java写日志到Flume,Flume最终把日志写到MongoDB。目前官方Sink并没有提供,需要自己手动实现Sink

http://flume.apache.org/FlumeUserGuide.html#custom-sink


环境

  • Flume-1.8

  • MongoDB-4.0.3

  • JDK1.8

线上使用需在flume/lib下添加一下几个jar包

  • mongodb-driver-3.8.2.jar

  • mongodb-driver-core-3.8.2.jar

  • bson-3.8.2.jar


配置说明

Property Name

Default

Description

hostNames

-

host1:port1,host2,port2,...the mongodb host and port

database

-

the mongodb database

collection

-

the collection of database

user

-

the username of databse

password

-

the password of database

batchSize

100

the batchSize of sources

authentication_enabled

False

Whether u need a password and a user

如果没有密码和用户名 就不需要user password authentication_enabled T

如果有密码,设置authentication_enabled =True


代码实现


https://gitee.com/czy006/MongoDBSink


配置文件


MongoDBFlume.conf

#定义组件名称

a1.sources = r

a1.sinks = s

a1.channels = c


#定义数据入口

a1.sources.r.channels = c

a1.sources.r.type = netcat

a1.sources.r.bind = localhost

a1.sources.r.port = 44444


##定义拦截器

#a1.sources.r.interceptors=i1

#a1.sources.r.interceptors.i1.type=regex_filter

#a1.sources.r.interceptors.i1.regex= ERROR

  

# 定义数据出口 

a1.sinks.s.type = com.gzczy.MongodbSink.MongoSinkSelf

a1.sinks.s.hostNames=192.168.2.99:20000

a1.sinks.s.authentication_enabled=False

a1.sinks.s.database = gslog_test

#a1.sinks.s.password = 

#a1.sinks.s.user     = 

a1.sinks.s.collection = logsearch_info

a1.sinks.s.batchSize = 3

a1.sinks.s.channel = c

  

# 使用内存管道  

a1.channels.c.type = memory

a1.channels.c.capacity = 10000

a1.channels.c.transactionCapacity = 100


启动Flume:

[root@master apache-flume-1.8.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/MongoDBFlume.conf --name a1 -Dflume.root.logger=info,console


启动telnet:

telnet localhost 44444


在telent模拟一条数据:

{"_sys":"gslog","loginId":101725,"_topic":"loginIn","date":"2018-11-06","rmb":9999,"_lv":1,"lv":1,"uuid":2569021610400000017,"_server":"999","_job":4,"vip":999,"money":999999,"_tag":"","exp":0,"time":"2018-11-06 16:11:392"}


检查数据库是否插入成功



后续还需要2个功能

  • 插入指定的集合,无需配置 根据传入的数据(JSON)进行动态插入指定集合(collection)

  • 实现mongodb连接池


如果觉得适用记得给个小小的start~~

以上是关于kafka怎么收集到flume的日志的主要内容,如果未能解决你的问题,请参考以下文章

linux-flume1.8收集nginx日志到kafka

Flume到MongoDB的日志行为收集

Flume+Kafka+SparkStreaming+Hbase+可视化

大数据篇:flume+kafka+spark stream+hbase做日志收集

大数据篇:flume+kafka+spark stream+hbase做日志收集

filebeat 多个日志输出到logstash后怎么区分