ELK Learning Notes: 3 - Python API & PySpark Reading Filebeat Log Data from ES - 2023-2-11
Posted by Merlin雷
1- Python API
- Course link: https://www.bilibili.com/video/BV1hY411E7iA
1. Installation
```bash
pip install elasticsearch==7.17.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
```
2. Basic usage
Connecting
```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://192.168.88.129:9200")
```
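Before going further it can be worth confirming the client actually reaches the cluster. A minimal sketch using standard elasticsearch-py calls (the version check is my own addition, not part of the course):
```python
# ping() returns True/False; info() returns cluster metadata including the version
if es.ping():
    print("Connected, ES version:", es.info()['version']['number'])
else:
    print("Cannot reach Elasticsearch at http://192.168.88.129:9200")
```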
Creating and deleting an index
```python
# ignore=400 suppresses the "resource_already_exists" error if the index is already there
es.indices.create(index="py_index01", ignore=400)
# ignore=404 suppresses the "index_not_found" error if the index does not exist
es.indices.delete(index="py_index01", ignore=404)
```
Inserting documents
```python
es.indices.create(index="py_index01", ignore=400)

# Document 1
body = {
    "name": "ll",
    "age": 20,
    "city": "xj",
    "hobbies": "sing,reading"
}
es.index(index="py_index01", id=1, body=body)

# Document 2
body = {
    "name": "ff",
    "age": 21,
    "city": "xj",
    "hobbies": "sing,running"
}
es.index(index="py_index01", id=2, body=body)
```
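Indexing documents one at a time gets slow beyond a handful; the client also ships a bulk helper. A hedged sketch using the standard elasticsearch.helpers.bulk API (the two extra documents are made up for illustration):
```python
from elasticsearch.helpers import bulk

# Each action describes one document: _index/_id route it, _source is the body
actions = [
    {"_index": "py_index01", "_id": 3,
     "_source": {"name": "gg", "age": 22, "city": "xj", "hobbies": "reading"}},
    {"_index": "py_index01", "_id": 4,
     "_source": {"name": "hh", "age": 23, "city": "xj", "hobbies": "running"}},
]
success, errors = bulk(es, actions)
print(success, errors)  # count of successfully indexed docs, list of failures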
Querying
match_all, term, terms
- match_all returns every document
- term does an exact match on a value: numbers, dates, booleans, or not_analyzed strings (text that has not been tokenized)
- terms is similar but accepts multiple values for one field, matching if the field equals any of them
```python
# Return only the name and hobbies fields
query = {
    "query": {
        "match_all": {}
    }
}
filter_path = ['hits.hits._source.name',     # field 1
               'hits.hits._source.hobbies']  # field 2
result = es.search(index="py_index01", body=query, filter_path=filter_path)
print(list(map(lambda x: x['_source'], result['hits']['hits'])))

# age == 21
query = {
    "query": {
        "term": {
            "age": 21
        }
    }
}
result = es.search(index="py_index01", body=query)
print(list(map(lambda x: x['_source'], result['hits']['hits'])))

# age == 20 or age == 21
query = {
    "query": {
        "terms": {
            "age": [20, 21]
        }
    }
}
result = es.search(index="py_index01", body=query)
print(list(map(lambda x: x['_source'], result['hits']['hits'])))

# name == "ll"
query = {
    "query": {
        "term": {
            "name": "ll"
        }
    }
}
result = es.search(index="py_index01", body=query)
print(list(map(lambda x: x['_source'], result['hits']['hits'])))
```
==============Output================
```
[{'name': 'll', 'hobbies': 'sing,reading'}, {'name': 'ff', 'hobbies': 'sing,running'}]
[{'name': 'ff', 'age': 21, 'city': 'xj', 'hobbies': 'sing,running'}]
[{'name': 'll', 'age': 20, 'city': 'xj', 'hobbies': 'sing,reading'}, {'name': 'ff', 'age': 21, 'city': 'xj', 'hobbies': 'sing,running'}]
[{'name': 'll', 'age': 20, 'city': 'xj', 'hobbies': 'sing,reading'}]
```
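One distinction worth adding (my own note, not from the course): term matches the stored tokens exactly, while match runs the query string through the analyzer first. A small sketch against the hobbies text field:
```python
# The standard analyzer splits "sing,reading" into the tokens "sing" and "reading",
# so a match on "sing" hits both documents; a term query for the raw string
# "sing,reading" would find nothing, because that exact token was never stored.
query = {
    "query": {
        "match": {
            "hobbies": "sing"
        }
    }
}
result = es.search(index="py_index01", body=query)
print(list(map(lambda x: x['_source'], result['hits']['hits'])))
```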
range
| Operator | Meaning |
|---|---|
| gt | greater than |
| gte | greater than or equal to |
| lt | less than |
| lte | less than or equal to |
```python
# age > 20
query = {
    "query": {
        "range": {
            "age": {
                "gt": 20
            }
        }
    }
}
result = es.search(index="py_index01", body=query)
print(result)
```
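These clauses compose via a bool query; a hedged sketch (my own, not from the course) that ANDs a term and a range condition from the examples above:
```python
# bool/filter requires every clause to hold; filter context skips relevance scoring
query = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"city": "xj"}},
                {"range": {"age": {"gte": 21}}}
            ]
        }
    }
}
result = es.search(index="py_index01", body=query)
print(list(map(lambda x: x['_source'], result['hits']['hits'])))
```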
2- Reading ES log data with PySpark
- Jar download: https://www.elastic.co/cn/downloads/past-releases/elasticsearch-apache-hadoop-7-17-3
- Pick the release matching your ES version
- After downloading, extract the archive and copy elasticsearch-hadoop-7.17.3.jar into the jars directory of your Spark installation (an alternative that avoids touching the installation is sketched below)
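If you would rather not modify the Spark installation, the jar can also be handed to Spark at startup through the standard spark.jars setting; a minimal sketch, assuming the jar was unpacked to a hypothetical /software/server/jars/ path:
```python
from pyspark import SparkConf

# spark.jars ships the connector to the driver and executors,
# so nothing needs to be copied into $SPARK_HOME/jars
conf = SparkConf() \
    .set("spark.jars", "/software/server/jars/elasticsearch-hadoop-7.17.3.jar")
```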
Code
Without the os.environ settings below you may hit the error: `Java gateway process exited before sending its port number`
```python
import os
# Adjust these paths to your environment
os.environ['SPARK_HOME'] = '/software/server/spark/spark-2.4.5-bin-hadoop2.7'
os.environ['PYSPARK_PYTHON'] = '/software/server/miniconda3/bin/python3.7'
os.environ['JAVA_HOME'] = '/software/server/java/jdk1.8.0_221'

from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster("local[*]").setAppName("ES_test") \
    .set("es.nodes.wan.only", "true") \
    .set("es.nodes", "192.168.88.129") \
    .set("es.port", "9200") \
    .set("es.read.field.as.array.include", "message") \
    .set("es.nodes.data.only", "false")
sc = SparkContext(conf=conf)

q = """{
    "query": {
        "match_all": {}
    }
}"""
es_read_conf = {
    "es.nodes": "192.168.88.129",
    "es.port": "9200",
    "es.resource": "test-2023.02.11/_doc",
    "es.query": q
}
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)

# Look at one record (a (doc_id, document) tuple)
es_rdd.take(1)
# Extract the log field from the first record
es_rdd.map(lambda x: x[1]['originallog']).take(1)
# ['Feb 10 23:48:35 localhost journal: Runtime journal is using 8.0M (max allowed 390.1M, trying to leave 585.2M free of 3.8G available → current limit 390.1M).']
```
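The same index can also be read as a DataFrame through the Spark SQL data source that ships in the same elasticsearch-hadoop jar; a hedged sketch using the standard org.elasticsearch.spark.sql options (untested against this exact cluster):
```python
from pyspark.sql import SparkSession

# Reuses the running SparkContext; the es.* options mirror the RDD config above
spark = SparkSession.builder.getOrCreate()
df = spark.read.format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "192.168.88.129") \
    .option("es.port", "9200") \
    .option("es.nodes.wan.only", "true") \
    .option("es.resource", "test-2023.02.11/_doc") \
    .load()
df.select("originallog").show(1, truncate=False)
```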
Problems
The 'message' field does not show up
Problem: with ES 7.17.3 and PySpark 2.4.5, the 'message' field is missing from the records read back.
Fix: use a Filebeat rename processor to store the log line under a custom field, 'originallog', instead of the default 'message'.
```yaml
processors:
  - rename:
      fields:
        - from: "message"
          to: "originallog"
      ignore_missing: false
      fail_on_error: true

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/messages

output.elasticsearch:
  hosts: ["http://192.168.88.129:9200"]
  index: "test-%{+yyyy.MM.dd}"

# Index template name
setup.template.name: "test"
# Index template match pattern
setup.template.pattern: "test-*"
# Disable index lifecycle management (with ILM on, the custom index name is ignored)
setup.ilm.enabled: false
# Overwrite an existing template
setup.template.overwrite: true
# Index template settings
setup.template.settings:
  # Number of shards
  index.number_of_shards: 3
  # Number of replicas (keep it below the node count; a node holding a primary shard never stores its own replica)
  index.number_of_replicas: 0
```
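A quick way to confirm the rename took effect is to search for documents carrying the new field; a small sketch reusing the Python client from part 1 (the test-* pattern assumes the index naming above):
```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://192.168.88.129:9200")
# exists matches documents that have a non-null "originallog" field
query = {"query": {"exists": {"field": "originallog"}}}
result = es.search(index="test-*", body=query, size=1)
print(result['hits']['hits'])
```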