ELK Study Notes: 3 - Reading Filebeat-Collected Log Data from ES with the Python API & PySpark - 2023-2-11

Posted by Merlin雷


1- Python API

  • Course link:

https://www.bilibili.com/video/BV1hY411E7iA

1. Installation

pip install elasticsearch==7.17.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

2. Basic usage

Connecting

from elasticsearch import Elasticsearch
es = Elasticsearch("http://192.168.88.129:9200")

Creating and deleting an index

es.indices.create(index="py_index01", ignore=400)  # 400: index already exists
es.indices.delete(index="py_index01", ignore=404)  # 404: index not found

Inserting data

es.indices.create(index="py_index01", ignore=400)
# document 1
body = {
    "name": "ll",
    "age": 20,
    "city": "xj",
    "hobbies": "sing,reading"
}
es.index(index="py_index01", id=1, body=body)

# document 2
body = {
    "name": "ff",
    "age": 21,
    "city": "xj",
    "hobbies": "sing,running"
}
es.index(index="py_index01", id=2, body=body)
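For more than a handful of documents, the elasticsearch package also provides `helpers.bulk`, which sends many documents in one request instead of one `es.index` call each. A minimal sketch of the action list it expects; the actual send needs a live cluster and is left as a comment:

```python
# Build the action list that helpers.bulk() consumes; each action names the
# target index, a document id, and the document body.
docs = [
    {"name": "ll", "age": 20, "city": "xj", "hobbies": "sing,reading"},
    {"name": "ff", "age": 21, "city": "xj", "hobbies": "sing,running"},
]
actions = [
    {"_index": "py_index01", "_id": i + 1, "_source": doc}
    for i, doc in enumerate(docs)
]
# Against a live cluster you would then run:
#   from elasticsearch import Elasticsearch, helpers
#   es = Elasticsearch("http://192.168.88.129:9200")
#   helpers.bulk(es, actions)
print(actions[0]["_index"])  # → py_index01
```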

Querying

match_all, term, terms

  1. match_all returns every document
  2. term matches exact values: numbers, dates, booleans, and not_analyzed strings (text that has not been tokenized)
  3. terms is similar, but takes multiple values for a field and matches documents containing any of them

# return only the name and hobbies fields
query = {
    "query": {
        "match_all": {}
    }
}

filter_path = ['hits.hits._source.name',     # field 1
               'hits.hits._source.hobbies']  # field 2

result = es.search(index="py_index01", body=query, filter_path=filter_path)
print(list(map(lambda x: x['_source'], result['hits']['hits'])))

# age = 21
query = {
    "query": {
        "term": {
            "age": 21
        }
    }
}

result = es.search(index="py_index01", body=query)
print(list(map(lambda x: x['_source'], result['hits']['hits'])))

# age = 20 or 21
query = {
    "query": {
        "terms": {
            "age": [20, 21]
        }
    }
}

result = es.search(index="py_index01", body=query)
print(list(map(lambda x: x['_source'], result['hits']['hits'])))

# name is "ll"
query = {
    "query": {
        "term": {
            "name": "ll"
        }
    }
}

result = es.search(index="py_index01", body=query)
print(list(map(lambda x: x['_source'], result['hits']['hits'])))

============== Output ================
[{'name': 'll', 'hobbies': 'sing,reading'}, {'name': 'ff', 'hobbies': 'sing,running'}]
[{'name': 'ff', 'age': 21, 'city': 'xj', 'hobbies': 'sing,running'}]
[{'name': 'll', 'age': 20, 'city': 'xj', 'hobbies': 'sing,reading'}, {'name': 'ff', 'age': 21, 'city': 'xj', 'hobbies': 'sing,running'}]
[{'name': 'll', 'age': 20, 'city': 'xj', 'hobbies': 'sing,reading'}]
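The print calls above rely on the shape of the search response: matching documents sit under hits.hits, each with its body in _source. A quick illustration on a hand-built response dict with the same structure es.search returns, so it runs without a cluster:

```python
# A hand-built response in the shape es.search() returns.
result = {
    "hits": {
        "hits": [
            {"_id": "1", "_source": {"name": "ll", "hobbies": "sing,reading"}},
            {"_id": "2", "_source": {"name": "ff", "hobbies": "sing,running"}},
        ]
    }
}
# The same extraction used throughout this section.
sources = list(map(lambda x: x['_source'], result['hits']['hits']))
print(sources)
# → [{'name': 'll', 'hobbies': 'sing,reading'}, {'name': 'ff', 'hobbies': 'sing,running'}]
```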

range

Operator meanings:

  • gt: greater than
  • gte: greater than or equal to
  • lt: less than
  • lte: less than or equal to
# age > 20
query = {
    "query": {
        "range": {
            "age": {
                "gt": 20
            }
        }
    }
}

result = es.search(index="py_index01", body=query)
print(result)
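When more than one condition is needed, term and range clauses can be combined in a bool query. A sketch of such a query body, built the same way as the ones above (constructed only, not executed against the cluster here):

```python
# city = "xj" AND age > 20, expressed as a bool query with two filter clauses.
query = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"city": "xj"}},
                {"range": {"age": {"gt": 20}}},
            ]
        }
    }
}
# With a live cluster: result = es.search(index="py_index01", body=query)
print(query["query"]["bool"]["filter"][0])  # → {'term': {'city': 'xj'}}
```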

2- PySpark reading log data from ES

  • Jar download: https://www.elastic.co/cn/downloads/past-releases/elasticsearch-apache-hadoop-7-17-3
  • Pick the release that matches your ES version
  • After downloading, extract the archive and copy elasticsearch-hadoop-7.17.3.jar into the jars directory of the Spark installation

Code

Without the os.environ settings below you may get the error: Java gateway process exited before sending its port number


import os
os.environ['SPARK_HOME']='/software/server/spark/spark-2.4.5-bin-hadoop2.7'
os.environ['PYSPARK_PYTHON']='/software/server/miniconda3/bin/python3.7'
os.environ['JAVA_HOME']='/software/server/java/jdk1.8.0_221'

from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local[*]").setAppName("ES_test") \
        .set("es.nodes.wan.only", "true") \
        .set("es.nodes", "192.168.88.129") \
        .set("es.port", "9200") \
        .set("es.read.field.as.array.include", "message") \
        .set("es.nodes.data.only", "false")
sc = SparkContext(conf=conf)

q = """{
    "query": {
        "match_all": {}
    }
}"""

es_read_conf = {
    "es.nodes": "192.168.88.129",
    "es.port": "9200",
    "es.resource": "test-2023.02.11/_doc",
    "es.query": q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

# inspect one record
es_rdd.take(1)
# pull the log field out of the first record
es_rdd.map(lambda x: x[1]['originallog']).take(1)
# ['Feb 10 23:48:35 localhost journal: Runtime journal is using 8.0M (max allowed 390.1M, trying to leave 585.2M free of 3.8G available → current limit 390.1M).']
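Each originallog entry is a plain syslog-style line, so once pulled out of the RDD it can be split into timestamp, host, and message with ordinary Python. A sketch using the sample line above; the regex is an assumption about the usual /var/log/messages layout, not something from the original notes:

```python
import re

line = ("Feb 10 23:48:35 localhost journal: Runtime journal is using 8.0M "
        "(max allowed 390.1M, trying to leave 585.2M free of 3.8G available "
        "→ current limit 390.1M).")

# Assumed layout: "<Mon> <day> <HH:MM:SS> <host> <rest of message>"
m = re.match(r"(\w{3}\s+\d+ \d\d:\d\d:\d\d) (\S+) (.*)", line)
timestamp, host, message = m.groups()
print(timestamp)  # → Feb 10 23:48:35
print(host)       # → localhost
```

Inside a Spark job the same parse could be mapped over the RDD, e.g. `es_rdd.map(lambda x: re.match(..., x[1]['originallog']).groups())`.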

Issues

The 'message' field is missing

Problem: with ES 7.17.3 and PySpark 2.4.5, the 'message' field does not show up in the records read back.

Solution: use a Filebeat processor to rename the default log field 'message' to a custom field, 'originallog'.

processors:
  - rename:
      fields:
        - from: "message"
          to: "originallog"
      ignore_missing: false
      fail_on_error: true
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
output.elasticsearch:
  hosts: ["http://192.168.88.129:9200"]
  index: "test-%{+yyyy.MM.dd}"

# index template name
setup.template.name: "test"
# index pattern the template applies to
setup.template.pattern: "test-*"
# disable index lifecycle management
setup.ilm.enabled: false
# overwrite an existing template
setup.template.overwrite: true
# index template settings
setup.template.settings:
  # number of primary shards
  index.number_of_shards: 3
  # number of replicas; must be smaller than the node count, since the primary itself does not count
  index.number_of_replicas: 0
