flume对接kafka

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume对接kafka相关的知识,希望对你有一定的参考价值。

参考技术A 一、flume配置 
1、kafka.conf    
#name

a1.sources = r1

a1.channels = c1

a1.sinks = k1

#source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

#channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

#sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = first

a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

#a1.sinks.k1.kafka.producer.compression.type = snappy

#bing

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

二、日志采集范例

1、日志目录->kafka集群 
#每个组件命名

a1.sources = r1

a1.channels = c1

#配置source

a1.sources.r1.type = TAILDIR

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*

a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json

#source拦截器

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = com.ln.gmall.flume.interceptor.LogInterceptor$Builder

#配置channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092

a1.channels.c1.kafka.topic = topic_log

a1.channels.c1.parseAsFlumeEvent = false

#绑定sink与channel和source与channel关系

a1.sources.r1.channels = c1

2、kafka集群到hdfs 
kafka source -> file channel ->hdfs sink

a1.sources = r1

a1.channels = c1

a1.sinks = k1

#source

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.sources.r1.kafka.topics = topic_log

a1.sources.r1.batchSize = 5000

a1.sources.r1.batchDurationMillis = 2000

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = com.ln.gmall.flume.interceptor.TimestampInterceptor$Builder

#channel

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1

a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1

a1.channels.c1.maxFileSize = 2146435071

a1.channels.c1.capacity = 1000000

a1.channels.c1.keep-alive = 6

#sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d

a1.sinks.k1.hdfs.filePrefix = log-

a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10

a1.sinks.k1.hdfs.rollSize = 134217728

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.hdfs.codeC = lzop

#绑定

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

以上是关于flume对接kafka的主要内容,如果未能解决你的问题,请参考以下文章

图解Flume对接Kafka(附中文注释)

图解Flume对接Kafka(附中文注释)

大数据技术之KafkaKafka APIKafka监控Flume对接KafkaKafka面试题

大数据技术之KafkaKafka APIKafka监控Flume对接KafkaKafka面试题

大数据技术之KafkaKafka APIKafka监控Flume对接KafkaKafka面试题

大数据技术之KafkaKafka APIKafka监控Flume对接KafkaKafka面试题