python中使用pyspark 读取和整理日志数据并将数据写入到es中去
Posted 阿布alone
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python中使用pyspark 读取和整理日志数据并将数据写入到es中去相关的知识,希望对你有一定的参考价值。
代码:
import re import datetime from pyspark.sql import SparkSession from pyspark import SparkContext from elasticsearch import Elasticsearch spark=SparkSession.builder.appName("lz").getOrCreate() sc = SparkContext.getOrCreate() es = Elasticsearch() month_map = {\'Jan\': \'1\', \'Feb\': \'2\', \'Mar\':\'3\', \'Apr\':\'4\', \'May\':\'5\', \'Jun\':\'6\', \'Jul\':\'7\', \'Aug\':\'8\', \'Sep\': \'9\', \'Oct\':\'10\', \'Nov\': \'11\', \'Dec\': \'12\'} log_data = sc.textFile("/Desktop/data_doc/data_Log/sshlogin/03.txt") #使用spark读取本地日志文件 for b in log_data.toLocalIterator(): #以迭代的方式来把一条条数据读取出来进行正则匹配,并最终将 dict作为body写入到es中去 # e=\'Ambari:Mar 2 02:14:16 ambari sshd[16716]: Accepted password for root from 172.21.202.174 port 59886 ssh2\'#日志格式 log_group=re.search(\'^(\\S+):(\\w{3})\\s+(\\d{1,2})\\s(\\d{2}:\\d{2}:\\d{2})\\s(\\S+)\\s(\\S+)\\[(\\d+)\\]:\\s(.+)\',b) if log_group: year=\'2019\' try: logtime = year+\'-\'+month_map[log_group.group(2)]+\'-\'+log_group.group(3)+\' \'+log_group.group(4) #将字段拼接成年月日的格式 logtime = datetime.datetime.strptime(logtime,\'%Y-%m-%d %H:%M:%S\') except Exception as e: pass row = dict(_hostname=log_group.group(1), #将数据组成一个字典 k,v syslog_timestamp=logtime, hostname=log_group.group(5), program=log_group.group(6), pid=log_group.group(7), msg = log_group.group(8)) if re.match(\'^Accepted password for\',row[\'msg\']) or re.match(\'^Accepted publickey for\',row[\'msg\']) : msg_a=re.search(\'Accepted\\s\\w+\\sfor\\s(\\S+)\\sfrom\\s(\\d{2,3}\\.\\d{2,3}\\.\\d{2,3}\\.\\d{2,3})\\sport\\s(\\d+)\',row[\'msg\']) row[\'login_success\']=True row[\'login_success_msg\']={\'username\':msg_a.group(1),\'user_ip\':msg_a.group(2),\'user_port\':msg_a.group(3)} es.index(index=\'data_log02\',doc_type=\'test02\',body=row) #将数据写入到es中去 else: break
转自:https://www.cnblogs.com/wangkun122/articles/10936938.html
以上是关于python中使用pyspark 读取和整理日志数据并将数据写入到es中去的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Python 或 Pyspark 或 scala 在数据块中获取笔记本的作业运行结果日志
在Pyspark中使用时,具有静态文件依赖性的python包无法读取静态文件