Elasticsearch:运用 Python 实时通过 Logstash 写入日志到 Elasticsearch
Posted Elastic 中国社区官方博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch:运用 Python 实时通过 Logstash 写入日志到 Elasticsearch相关的知识,希望对你有一定的参考价值。
在我之前的文章,我详细地介绍了如何通过 Filebeat 来收集日志并写入到 Elasticsearch。你可以阅读我之前的文章:
在今天的文章中,我将分享如何使用 Logstash 把日志文件发送到 Elasticsearch。使用 Logstash 的好处是它可以很方便地使用它丰富的过滤器对数据进行清洗以便更好地对数据进行分析。我们使用如下的架构:
在今天的展示中,我将使用最新的 Elastic Stack 8.4.3 来进行展示。
安装
如果你还没有安装好自己的 Elasticsearch,Kibana 及 Logstash,你可以按照如下的文章来进行安装:
首先,我们参考文章 “Logstash:如何连接到带有 HTTPS 访问的集群” 来生成 truststore.p12 证书文件:
$ pwd
/Users/liuxg/test/elasticsearch-8.4.3/config/certs
$ ls
http.p12 http_ca.crt transport.p12
$ keytool -import -file http_ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12
Certificate was added to keystore
$ ls
http.p12 http_ca.crt transport.p12 truststore.p12
在上面,我们生产的 truststore.p12 的密码为 password。
我们针对 Logstash 配置如下的配置文件:
logstash.conf
input
udp
port => 5959
codec => json
target => "[document]"
output
stdout
codec => rubydebug
elasticsearch
index => "logdb"
hosts => ["https://192.168.0.3:9200"]
user => "elastic"
password => "6bTlJp388KkgJKWi+hQr"
ssl_certificate_verification => true
truststore => "/Users/liuxg/test/elasticsearch-8.4.3/config/certs/truststore.p12"
truststore_password => "password"
在上面,我们需要根据自己的 Elasticsearch 账号及密码进行修改。另外你也需要根据自己的证书位置进行相应的调整。 上面的 hosts 是我的本地 Elasticsearch 集群的访问地址。你需要根据自己的进行配置。在上面,我们使用 udp input 来收集日志,并传入到 Elasticsearch。在本示例中,我们忽略了 filter 部分,以简化问题的描述。我们可以把这个 logstash.conf 置于 Logstash 的安装根目录中。
我们可以使用如下的命令来运行:
Python 日志应用
我们首先来安装一个叫做 python-logstash 的包:
pip install python-logstash
我们设计如下的 Python 应用来通过 Logstash 写入日志:
app.py
import logging
import logstash
import sys
class Logging(object):
def __init__(self, logger_name='python-logger',
log_stash_host='localhost',
log_stash_upd_port=5959
):
self.logger_name = logger_name
self.log_stash_host = log_stash_host
self.log_stash_upd_port = log_stash_upd_port
def get(self):
logging.basicConfig(
filename="logfile",
filemode="a",
format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s",
datefmt="%H:%M:%S",
level=logging.INFO,
)
self.stderrLogger = logging.StreamHandler()
logging.getLogger().addHandler(self.stderrLogger)
self.logger = logging.getLogger(self.logger_name)
self.logger.addHandler(logstash.LogstashHandler(self.log_stash_host,
self.log_stash_upd_port,
version=1))
return self.logger
instance = Logging(log_stash_upd_port=5959, log_stash_host='localhost', logger_name='soumil')
logger = instance.get()
count = 0
from time import sleep
while True:
count = count + 1
if count % 2 == 0:
logger.error('Error Message Code Faield : '.format(count))
else:
logger.info('python-logstash: test logstash info message: '.format(count))
我们在和 Logstash 运行的同一个机器上运行上面的应用。我们使用如下的方法来运行:
python app.py
我们在 Logstash 的 terminal 中可以看到:
它表明 Logstash 运作正常。
我们再到 Kibana 中打入如下的命令:
GET _cat/indices
从上面的输出中,我们可以看到新生成的 logdb 索引。
我们可以对这个索引进行搜索:
我们可以看到日志被正常地解析并可以被搜索。
以上是关于Elasticsearch:运用 Python 实时通过 Logstash 写入日志到 Elasticsearch的主要内容,如果未能解决你的问题,请参考以下文章
Elasticsearch:Anaylyzer 在 Python 中的运用
Elasticsearch:Analyzer 在 Python 中的运用
Elasticsearch:运用 Python 来实现对搜索结果的分页
Elasticsearch:运用 Python 来实现对搜索结果的分页
Elasticsearch:使用 Python elasticsearch-dsl-py 库对 Elasticsearch 进行查询
Elasticsearch:使用 Python elasticsearch-dsl-py 库对 Elasticsearch 进行查询