Flume with Kafka Source not writing events using file_roll
Posted: 2022-01-23 14:03:06

Question: So, as the title says, I have a Flume agent with a Kafka source that writes to an HDFS location, compressed as Avro, and I want to multiplex it so that the events are also written to a log file. I run my Flume agent in a pod inside AKS.
This is what I have tried so far; the relevant part of my Flume configuration is:
flumeagent.sources = kafkaSource
flumeagent.sources.kafkaSource.channels = kafkaChannel logChannel
flumeagent.sources.kafkaSource.selector.type = multiplexing
flumeagent.sources.kafkaSource.selector.default = kafkaChannel logChannel
flumeagent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
flumeagent.sources.kafkaSource.kafka.consumer.security.protocol = SSL
flumeagent.sources.kafkaSource.kafka.consumer.ssl.truststore.location = <LOCATION>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.truststore.password = <PASSWORD>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.keystore.location = <LOCATION>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.keystore.password = <PASSWORD>
flumeagent.sources.kafkaSource.batchSize = 5000
flumeagent.sources.kafkaSource.topics = <TOPICS>
flumeagent.sources.kafkaSource.consumer.group.id = <GROUP_ID>
flumeagent.channels = kafkaChannel logChannel
flumeagent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
flumeagent.channels.kafkaChannel.kafka.producer.security.protocol = SSL
flumeagent.channels.kafkaChannel.kafka.consumer.security.protocol = SSL
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.truststore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.truststore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.keystore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.keystore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.truststore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.truststore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.keystore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.keystore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.keep-alive = 120
flumeagent.channels.kafkaChannel.kafka.consumer.session.timeout.ms = 30000
flumeagent.channels.kafkaChannel.kafka.consumer.heartbeat.interval.ms = 10000
flumeagent.channels.kafkaChannel.kafka.producer.buffer.memory = 50000000
flumeagent.channels.kafkaChannel.kafka.consumer.request.timeout.ms = 40000
flumeagent.channels.kafkaChannel.kafka.producer.max.request.size = 30000
flumeagent.channels.kafkaChannel.kafka.consumer.session.timeout.ms = 40000
flumeagent.channels.kafkaChannel.kafka.producer.max.request.size = 50000000
flumeagent.channels.kafkaChannel.kafka.consumer.max.partition.fetch.bytes = 50000000
flumeagent.channels.kafkaChannel.transactionCapacity = 1000
flumeagent.channels.kafkaChannel.capacity = 50000
flumeagent.channels.kafkaChannel.kafka.topic = <TOPIC_NAME>
flumeagent.channels.kafkaChannel.kafka.consumer.group.id = <TOPIC_NAME>
flumeagent.channels.kafkaChannel.kafka.bootstrap.servers = <SERVERS>
flumeagent.channels.logChannel.type = memory
flumeagent.channels.logChannel.capacity = 5000
flumeagent.channels.logChannel.transactionCapacity = 10
flumeagent.sinks = hdfsSink logSink
flumeagent.sinks.hdfsSink.channel = kafkaChannel
flumeagent.sinks.hdfsSink.type = hdfs
flumeagent.sinks.hdfsSink.hdfs.fileType = DataStream
flumeagent.sinks.hdfsSink.serializer = avro_event
flumeagent.sinks.hdfsSink.serializer.compressionCodec = snappy
flumeagent.sinks.hdfsSink.hdfs.fileSuffix = .avro
flumeagent.sinks.hdfsSink.hdfs.batchSize = 10
flumeagent.sinks.hdfsSink.hdfs.rollSize = 0
flumeagent.sinks.hdfsSink.hdfs.rollCount = 0
flumeagent.sinks.hdfsSink.hdfs.callTimeout = 60000
flumeagent.sinks.hdfsSink.hdfs.cleanPreviousTemps = true
flumeagent.sinks.hdfsSink.hdfs.inUsePrefix = .
flumeagent.sinks.hdfsSink.hdfs.rollInterval = 3600
flumeagent.sinks.hdfsSink.hdfs.maxPathCountToScan = 2
flumeagent.sinks.hdfsSink.hdfs.timeZone = <TIME_ZONE>
flumeagent.sinks.hdfsSink.hdfs.path =<HDFS PATH>
flumeagent.sinks.logsink.channel = logChannel
flumeagent.sinks.logsink.type = file_roll
flumeagent.sinks.logsink.sink.batchSize = 1
flumeagent.sinks.logsink.sink.directory = /var/log
flumeagent.sinks.logsink.sink.rollInterval = 0
flumeagent.sinks.logsink.sink.pathManager.prefix = monitor
flumeagent.sinks.logsink.sink.pathManager.extension = txt
To deploy it to AKS I created a ConfigMap with

kubectl create configmap <name>.properties --from-file=flume.conf=<agent-name>.conf
and applied it with

kubectl apply -f flume.yml
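(For context, the question does not show the contents of flume.yml. A minimal sketch of how such a ConfigMap is commonly mounted into the agent pod is below; the names flume-agent and flume-config, the image placeholder, the flume-ng arguments, and the mount path /opt/flume/conf are illustrative assumptions, not taken from the original deployment.)

apiVersion: v1
kind: Pod
metadata:
  name: flume-agent
spec:
  containers:
    - name: flume
      image: <FLUME_IMAGE>   # placeholder, the question does not name the image
      # arguments assume the container entrypoint is flume-ng; adjust to your image
      args: ["agent", "-n", "flumeagent", "-f", "/opt/flume/conf/flume.conf", "-c", "/opt/flume/conf"]
      volumeMounts:
        - name: flume-config
          mountPath: /opt/flume/conf
  volumes:
    - name: flume-config
      configMap:
        name: <name>.properties   # the ConfigMap created above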
What I observe is that Flume writes to the HDFS location successfully, and the file_roll sink creates files in /var/log, but no data is ever written to those files.
Answer 1: What worked was replacing the memory channel with a jdbc channel.
Replace this
flumeagent.channels.logChannel.type = memory
flumeagent.channels.logChannel.capacity = 5000
flumeagent.channels.logChannel.transactionCapacity = 10
with
flumeagent.channels.logChannel.type = jdbc
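(For reference, Flume's JDBC channel persists events in an embedded Derby database, so unlike the memory channel it is backed by disk, which may explain the different behaviour. A minimal sketch of the replacement logChannel definition with a few of the optional properties documented for the JDBC channel is shown below; the values are illustrative, not from the answer, and should be checked against your Flume version.)

flumeagent.channels.logChannel.type = jdbc
# optional properties; Flume falls back to embedded Derby defaults if they are omitted
flumeagent.channels.logChannel.db.type = DERBY
flumeagent.channels.logChannel.maximum.capacity = 5000
flumeagent.channels.logChannel.sysprop.user.home = /var/lib/flume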