FluentD 将日志从 kafka 转发到另一个 fluentD

Posted

技术标签:

【中文标题】FluentD 将日志从 kafka 转发到另一个 fluentD【英文标题】:FluentD forward logs from kafka to another fluentD 【发布时间】:2021-09-16 20:44:48 【问题描述】:

我需要将我的应用程序日志发送到作为 EFK 服务一部分的 FluentD。所以我尝试配置另一个 FluentD 来做到这一点。

my-fluent.conf:

<source>
  @type kafka_group
  consumer_group cgrp
  brokers "#ENV['KAFKA_BROKERS']"
  scram_mechanism sha512
  username "#ENV['KAFKA_USERNAME']"
  password "#ENV['KAFKA_PASSWORD']"
  ssl_ca_certs_from_system true
  topics "#ENV['KAFKA_TOPICS']"
  format json
</source>
<filter TOPIC>
  @type parser
  key_name log 
  reserve_data false
  <parse>
    @type json
  </parse>
</filter>
<match TOPIC>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type forward
    <server>
      host "#ENV['FLUENTD_HOST']"
      port "#ENV['FLUENTD_PORT']"
      shared_key "#ENV['FLUENTD_SHARED_KEY']"
    </server>
  </store>
</match>

我能够正确看到stdout 的输出

2021-07-06 07:36:54.376459650 +0000 主题:"foo":"bar", ...

但我无法看到来自 kibana 的日志。跟踪后我发现第二个 fluentd 在接收数据时抛出错误:

"time":"2021-07-05 11:21:41 +0000","level":"error","message":"读取数据时出现意外错误 host="XXXX" port=58548 error_class=MessagePack::MalformedFormatError 错误="无效字节"","worker_id":0 "time":"2021-07-05 11:21:41 +0000","level":"error","worker_id":0,"message":"/usr/lib/ruby/gems/2.7. 0/gems/fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:262:in feed_each'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:262:in block (2 级) in read_messages'\n/usr/lib/ruby/gems/2.7.0/gems /fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:271:in block in read_messages'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/server.rb:613:in on_read_without_connection'\n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/ lib/cool.io/io.rb:123:in on_readable'\n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:186:in on_readable'\n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/loop .rb:88:in run_once'\n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in run'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/event_loop.rb:93:in block in start'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/thread.rb:78:in 阻塞在thread_create'"

【问题讨论】:

能否请您添加第二个 fluentd 实例的 source 部分? @Azeem 添加了第二个 fluentd 的整个配置图。希望有帮助 如果您只能添加导致此问题的配置,那将很有帮助。您可以与 helm 的 --dry-run 标志共享它。只需复制 configmap 的输出并在此处添加即可。谢谢! 【参考方案1】:

问题是在第一个 fluentd 中缺少安全标签。

<match TOPIC>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type forward
    <server>
      host "#ENV['FLUENTD_HOST']"
      port "#ENV['FLUENTD_PORT']"
      shared_key "#ENV['FLUENTD_SHARED_KEY']"
    </server>
    <security>
      self_hostname HOSTNAME
      shared_key "#ENV['FLUENTD_SHARED_KEY']"
    </security>
  </store>
</match>

【讨论】:

以上是关于FluentD 将日志从 kafka 转发到另一个 fluentD的主要内容,如果未能解决你的问题,请参考以下文章

ELK系列~Nxlog日志收集加转发(解决log4日志换行导致json转换失败问题)

Docker通过EFK(Elasticsearch + Fluentd + Kibana)查询日志

Fluentd:将 syslog 事件转发到 sensu

Kubernetes 事件日志到 elasticsearch

filebeat工作原理

ELK系列~log4-nxlog-Fluentd-elasticsearch写json数据需要注意的几点