flume从Kafka消费数据到HDFS

Posted zpan2019

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume从Kafka消费数据到HDFS相关的知识,希望对你有一定的参考价值。

#source的名字
agent.sources = kafkaSource
# channels的名字,建议按照type来命名
agent.channels = memoryChannel
# sink的名字,建议按照目标来命名
agent.sinks = hdfsSink

# 指定source使用的channel名字
agent.sources.kafkaSource.channels = memoryChannel
# 指定sink需要使用的channel的名字,注意这里是channel
agent.sinks.hdfsSink.channel = memoryChannel

#-------- kafkaSource相关配置-----------------
# 定义消息源类型
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# 定义kafka所在zk的地址
#
# 这里特别注意: 是kafka的zookeeper的地址
#
agent.sources.kafkaSource.zookeeperConnect = 10.255.29.30:2181
# 配置消费的kafka topic
agent.sources.kafkaSource.topic = topic_start
# 配置消费者组的id,不能跟spark streaming中group id一样
agent.sources.kafkaSource.groupId = kafka2hdfs
# 消费超时时间,参照如下写法可以配置其他所有kafka的consumer选项。注意格式从kafka.xxx开始是consumer的配置属性
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100

#------- memoryChannel相关配置-------------------------
# channel类型
agent.channels.memoryChannel.type = memory
# channel存储的事件容量
agent.channels.memoryChannel.capacity=10000
# 事务容量
agent.channels.memoryChannel.transactionCapacity=1000

#---------hdfsSink 相关配置------------------
agent.sinks.hdfsSink.type = hdfs
# 注意, 我们输出到下面一个子文件夹datax中
agent.sinks.hdfsSink.hdfs.path = hdfs://app02:8020/tmp/mes/%Y%m%d%H
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream
#为了防止小文件
agent.sinks.hdfsSink.hdfs.rollSize = 0
agent.sinks.hdfsSink.hdfs.rollCount = 0
#agent.sinks.hdfsSink.hdfs.rollInterval = 60

#配置前缀和后缀
agent.sinks.hdfsSink.hdfs.filePrefix=test
agent.sinks.hdfsSink.hdfs.fileSuffix=.data

#避免文件在关闭前使用临时文件
agent.sinks.hdfsSink.hdfs.inUserPrefix=_
agent.sinks.hdfsSink.hdfs.inUserSuffix=

#自定义拦截器
#agent.sources.kafkaSource.interceptors=i1
#agent.sources.kafkaSource.interceptors.i1.type=com.hadoop.flume.FormatInterceptor$Builder
#为了防止小文件
agent.sinks.hdfsSink.hdfs.rollSize = 0
agent.sinks.hdfsSink.hdfs.rollCount = 0
#agent.sinks.hdfsSink.hdfs.rollInterval = 60
但是只在一定程度上减少了小文件,跟kafak数据量大小有关

技术图片

所以需要将这些小文件进行归档。可以同时和spark streaming一起消费同一批数据

以上是关于flume从Kafka消费数据到HDFS的主要内容,如果未能解决你的问题,请参考以下文章

kafka怎么收集到flume的日志

Flume消费Kafka数据落盘至HDFS的实践

社区福利 | Kafka通过Flume传输数据到HBase

如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS

flume从kafka读取数据到hdfs中的配置

关于从kafka采集数据到flume,然后落盘到hdfs上生成的一堆小文件的总结