python中使用pyspark 读取和整理日志数据并将数据写入到es中去

Posted 阿布alone

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python中使用pyspark 读取和整理日志数据并将数据写入到es中去相关的知识,希望对你有一定的参考价值。

代码:

import re
import datetime
from pyspark.sql import SparkSession
from pyspark import SparkContext
from elasticsearch import Elasticsearch
spark=SparkSession.builder.appName("lz").getOrCreate()
sc = SparkContext.getOrCreate()
es = Elasticsearch()
month_map = {\'Jan\': \'1\', \'Feb\': \'2\', \'Mar\':\'3\', \'Apr\':\'4\', \'May\':\'5\', \'Jun\':\'6\', \'Jul\':\'7\',
    \'Aug\':\'8\',  \'Sep\': \'9\', \'Oct\':\'10\', \'Nov\': \'11\', \'Dec\': \'12\'}

log_data = sc.textFile("/Desktop/data_doc/data_Log/sshlogin/03.txt") #使用spark读取本地日志文件


for b in log_data.toLocalIterator(): 
    #以迭代的方式来把一条条数据读取出来进行正则匹配,并最终将 dict作为body写入到es中去
    # e=\'Ambari:Mar  2 02:14:16 ambari sshd[16716]: Accepted password for root from 172.21.202.174 port 59886 ssh2\'#日志格式
    log_group=re.search(\'^(\\S+):(\\w{3})\\s+(\\d{1,2})\\s(\\d{2}:\\d{2}:\\d{2})\\s(\\S+)\\s(\\S+)\\[(\\d+)\\]:\\s(.+)\',b)
    if log_group:
        year=\'2019\'
        try:
            logtime = year+\'-\'+month_map[log_group.group(2)]+\'-\'+log_group.group(3)+\' \'+log_group.group(4) #将字段拼接成年月日的格式
            logtime = datetime.datetime.strptime(logtime,\'%Y-%m-%d %H:%M:%S\')
        except Exception as e:
           pass
        row = dict(_hostname=log_group.group(1), #将数据组成一个字典  k,v
                  syslog_timestamp=logtime,
                  hostname=log_group.group(5),
                  program=log_group.group(6),
                  pid=log_group.group(7),
                  msg = log_group.group(8))
        if re.match(\'^Accepted password for\',row[\'msg\']) or re.match(\'^Accepted publickey for\',row[\'msg\']) :

            msg_a=re.search(\'Accepted\\s\\w+\\sfor\\s(\\S+)\\sfrom\\s(\\d{2,3}\\.\\d{2,3}\\.\\d{2,3}\\.\\d{2,3})\\sport\\s(\\d+)\',row[\'msg\'])
            row[\'login_success\']=True
            row[\'login_success_msg\']={\'username\':msg_a.group(1),\'user_ip\':msg_a.group(2),\'user_port\':msg_a.group(3)}
        es.index(index=\'data_log02\',doc_type=\'test02\',body=row) #将数据写入到es中去
    else:
        break

 

 

转自:https://www.cnblogs.com/wangkun122/articles/10936938.html

以上是关于python中使用pyspark 读取和整理日志数据并将数据写入到es中去的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中读取文本文件时有没有办法控制分区数

基于PySpark的航天日志分析(SQL分析)

如何使用 Python 或 Pyspark 或 scala 在数据块中获取笔记本的作业运行结果日志

在Pyspark中使用时,具有静态文件依赖性的python包无法读取静态文件

Pyspark 2.4.0,使用读取流从 kafka 读取 avro - Python

我对镶木地板文件和 python 完全陌生,谁能告诉我如何在 pyspark 中读取带有标题的镶木地板文件