ELK+kafka+filebeat搭建生产ELFK集群 --markdown 语法

Posted cnsre运维博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ELK+kafka+filebeat搭建生产ELFK集群 --markdown 语法相关的知识,希望对你有一定的参考价值。


ELK+kafka+filebeat搭建生产ELFK集群

ELK 架构介绍

集群服务版本

服务 版本
java 1.8.0_221
elasticsearch 7.10.1
filebeat 7.10.1
kibana 7.10.1
logstash 7.10.1
cerebro 0.9.2-1
kafka 2.12-2.3.0
zookeeper 3.5.6

服务器环境说明

IP地址 主机名 配置 角色
10.0.11.172 elk-master 4C16G es-master、kafka+zookeeper1
10.0.21.117 elk-node1 4C16G es-node1、kafka+zookeeper2
10.0.11.208 elk-node2 4C16G es-node2、kafka+zookeeper3
10.0.10.242 elk-kibana 4C16G logstash、kibana、cerebro

系统参数优化

{{< notice warning "注意" >}}
三个节点都需要执行
{{< /notice >}}

修改主机名

hostnamectl set-hostname elk-master
hostnamectl set-hostname elk-node1
hostnamectl set-hostname elk-node2

增加文件描述符

cat >>/etc/security/limits.conf<< EOF
*               soft      nofile          65536
*               hard      nofile          65536
*               soft      nproc           65536
*               hard      nproc           65536
*               hard      memlock         unlimited
*               soft      memlock         unlimited
EOF

修改默认限制内存

cat >>/etc/systemd/system.conf<< EOF
DefaultLimitNOFILE=65536
DefaultLimitNPROC=32000
DefaultLimitMEMLOCK=infinity
EOF

优化内核,对es支持

cat >>/etc/sysctl.conf<< EOF
# 关闭交换内存
vm.swappiness =0
# 影响java线程数量,建议修改为262144或者更高
vm.max_map_count= 262144
# 优化内核listen连接
net.core.somaxconn=65535
# 最大打开文件描述符数,建议修改为655360或者更高
fs.file-max=655360
# 开启ipv4转发
net.ipv4.ip_forward= 1
EOF

修改Hostname配置文件

cat >>/etc/hosts<< EOF
elk-master  10.0.11.172
elk-node1   10.0.21.117
elk-node2   10.0.11.208
EOF

重启使配置生效

reboot

部署Zookeeper

{{< notice warning "注意" >}}
三个节点都需要执行
{{< /notice >}}

创建Zookeeper项目目录

#存放快照日志
mkdir zkdata 
#存放事物日志
mkdir zklogs 

下载解压zookeeper

wget  http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz 
mv apache-zookeeper-3.5.6-bin zookeeper

修改配置文件

[root@elk-master zookeeper]# cat conf/zoo.cfg  |grep  -v ^#
# 服务器之间或客户端与服务器之间维持心跳的时间间隔
# tickTime以毫秒为单位。
tickTime=2000 
# 集群中的follower服务器(F)与leader服务器(L)之间的初始连接心跳数
initLimit=10
# 集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数
syncLimit=5
# 数据保存目录
dataDir=../zkdata
# 日志保存目录
dataLogDir=../zklogs
# 客户端连接端口
clientPort=2181
# 客户端最大连接数。# 根据自己实际情况设置,默认为60个
maxClientCnxns=60
# 三个接点配置,格式为: server.服务编号=服务地址、LF通信端口、选举端口
server.1=10.0.11.172:2888:3888
server.2=10.0.21.117:2888:3888
server.3=10.0.11.208:2888:3888

写入节点标记

{{< notice warning "注意" >}}
分别在三个节点/home/tools/zookeeper/zkdata/myid写入节点标记
{{< /notice >}}

{{< tabs master节点 node1节点 node2节点 >}}
{{< tab >}}

master的操作
echo "1" > /home/tools/zookeeper/zkdata/myid

{{< /tab >}}
{{< tab >}}

node1的操作
echo "2" > /home/tools/zookeeper/zkdata/myid

{{< /tab >}}
{{< tab >}}

node2的操作
echo "3" > /home/tools/zookeeper/zkdata/myid

{{< /tab >}}
{{< /tabs >}}

启动zookeeper集群

[root@elk-master zookeeper]# cd /home/tools/zookeeper/bin/
[root@elk-master bin]# ./zkServer.sh start 
ZooKeeper JMX enabled by default
Using config: /home/tools/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

检查集群状态

[root@elk-master bin]# sh /home/tools/zookeeper/bin/zkServer.sh status 
ZooKeeper JMX enabled by default
Using config: /home/tools/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

设置全局变量

cat >>/etc/profile<< EOF
export ZOOKEEPER_INSTALL=/home/tools/zookeeper/
export PATH=$PATH:$ZOOKEEPER_INSTALL/bin
export PATH
EOF
  • 使配置生效
    source /etc/profile

    这样就可以全局使用zkServer.sh命令了

    部署 Kafka

    {{< notice warning "注意" >}}
    三个节点都需要执行
    {{< /notice >}}

    下载解压kafka压缩包

    [root@elk-master tools]# mkdir kafka
    [root@elk-master tools]# cd kafka/
    [root@elk-master kafka]# wget https://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
    [root@elk-master kafka]# tar xf kafka_2.12-2.3.0.tgz 
    [root@elk-master kafka]# mv kafka_2.12-2.3.0 kafka
    [root@elk-master kafka]# cd kafka/config/

配置kafka

[root@elk-master config]# cat /home/tools/kafka/kafka/config/server.properties
############################# Server Basics ############################# 
# broker的id,值为整数,且必须唯一,在一个集群中不能重复
broker.id=1

############################# Socket Server Se:ttings ############################# 
# kafka默认监听的端口为9092 (默认与主机名进行连接)
listeners=PLAINTEXT://:9092

# 处理网络请求的线程数量,默认为3个
num.network.threads=3

# 执行磁盘IO操作的线程数量,默认为8个 
num.io.threads=8

# socket服务发送数据的缓冲区大小,默认100KB
socket.send.buffer.bytes=102400

# socket服务接受数据的缓冲区大小,默认100KB
socket.receive.buffer.bytes=102400

# socket服务所能接受的一个请求的最大大小,默认为100M
socket.request.max.bytes=104857600

############################# Log Basics ############################# 
# kafka存储消息数据的目录
log.dirs=../kfkdata

# 每个topic默认的partition数量
num.partitions=3

# 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy ############################# 

# 消息刷新到磁盘中的消息条数阈值
#log.flush.interval.messages=10000

# 消息刷新到磁盘中的最大时间间隔,1s
#log.flush.interval.ms=1000

############################# Log Retention Policy ############################# 

# 日志保留小时数,超时会自动删除,默认为7天
log.retention.hours=168

# 日志保留大小,超出大小会自动删除,默认为1G
#log.retention.bytes=1073741824

# 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件
log.segment.bytes=1073741824

# 每隔多长时间检测数据是否达到删除条件,300s
log.retention.check.interval.ms=300000

############################# Zookeeper ############################# 
# Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开
zookeeper.connect=10.0.11.172,10.0.21.117,10.0.11.208

# 连接zookeeper的超时时间,6s
zookeeper.connection.timeout.ms=6000

创建数据存储的目录

[root@elk-master config]# mkdir ../kfkdata

修改broker.id

{{< notice warning "注意" >}}
分别在三个节点依次修改/home/tools/kafka/kafka/config/server.properties配置文件
{{< /notice >}}
{{< tabs master节点 node1节点 node2节点 >}}
{{< tab >}}

master的配置
broker.id=1

{{< /tab >}}
{{< tab >}}

node1的配置
broker.id=2

{{< /tab >}}
{{< tab >}}

node2的配置
broker.id=3

{{< /tab >}}
{{< /tabs >}}

启动kafka集群

cd /home/tools/kafka/kafka/bin/
#启动测试
./kafka-server-start.sh ../config/server.properties
#放入后台
./kafka-server-start.sh -daemon ../config/server.properties

测试

{{< notice warning "注意" >}}
任意节点均可执行
{{< /notice >}}
在创建topic在集群中的任意节点 发布消息订阅消息验证结果

{{< tabs 创建topic 消息发布 topic消息订阅 >}}
{{< tab >}}

[root@elk-master bin]# ./kafka-topics.sh \\
--create \\
--zookeeper 10.0.11.172:2181,10.0.21.117:2181,10.0.11.208:2181 \\
--partitions 3 \\
--replication-factor 1 \\
--topic logs

{{< /tab >}}
{{< tab >}}

[root@elk-master bin]# ./kafka-console-producer.sh \\
--broker-list 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \\
--topic logs

{{< /tab >}}
{{< tab >}}

[root@elk-master bin]#  ./kafka-console-consumer.sh \\
--bootstrap-server 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \\
--topic logs \\
--from-beginning

{{< /tab >}}
{{< /tabs >}}

部署elasticsearch

{{< notice warning "注意" >}}
三个节点都需要执行
{{< /notice >}}

下载安装elasticsearch

wget https://pan.cnsre.cn/d/Package/Linux/ELK/elasticsearch-7.10.1-x86_64.rpm
[root@elk-master package]#  rpm -ivh elasticsearch-7.10.1-x86_64.rpm 

备份配置文件

cd /etc/elasticsearch
cp elasticsearch.yml  elasticsearch.yml.bak

修改配置文件

cat >/etc/elasticsearch/elasticsearch.yml << EOF
#集群名
cluster.name: elk-cluster

#node名
node.name: elk-1

#数据存储路径
path.data: /home/elasticsearch/esdata

#数据快照路径
path.repo: /home/backup/essnapshot

#日志存储路径
path.logs: /home/elasticsearch/eslogs

#es绑定的ip地址,根据自己机器ip进行修改
network.host: 0.0.0.0

#服务端口
http.port: 9200

#集群master需要和node名设置一致
discovery.seed_hosts: ["10.0.11.172", "10.0.21.117", "10.0.11.208"]
cluster.initial_master_nodes: ["10.0.11.172","10.0.21.117","10.0.11.208"]

#允许跨域请求
http.cors.enabled: true
#* 表示支持所有域名
http.cors.allow-origin: "*"
#添加请求header
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type

#生产必须为true,内存锁定检查,目的是内存地址直接映射,减少一次copy时间
bootstrap.memory_lock: true
#系统过滤检查,防止数据损坏,考虑集群安全,生产设置成false
bootstrap.system_call_filter: false

#xpack配置
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: /etc/elasticsearch/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: /etc/elasticsearch/elastic-certificates.p12
EOF

修改JVM

  • jvm.options文件中22-23行的8g设置为你的服务内存的一半
    [root@elk-node1 elasticsearch]# cat -n  jvm.options |grep 8g
    22  -Xms8g
    23  -Xmx8g

    修改其他节点配置

    {{< notice warning "注意" >}}
    分别在三个节点修改/etc/elasticsearch/elasticsearch.yml配置文件
    {{< /notice >}}

{{< tabs master节点 node1节点 node2节点 >}}
{{< tab >}}

master的配置
node.name: "es-master"

{{< /tab >}}
{{< tab >}}

node1的配置
node.name: "es-node1"

{{< /tab >}}
{{< tab >}}

node2的配置
node.name: "es-node2"

{{< /tab >}}
{{< /tabs >}}
最终展示

分配权限

因为自定义数据、日志存储目录,所以要把权限给到目录

mkdir  -p /home/elasticsearch/{esdata,eslogs}
chown  elasticsearch:elasticsearch  /home/elasticsearch/*
mkdir  -p /home/backup/essnapshot
chown elasticsearch:elasticsearch /home/backup/essnapshot

启动服务

三个节点全部启动并加入开机启动

systemctl start elasticsearch
systemctl enable  elasticsearch

使用xpack进行安全认证

xpack的安全功能

  • TLS 功能。 可对通信进行加密
  • 文件和原生 Realm。 可用于创建和管理用户
  • 基于角色的访问控制。 可用于控制用户对集群 API 和索引的访问权限
  • 通过针对 Kibana Spaces 的安全功能,还可允许在Kibana 中实现多租户。
    在我配置过程中,发现集群认证需要首先配置秘钥才行,否则在给内置用户创建秘钥的时候将会报错。
    {{< notice warning "error" >}}
    Cause: Cluster state has not been recovered yet, cannot write to the [null] index
    {{< /notice >}}
    
    Unexpected response code [503] from calling PUT http://10.0.11.172:9200/_security/user/apm_system/_password?pretty
    Cause: Cluster state has not been recovered yet, cannot write to the [null] index

Possible next steps:

  • Try running this tool again.
  • Try running with the --verbose parameter for additional messages.
  • Check the elasticsearch logs for additional error details.
  • Use the change password API manually.

ERROR: Failed to set password for user [apm_system].


#### 申请证书
{{< notice warning "注意" >}}  
下面的操作,在其中一个节点操作即可
{{< /notice >}}
``` shell
/usr/share/elasticsearch/bin/elasticsearch-certutil ca
/usr/share/elasticsearch/bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12

两条命令均一路回车即可,不需要给秘钥再添加密码
证书创建完成之后,默认在es的数据目录。
将证书拷贝到etc下,并给上权限。

[root@elk-master ~]# ls /usr/share/elasticsearch/elastic-*
/usr/share/elasticsearch/elastic-certificates.p12
/usr/share/elasticsearch/elastic-stack-ca.p12
cp /usr/share/elasticsearch/elastic-* /etc/elasticsearch/
chown elasticsearch.elasticsearch /etc/elasticsearch/elastic*

做完之后,将证书拷贝到其他节点

为内置账号添加密码

ES中内置了几个管理其他集成组件的账号apm_system, beats_system, elastic, kibana, logstash_system, remote_monitoring_user使用之前,首先需要设置下密码。

/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive
Initiating the setup of passwords for reserved users elastic,apm_system,kibana,logstash_system,beats_system,remote_monitoring_user.
You will be prompted to enter passwords as the process progresses.
Please confirm that you would like to continue [y/N]y
Enter password for [elastic]:
Reenter password for [elastic]:
Enter password for [apm_system]:
Reenter password for [apm_system]:
Enter password for [kibana]:
Reenter password for [kibana]:
Enter password for [logstash_system]:
Reenter password for [logstash_system]:
Enter password for [beats_system]:
Reenter password for [beats_system]:
Enter password for [remote_monitoring_user]:
Reenter password for [remote_monitoring_user]:
Changed password for user [apm_system]
Changed password for user [kibana]
Changed password for user [logstash_system]
Changed password for user [beats_system]
Changed password for user [remote_monitoring_user]
Changed password for user [elastic]

部署 Cerebro

下载安装

wget https://pan.cnsre.cn/d/Package/Linux/ELK/cerebro-0.9.2-1.noarch.rpm
rpm -ivh cerebro-0.9.2-1.noarch.rpm 

修改配置文件

修改/etc/cerebro/application.conf配置文件
找到对应配置修改为以下内容
{{< codes 修改内容一 修改内容二>}}
{{<code>}}

data.path: "/var/lib/cerebro/cerebro.db"
#data.path = "./cerebro.db"

{{</code>}}
{{<code>}}

hosts = [
  #{
  #  host = "http://localhost:9200"
  #  name = "Localhost cluster"
  #  headers-whitelist = [ "x-proxy-user", "x-proxy-roles", "X-Forwarded-For" ]
  #}
  # Example of host with authentication
  {
    host = "http://10.0.11.172:9200"
    name = "elk-cluster"
    auth = {
      username = "elastic"
      password = "123"
    }
  }
]

{{</code>}}
{{</codes>}}

报错

{{< notice warning "error" >}}
cerebro[8073]: No java installations was detected.
{{< /notice >}}
启动服务后报错No java,但是我的环境是有JAVA的。也做了全局变量
感觉很奇怪...

解决方法

{{< notice success "解决方法" >}}
在启动服务文件中加入JAVA_HOME
{{< /notice >}}

  • 找到服务启动文件/usr/share/cerebro/bin/cerebro
  • 修改/usr/share/cerebro/bin/cerebro中的JAVA_HOME
    具体如下,根据自己的JAVA_HOME填写路径
    {{< codes 修改前 修改后>}}
    {{<code>}}
    if [[ -n "$bundled_jvm" ]];  then
    echo "$bundled_jvm/bin/java"
    elif [[ -n "$JAVA_HOME" ]] && [[ -x "$JAVA_HOME/bin/java" ]];  then
    echo "$JAVA_HOME/bin/java"
    else
    echo "java"
    fi

    {{</code>}}
    {{<code>}}

    if [[ -n "$bundled_jvm" ]];  then
    echo "$bundled_jvm/bin/java"
    elif [[ -n "/home/tools/jdk1.8.0_221" ]] && [[ -x "/home/tools/jdk1.8.0_221/bin/java" ]];  then
    echo "/home/tools/jdk1.8.0_221/bin/java"
    else
    echo "java"
    fi

    {{</code>}}
    {{</codes>}}

    启动服务

    systemctl  start cerebro.service
    systemctl  enable cerebro.service
    systemctl  status  cerebro.service

    可以看到监听的是9000端口
    访问试下

    部署Kibana

    下载安装

    https://artifacts.elastic.co/downloads/kibana/kibana-7.10.1-x86_64.rpm
    rpm -ivh kibana-7.10.1-x86_64.rpm

    修改备份配置文件

  • 备份配置文件
    cd /etc/kibana/
    mv kibana.yml kibana.yml.bak
  • 修改配置文件
    vim kibana.yml
    server.port: 5601
    server.host: 0.0.0.0
    elasticsearch.hosts: ["http://10.0.11.172:9200/","http://10.0.21.117:9200/","http://10.0.11.208:9200/"]
    elasticsearch.username: "elastic"
    elasticsearch.password: "123"
    i18n.locale: "zh-CN"

    启动服务器

    systemctl  start   kibana.service
    systemctl  enable  kibana.service
    systemctl  status  kibana.service

    访问WEB

    访问http://IP:5601

    部署filebeat

    wget https://pan.cnsre.cn/d/Package/Linux/ELK/filebeat-7.10.1-x86_64.rpm
    rpm -ivh filebeat-7.10.1-x86_64.rpm 
    cd /etc/filebeat/
    cp filebeat.yml filebeat.yml.bak

    修改配置文件

    修改filebeat配置文件,把日志推送到kafka

    
    #=========================== Filebeat inputs =============================
    max_procs: 1                     #限制filebeat的进程数量,其实就是内核数,避免过多抢占业务资源
    queue.mem.events: 256            # 存储于内存队列的事件数,排队发送 (默认4096)
    queue.mem.flush.min_events: 128  # 小于 queue.mem.events ,增加此值可提高吞吐量 (默认值2048)
    filebeat.inputs:                 # inputs为复数,表名type可以有多个
  • type: log # 输入类型
    enable: true # 启用这个type配置
    paths:

    • /home/homeconnect/logs/AspectLog/aspect.log # 监控tomcat 的业务日志
      json.keys_under_root: true #默认Flase,还会将json解析的日志存储至messages字段
      json.overwrite_keys: true #覆盖默认的key,使用自定义json格式的key
      max_bytes: 20480 # 单条日志的大小限制,建议限制(默认为10M,queue.mem.events * max_bytes 将是占有内存的一部)
      fields: # 额外的字段
      source: test-prod-tomcat-aspect-a # 自定义source字段,用于es建议索引(字段名小写,我记得大写好像不行)
  • type: log # 输入类型
    enable: true # 启用这个type配置
    paths:
    • /home/tools/apache-tomcat-8.5.23/logs/localhost_access_log..log # 监控tomcat access日志
      json.keys_under_root: true #默认Flase,还会将json解析的日志存储至messages字段
      json.overwrite_keys: true #覆盖默认的key,使用自定义json格式的key
      max_bytes: 20480 # 单条日志的大小限制,建议限制(默认为10M,queue.mem.events
      max_bytes 将是占有内存的一部分)
      fields: # 额外的字段
      source: test-prod-tomcat-access-a # 自定义source字段,用于es建议索引

自定义es的索引需要把ilm设置为false

setup.ilm.enabled: false

#=============================== output ===============================
output.kafka: # 输出到kafka
enabled: true # 该output配置是否启用
hosts: ["10.0.11.172:9092","10.0.21.117:9092","10.0.11.208:9092"] # kafka节点列表
topic: logstash-%{[fields.source]} # kafka会创建该topic,然后logstash(可以过滤修改)会传给es作为索引名称
partition.hash:
reachable_only: true # 是否只发往可达分区
compression: gzip # 压缩
max_message_bytes: 1000000 # Event最大字节数。默认1000000。应小于等于kafka broker message.max.bytes值
required_acks: 1 # kafka ack等级
worker: 1 # kafka output的最大并发数
bulk_max_size: 2048 # 单次发往kafka的最大事件数
logging.to_files: true # 输出所有日志到file,默认true, 达到日志文件大小限制时,日志文件会自动限制替换

#=============================== other ===============================
close_older: 30m # 如果文件在某个时间段内没有发生过更新,则关闭监控的文件handle。默认1h
force_close_files: false # 这个选项关闭一个文件,当文件名称的变化。只在window建议为true

没有新日志采集后多长时间关闭文件句柄,默认5分钟,设置成1分钟,加快文件句柄关闭

close_inactive: 1m

传输了3h后荏没有传输完成的话就强行关闭文件句柄,这个配置项是解决以上案例问题的key point

close_timeout: 3h

这个配置项也应该配置上,默认值是0表示不清理,不清理的意思是采集过的文件描述在registry文件里永不清理,在运行一段时间后,registry会变大,可能会带来问题

clean_inactive: 72h

设置了clean_inactive后就需要设置ignore_older,且要保证ignore_older < clean_inactive

ignore_older: 70h

### 启动服务
``` shell
systemctl  start  filebeat.service
systemctl  enable  filebeat.service
systemctl  status  filebeat.service

部署logstash

下载安装

wget https://pan.cnsre.cn/d/Package/Linux/ELK/logstash-7.10.1-x86_64.rpm
rpm -ivh logstash-7.10.1-x86_64.rpm
mv logstash.yml logstash.yml.bak

修改配置文件

修改logstash.yml

vim logstash.yml
http.host: "0.0.0.0"
# 指发送到Elasticsearch的批量请求的大小,值越大,处理则通常更高效,但增加了内存开销
pipeline.batch.size: 3000
# 指调整Logstash管道的延迟,过了该时间则logstash开始执行过滤器和输出
pipeline.batch.delay: 200

修改配置文件,从kafka获取日志

[root@elk-kibana conf.d]# cat /etc/logstash/conf.d/get-kafka-logs.conf 
input {                                   # 输入组件
  kafka {                                  # 从kafka消费数据
    bootstrap_servers => ["10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092"]
    codec => "json"                        # 数据格式
    #topics => ["3in1-topi"]                   # 使用kafka传过来的topic
    topics_pattern => "logstash-.*"             # 使用正则匹配topic
    consumer_threads => 3                  # 消费线程数量
    decorate_events => true                # 可向事件添加Kafka元数据,比如主题、消息大小的选项,这将向logstash事件中添加一个名为kafka的字段
    auto_offset_reset => "latest"          # 自动重置偏移量到最新的偏移量
    #group_id => "logstash-node"            # 消费组ID,多个有相同group_id的logstash实例为一个消费组
    #client_id => "logstash1"               # 客户端ID
    fetch_max_wait_ms => "1000"            # 指当没有足够的数据立即满足fetch_min_bytes时,服务器在回答fetch请求之前将阻塞的最长时间
  }
}

filter{
   # 当非业务字段时,无traceId则移除
   #if ([message] =~ "traceId=null") {          # 过滤组件,这里只是展示,无实际意义,根据自己的业务需求进行过滤
   #   drop {}
   #}
mutate {
    convert => ["Request time", "float"]
    }
        if [ip] != "-" {
        geoip {
                       source => "ip"
                        target => "geoip"
                       # database => "/usr/share/GeoIP/GeoIPCity.dat"
                        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
                }
                   mutate {
                        convert => [ "[geoip][coordinates]", "float"]
                }
        }
 }

output {           # 输出组件
  elasticsearch {   # Logstash输出到es
    hosts => ["10.0.11.172:9200","10.0.21.117:9200","10.0.11.208:9200"]
    index => "logstash-%{[fields][source]}-%{+YYYY-MM-dd}"  # 直接在日志中匹配
    #index => "%{[@metadata][topic]}-%{+YYYY-MM-dd}"  # 以日期建索引
    user => "elastic"
    password => "123"
  }
  #stdout {
  #    codec => rubydebug
 #}
}

测试接收日志

测试是否能接收到数据

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/get-kafka-logs.conf

下边把logstash设置为使用systemd启动
修改/etc/systemd/system/logstash.service文件

[Unit]
Description=root

[Service]
Type=simple
User=root
Group=root
# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist.
# Prefixing the path with - makes it try to load, but if the file doesnt
# exist, it continues onward.
EnvironmentFile=-/etc/default/logstash
EnvironmentFile=-/etc/sysconfig/logstash
ExecStart=/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"
Restart=alway
WorkingDirectort=/
Nice=19
LimitNOFILE=16384

[Install]
WantedBy=multi-user.target

在启动程序/usr/share/logstash/bin/logstash.lib.sh中加入JAVA_HOME
在文件86行左右的if [ -z "$JAVACMD" ]; then代码上方插入一行JAVACMD="/home/tools/jdk1.8.0_221/bin/java" 具体的路径需要你根据自己的JAVA来修改。

[root@elk-kibana ~]# cat  -n /usr/share/logstash/bin/logstash.lib.sh |grep  JAVACMD
    85    # set the path to java into JAVACMD which will be picked up by JRuby to launch itself
    86    JAVACMD="/home/tools/jdk1.8.0_221/bin/java"
    87    if [ -z "$JAVACMD" ]; then

启动服务

systemctl reload logstash.service
systemctl restart  logstash.service
systemctl enable  logstash.service

最后检查

登录kibana创建索引

选择管理--索引模式--创建索引模式br/>![SRE运维博客|Linux系统运维|自动化运维|云计算|运维监控](https://s4.51cto.com/images/blog/202112/02165234_61a88952755dd97742.png?x-oss-process=image/watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=)
![SRE运维博客|Linux系统运维|自动化运维|云计算|运维监控](https://s4.51cto.com/images/blog/202112/02165234_61a88952b19d746194.png?x-oss-process=image/watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=)
输入`索引名称`--`下一步`
![SRE运维博客|Linux系统运维|自动化运维|云计算|运维监控](https://s4.51cto.com/images/blog/202112/02165235_61a889530786264049.png?x-oss-process=image/watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=)
选择`@timestamp`--`创建索引模式`

然后就可以看到日志了


以上是关于ELK+kafka+filebeat搭建生产ELFK集群 --markdown 语法的主要内容,如果未能解决你的问题,请参考以下文章

Kafka架构及Filebeat+Kafka+ELK集群搭建

Kafka架构及Filebeat+Kafka+ELK集群搭建

ELK+Filebeat+Kafka分布式日志管理平台搭建

Kafka+Zookeeper+Filebeat+ELK 搭建日志收集系统

ELK+Filebeat+Kafka+Zookeeper构建大数据日志分析平台三

ELK日志系统设计方案-Filebeat日志收集推送Kafka