Elasticsearch:运用 Python 实时通过 Logstash 写入日志到 Elasticsearch

Posted Elastic 中国社区官方博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch:运用 Python 实时通过 Logstash 写入日志到 Elasticsearch相关的知识,希望对你有一定的参考价值。

在我之前的文章,我详细地介绍了如何通过 Filebeat 来收集日志并写入到 Elasticsearch。你可以阅读我之前的文章:

在今天的文章中,我将分享如何使用 Logstash 把日志文件发送到 Elasticsearch。使用 Logstash 的好处是它可以很方便地使用它丰富的过滤器对数据进行清洗以便更好地对数据进行分析。我们使用如下的架构:

 在今天的展示中,我将使用最新的 Elastic Stack 8.4.3 来进行展示。

安装

如果你还没有安装好自己的 Elasticsearch,Kibana 及 Logstash,你可以按照如下的文章来进行安装:

首先,我们参考文章 “Logstash:如何连接到带有 HTTPS 访问的集群” 来生成 truststore.p12 证书文件:

$ pwd
/Users/liuxg/test/elasticsearch-8.4.3/config/certs
$ ls 
http.p12      http_ca.crt   transport.p12
$ keytool -import -file http_ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12
Certificate was added to keystore
$ ls
http.p12       http_ca.crt    transport.p12  truststore.p12

在上面,我们生产的 truststore.p12 的密码为 password。

我们针对 Logstash 配置如下的配置文件:

logstash.conf

input 
  udp 
    port => 5959
    codec => json 
          target => "[document]"
    
  

output 
  stdout 
    codec => rubydebug
  

  elasticsearch 
      index => "logdb"
      hosts => ["https://192.168.0.3:9200"]
      user => "elastic"
      password => "6bTlJp388KkgJKWi+hQr"
      ssl_certificate_verification => true
      truststore => "/Users/liuxg/test/elasticsearch-8.4.3/config/certs/truststore.p12"
      truststore_password => "password"
  

在上面,我们需要根据自己的 Elasticsearch 账号及密码进行修改。另外你也需要根据自己的证书位置进行相应的调整。 上面的 hosts 是我的本地 Elasticsearch 集群的访问地址。你需要根据自己的进行配置。在上面,我们使用 udp input 来收集日志,并传入到 Elasticsearch。在本示例中,我们忽略了 filter 部分,以简化问题的描述。我们可以把这个 logstash.conf 置于 Logstash 的安装根目录中。

我们可以使用如下的命令来运行:

Python 日志应用

我们首先来安装一个叫做 python-logstash 的包:

 pip install python-logstash

我们设计如下的 Python 应用来通过 Logstash 写入日志:

app.py

import logging
import logstash
import sys


class Logging(object):
    def __init__(self, logger_name='python-logger',
                 log_stash_host='localhost',
                 log_stash_upd_port=5959

                 ):
        self.logger_name = logger_name
        self.log_stash_host = log_stash_host
        self.log_stash_upd_port = log_stash_upd_port


    def get(self):
        logging.basicConfig(
            filename="logfile",
            filemode="a",
            format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s",
            datefmt="%H:%M:%S",
            level=logging.INFO,
        )

        self.stderrLogger = logging.StreamHandler()
        logging.getLogger().addHandler(self.stderrLogger)
        self.logger = logging.getLogger(self.logger_name)
        self.logger.addHandler(logstash.LogstashHandler(self.log_stash_host,
                                                        self.log_stash_upd_port,
                                                        version=1))
        return self.logger



instance = Logging(log_stash_upd_port=5959, log_stash_host='localhost', logger_name='soumil')
logger = instance.get()

count = 0
from time import sleep
while True:

    count = count + 1

    if count % 2 == 0:
        logger.error('Error Message Code Faield : '.format(count))
    else:
        logger.info('python-logstash: test logstash info message: '.format(count))

我们在和 Logstash 运行的同一个机器上运行上面的应用。我们使用如下的方法来运行:

python app.py

我们在 Logstash 的 terminal 中可以看到:

它表明 Logstash 运作正常。

我们再到 Kibana 中打入如下的命令:

GET _cat/indices

从上面的输出中,我们可以看到新生成的 logdb 索引。

我们可以对这个索引进行搜索:

我们可以看到日志被正常地解析并可以被搜索。

以上是关于Elasticsearch:运用 Python 实时通过 Logstash 写入日志到 Elasticsearch的主要内容,如果未能解决你的问题,请参考以下文章

Elasticsearch:Anaylyzer 在 Python 中的运用

Elasticsearch:Analyzer 在 Python 中的运用

Elasticsearch:运用 Python 来实现对搜索结果的分页

Elasticsearch:运用 Python 来实现对搜索结果的分页

Elasticsearch:使用 Python elasticsearch-dsl-py 库对 Elasticsearch 进行查询

Elasticsearch:使用 Python elasticsearch-dsl-py 库对 Elasticsearch 进行查询