flume与kafka结合上传文件到HDFS上

Posted han-guang-xue

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume与kafka结合上传文件到HDFS上相关的知识,希望对你有一定的参考价值。

 实现如图的的效果(详细步骤请参考官方文档(http://flume.apache.org/FlumeUserGuide.html),flume更新版本比较快)

技术分享图片

flume1.conf的配置文件内容

a1.sources = r1
a1.sinks = k1
a1.channels = c1

#具体定义source  
a1.sources.r1.type = spooldir
#先创建此目录,保证里面空的  
a1.sources.r1.spoolDir = /logs 

#sink到kafka里面
a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的Topic
a1.sinks.k1.kafka.topic = haha1
#设置Kafka的broker地址和端口号
a1.sinks.k1.kafka.bootstrap.servers = zhiyou01:9092,zhiyou02:9092,zhiyou03:9092
#配置批量提交的数量
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type= snappy

#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

flume2.conf的配置文件内容

参考官方文档截图如下

技术分享图片

 

 

详细配置如下

a1.sources = r1
a1.sinks = k1
a1.channels = c1

#对于source的配置描述 监听avro
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize=5000
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers = han01:9092,han02:9092,han03:9092
a1.sources.r1.kafka.topics=test

#定义拦截器,为消息添加时间戳
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#对于sink的配置描述 传递到hdfs上面
a1.sinks.k1.type = hdfs
#集群的nameservers名字
#单节点的直接写:hdfs://han01/xxx
a1.sinks.k1.hdfs.path = hdfs://ns/flume/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a1.sinks.k1.hdfs.rollCount = 0
#HDFS上的文件达到128M时生成一个文件
a1.sinks.k1.hdfs.rollSize = 134217728
#HDFS上的文件达到60秒生成一个文件
a1.sinks.k1.hdfs.rollInterval = 60


#对于channel的配置描述 使用内存缓冲区域做数据的临时缓存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

以上是关于flume与kafka结合上传文件到HDFS上的主要内容,如果未能解决你的问题,请参考以下文章

kafka怎么收集到flume的日志

双层flume,中间件kafka,采集到hdfs并按日期分文件夹

关于从kafka采集数据到flume,然后落盘到hdfs上生成的一堆小文件的总结

使用flume sink hdfs小文件优化以及HDFS小文件问题分析和解决

Flume 需要时间将文件上传到 HDFS

实战系列Flume + kafka + HDFS构建日志采集系统