Connecting Flume to Kafka
I. Flume configuration
1. kafka.conf
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
# Batch size is a sink-level property: flumeBatchSize, without the kafka. prefix
a1.sinks.k1.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
#a1.sinks.k1.kafka.producer.compression.type = snappy
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
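To try the kafka.conf agent above end to end, something has to write lines to the netcat source on localhost:44444. Running `nc localhost 44444` does this interactively; the Python sketch below does the same from a script. The helper name `send_lines` and the 5-second timeout are illustrative choices, not part of Flume.

```python
import socket


def send_lines(lines, host="localhost", port=44444, timeout=5.0):
    """Send each string as one newline-terminated line over TCP.

    Returns whatever bytes the server wrote back before it closed the
    connection or before the timeout expired.
    """
    replies = b""
    with socket.create_connection((host, port), timeout=timeout) as conn:
        for line in lines:
            # The netcat source turns each newline-terminated line into one event.
            conn.sendall(line.encode("utf-8") + b"\n")
        # Signal end of input so the server can finish and close its side.
        conn.shutdown(socket.SHUT_WR)
        try:
            while True:
                chunk = conn.recv(4096)
                if not chunk:
                    break
                replies += chunk
        except socket.timeout:
            # The server kept the connection open; return what arrived so far.
            pass
    return replies
```

With the agent running, calling `send_lines(["hello", "flume"])` should produce two messages on the `first` topic, which can be confirmed with any Kafka consumer subscribed to that topic.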
II. Log collection examples
1. Log directory -> Kafka cluster
# Name each component
a1.sources = r1
a1.channels = c1
# Configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
# Source interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.ln.gmall.flume.interceptor.LogInterceptor$Builder
# Configure the channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
# Bind the source to the channel. No sink is configured: the Kafka channel itself writes events to topic_log
a1.sources.r1.channels = c1
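The source does not show what `LogInterceptor` does. One common job for an interceptor at this point in a pipeline is dropping lines that are not well-formed JSON before they reach Kafka. The Python sketch below shows only that filtering logic, and it is an assumption rather than the author's implementation. A real interceptor would be a Java class implementing Flume's `Interceptor` interface; `is_json_object` and `keep_valid_json` are hypothetical names.

```python
import json


def is_json_object(body: bytes) -> bool:
    """Return True when body decodes to a JSON object (assumed log format)."""
    try:
        return isinstance(json.loads(body.decode("utf-8")), dict)
    except (UnicodeDecodeError, ValueError):
        # Undecodable bytes or malformed JSON: treat the line as invalid.
        return False


def keep_valid_json(bodies):
    """Drop bodies that fail the check, preserving the original order."""
    return [body for body in bodies if is_json_object(body)]
```

Filtering at the source keeps truncated or corrupt lines out of `topic_log`, so downstream consumers do not each have to repeat the check.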
2. Kafka cluster -> HDFS
Kafka source -> file channel -> HDFS sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.ln.gmall.flume.interceptor.TimestampInterceptor$Builder
#channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
#sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
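The HDFS sink fills in `%Y-%m-%d` in `hdfs.path` from the event's `timestamp` header (epoch milliseconds), so that header decides which daily directory an event lands in. The source does not show `TimestampInterceptor`; presumably it sets the header from a time carried in the log line. The Python sketch below only illustrates that logic, assuming a JSON body with a millisecond field named `ts` (a hypothetical name). It uses `strftime`, whose `%Y`, `%m` and `%d` match Flume's escapes of the same name.

```python
import json
from datetime import datetime, timedelta, timezone


def timestamp_header(body: bytes, field: str = "ts") -> dict:
    """Build the header an event-time interceptor might add.

    Assumes the body is a JSON object whose `field` holds epoch milliseconds.
    """
    record = json.loads(body.decode("utf-8"))
    return {"timestamp": str(int(record[field]))}


def resolve_hdfs_path(pattern: str, headers: dict, tz) -> str:
    """Expand %Y, %m and %d in pattern using the timestamp header and time zone."""
    millis = int(headers["timestamp"])
    moment = datetime.fromtimestamp(millis / 1000, tz=tz)
    return moment.strftime(pattern)


if __name__ == "__main__":
    headers = timestamp_header(b'{"ts": 1592668800000, "page": "home"}')
    pattern = "/origin_data/gmall/log/topic_log/%Y-%m-%d"
    print(resolve_hdfs_path(pattern, headers, timezone.utc))
    print(resolve_hdfs_path(pattern, headers, timezone(timedelta(hours=8))))
```

The same instant falls on 2020-06-20 in UTC and on 2020-06-21 in UTC+8, which is why both the event time and the time zone used by the sink matter for events logged around midnight.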