FluentD forward logs from kafka to another fluentD
Posted: 2021-09-16 20:44:48

I need to send my application logs to the FluentD that is part of an EFK service, so I tried to configure a second FluentD to do that.
my-fluent.conf:
<source>
  @type kafka_group
  consumer_group cgrp
  brokers "#{ENV['KAFKA_BROKERS']}"
  scram_mechanism sha512
  username "#{ENV['KAFKA_USERNAME']}"
  password "#{ENV['KAFKA_PASSWORD']}"
  ssl_ca_certs_from_system true
  topics "#{ENV['KAFKA_TOPICS']}"
  format json
</source>
<filter TOPIC>
  @type parser
  key_name log
  reserve_data false
  <parse>
    @type json
  </parse>
</filter>
<match TOPIC>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type forward
    <server>
      host "#{ENV['FLUENTD_HOST']}"
      port "#{ENV['FLUENTD_PORT']}"
      shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
    </server>
  </store>
</match>
I can see the output on stdout correctly:

2021-07-06 07:36:54.376459650 +0000 TOPIC: {"foo":"bar", ...}
But I cannot see the logs in Kibana. After tracing it down, I found that the second fluentd throws an error when receiving the data:
"time":"2021-07-05 11:21:41 +0000","level":"error","message":"读取数据时出现意外错误 host="XXXX" port=58548 error_class=MessagePack::MalformedFormatError 错误="无效字节"","worker_id":0 "time":"2021-07-05 11:21:41 +0000","level":"error","worker_id":0,"message":"/usr/lib/ruby/gems/2.7. 0/gems/fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:262:in
feed_each'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:262:in
block (2 级) in read_messages'\n/usr/lib/ruby/gems/2.7.0/gems /fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:271:inblock in read_messages'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/server.rb:613:in
on_read_without_connection'\n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/ lib/cool.io/io.rb:123:inon_readable'\n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:186:in
on_readable'\n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/loop .rb:88:inrun_once'\n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in
run'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/event_loop.rb:93:inblock in start'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/thread.rb:78:in
阻塞在thread_create'"
【Comments】:
Could you please add the source section of the second fluentd instance?
@Azeem I added the entire configmap of the second fluentd. Hope it helps.
It would be helpful if you could add only the configuration that causes this issue. You can get it with helm's --dry-run flag; just copy the configmap from the output and add it here. Thanks!
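(A dry-run render that prints the templated configmap looks roughly like this; the release and chart names below are placeholders, not from the original post:)

helm install my-fluentd ./fluentd-chart --dry-run --debug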
【Answer 1】:
The problem was the missing security section in the first fluentd:
<match TOPIC>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type forward
    <server>
      host "#{ENV['FLUENTD_HOST']}"
      port "#{ENV['FLUENTD_PORT']}"
      shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
    </server>
    <security>
      self_hostname HOSTNAME
      shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
    </security>
  </store>
</match>
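For completeness, the receiving FluentD (the one in front of the EFK stack) also needs a matching security section in its forward input, otherwise the shared-key handshake still fails. A minimal sketch, assuming the default forward port 24224 and that the same FLUENTD_SHARED_KEY environment variable is set on that host (not shown in the original post):

<source>
  @type forward
  port 24224
  bind 0.0.0.0
  <security>
    self_hostname "#{Socket.gethostname}"
    shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
  </security>
</source>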
【Discussion】: