Flume with Kafka Source not writing events using file_roll

Posted 2022-01-23 14:03:06

So, as the title says, I have a Flume agent with a Kafka source that writes to an HDFS location, compressed as Avro, and I want to multiplex it so that the events are also written to a log file. My Flume agent runs in a pod inside AKS.

This is what I have tried so far; here is the relevant part of my Flume configuration:

flumeagent.sources = kafkaSource
flumeagent.sources.kafkaSource.channels = kafkaChannel logChannel
flumeagent.sources.kafkaSource.selector.type = multiplexing
flumeagent.sources.kafkaSource.selector.default = kafkaChannel logChannel
flumeagent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
flumeagent.sources.kafkaSource.kafka.consumer.security.protocol = SSL
flumeagent.sources.kafkaSource.kafka.consumer.ssl.truststore.location = <LOCATION>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.truststore.password = <PASSWORD>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.keystore.location = <LOCATION>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.keystore.password = <PASSWORD>
flumeagent.sources.kafkaSource.batchSize = 5000
flumeagent.sources.kafkaSource.topics = <TOPICS>
flumeagent.sources.kafkaSource.consumer.group.id = <GROUP_ID>
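
(A note on the selector while we are here: with selector.type = multiplexing and only selector.default set, every event is routed to both default channels, which is effectively replication. If routing were meant to depend on an event header, the usual Flume selector syntax looks like the sketch below — the header name and mapping values are hypothetical, not part of this setup:)

# route on a header value; the Kafka source sets a "topic" header by default
flumeagent.sources.kafkaSource.selector.header = topic
flumeagent.sources.kafkaSource.selector.mapping.<TOPIC_A> = kafkaChannel
flumeagent.sources.kafkaSource.selector.mapping.<TOPIC_B> = logChannel
# events with unmapped header values fall through to the default channels
flumeagent.sources.kafkaSource.selector.default = kafkaChannel logChannel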

flumeagent.channels = kafkaChannel logChannel
flumeagent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
flumeagent.channels.kafkaChannel.kafka.producer.security.protocol = SSL
flumeagent.channels.kafkaChannel.kafka.consumer.security.protocol = SSL
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.truststore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.truststore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.keystore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.keystore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.truststore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.truststore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.keystore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.keystore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.keep-alive = 120
flumeagent.channels.kafkaChannel.kafka.consumer.session.timeout.ms = 30000
flumeagent.channels.kafkaChannel.kafka.consumer.heartbeat.interval.ms = 10000
flumeagent.channels.kafkaChannel.kafka.producer.buffer.memory = 50000000
flumeagent.channels.kafkaChannel.kafka.consumer.request.timeout.ms = 40000
flumeagent.channels.kafkaChannel.kafka.producer.max.request.size = 30000
flumeagent.channels.kafkaChannel.kafka.consumer.session.timeout.ms = 40000
flumeagent.channels.kafkaChannel.kafka.producer.max.request.size = 50000000
flumeagent.channels.kafkaChannel.kafka.consumer.max.partition.fetch.bytes = 50000000
flumeagent.channels.kafkaChannel.transactionCapacity = 1000
flumeagent.channels.kafkaChannel.capacity = 50000
flumeagent.channels.kafkaChannel.kafka.topic = <TOPIC_NAME>
flumeagent.channels.kafkaChannel.kafka.consumer.group.id = <TOPIC_NAME>
flumeagent.channels.kafkaChannel.kafka.bootstrap.servers = <SERVERS>

flumeagent.channels.logChannel.type = memory
flumeagent.channels.logChannel.capacity = 5000
flumeagent.channels.logChannel.transactionCapacity = 10

flumeagent.sinks = hdfsSink logSink
flumeagent.sinks.hdfsSink.channel = kafkaChannel
flumeagent.sinks.hdfsSink.type = hdfs
flumeagent.sinks.hdfsSink.hdfs.fileType = DataStream
flumeagent.sinks.hdfsSink.serializer = avro_event
flumeagent.sinks.hdfsSink.serializer.compressionCodec = snappy
flumeagent.sinks.hdfsSink.hdfs.fileSuffix = .avro
flumeagent.sinks.hdfsSink.hdfs.batchSize = 10
flumeagent.sinks.hdfsSink.hdfs.rollSize = 0
flumeagent.sinks.hdfsSink.hdfs.rollCount = 0
flumeagent.sinks.hdfsSink.hdfs.callTimeout = 60000 
flumeagent.sinks.hdfsSink.hdfs.cleanPreviousTemps = true
flumeagent.sinks.hdfsSink.hdfs.inUsePrefix = .
flumeagent.sinks.hdfsSink.hdfs.rollInterval = 3600
flumeagent.sinks.hdfsSink.hdfs.maxPathCountToScan = 2
flumeagent.sinks.hdfsSink.hdfs.timeZone = <TIME_ZONE>
flumeagent.sinks.hdfsSink.hdfs.path =<HDFS PATH>

flumeagent.sinks.logSink.channel = logChannel
flumeagent.sinks.logSink.type = file_roll
flumeagent.sinks.logSink.sink.batchSize = 1
flumeagent.sinks.logSink.sink.directory = /var/log
flumeagent.sinks.logSink.sink.rollInterval = 0
flumeagent.sinks.logSink.sink.pathManager.prefix = monitor
flumeagent.sinks.logSink.sink.pathManager.extension = txt

To deploy it to AKS, I created a ConfigMap using:

kubectl create configmap <name>.properties --from-file=flume.conf=<agent-name>.conf

To apply it, I use:

kubectl apply -f flume.yml
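
(The flume.yml itself is not reproduced here; as a rough sketch, a minimal pod spec that mounts this ConfigMap could look like the following — the image name, mount path, and args are assumptions, not my actual manifest:)

apiVersion: v1
kind: Pod
metadata:
  name: flume-agent
spec:
  containers:
    - name: flume
      # assumption: a container image with the Flume distribution installed
      image: <FLUME_IMAGE>
      args: ["agent", "-n", "flumeagent", "-f", "/etc/flume/flume.conf"]
      volumeMounts:
        - name: flume-config
          mountPath: /etc/flume
  volumes:
    - name: flume-config
      configMap:
        # the ConfigMap created with the kubectl command above
        name: <name>.properties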

What happens is that Flume successfully writes to the HDFS location, and the file_roll sink creates files in /var/log, but it never writes any data to those files.


Answer 1:

What worked was replacing the memory channel with a jdbc channel.

Replace this:

flumeagent.channels.logChannel.type = memory
flumeagent.channels.logChannel.capacity = 5000
flumeagent.channels.logChannel.transactionCapacity = 10

with this:

flumeagent.channels.logChannel.type = jdbc
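
(For context, the jdbc channel persists events in an embedded Derby database on local disk instead of holding them in memory. One plausible explanation for the original failure is that the memory channel's transactionCapacity of 10 is far below the source batchSize of 5000, so put transactions could never commit. A couple of the jdbc channel's optional knobs, per the Flume user guide — the values below are illustrative assumptions, not required:)

# home path where the embedded Derby database files are stored (illustrative path)
flumeagent.channels.logChannel.sysprop.user.home = /var/lib/flume
# 0 means no upper bound on the number of stored events
flumeagent.channels.logChannel.maximum.capacity = 0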

