Analyzing nginx logs with Python and pushing the metrics to Open-Falcon


No lengthy explanation, straight to the code.

#!/usr/bin/python
# -*- coding: utf-8 -*-
# nginx log analysis, pushed to Open-Falcon

import time
import datetime
import os
import os.path
import json
import socket
import requests
import subprocess

class NginxLog(object):
	"""Parse an nginx access log and aggregate per-URL metrics."""
	def __init__(self, log_file, interface_list, seek_file):
		self.log_file = log_file              # path of the nginx access log
		self.interface_list = interface_list  # request paths (URLs) to track
		self.seek_file = seek_file            # temp file storing the last read offset

	def jsonFormat(self, python_data):
		"""Format Python data as an indented JSON string."""
		json_data = json.dumps(python_data, indent=2)
		return json_data

	def hostname(self):
		"""Return the fully qualified hostname of this machine."""
		host_name = socket.getfqdn(socket.gethostname())
		return host_name

	def writeSeek(self, seek):
		"""Write the current read offset (plus a timestamp) to the temp seek file."""
		with open(self.seek_file, 'w') as f:
			f.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) + '\n')
			f.write(str(seek) + "\n")

	def LogRead(self):
		"""Return the log content appended since the last run.

		On the first run, or if the seek file has been deleted, read from the
		beginning; otherwise continue from the previously saved offset.
		(seek whence: 0 = start of file, 1 = current position, 2 = end of file)
		"""
		if os.path.exists(self.seek_file):
			with open(self.seek_file) as f:
				seek_tmp = f.readlines()
			seek_old = int(seek_tmp[1].strip())    # offset saved on the previous run
		else:
			seek_old = 0
		with open(self.log_file) as f:
			# Record the current end-of-file offset
			f.seek(0, 2)
			seek_now = f.tell()
			if seek_now >= seek_old:
				# Read everything written since the last run
				f.seek(seek_old, 0)
				chunk = f.read(seek_now - seek_old)
			else:
				# seek_now < seek_old means the log was rotated; start from the top
				f.seek(0, 0)
				chunk = f.read(seek_now)
		# Save the new offset for the next run
		self.writeSeek(seek_now)
		return chunk

	def LogStatistics(self):
		"""Parse the nginx log by splitting each line on whitespace.

		If the nginx log format changes, the field positions used below must be
		adjusted accordingly.
		"""
		# Example line (default-style format):
		# 127.0.0.1 - - [02/Mar/2018:11:01:09 +0800] "HEAD /aaa HTTP/1.1" 404 0 "-" "curl/7.29.0"
		# Actual format parsed here; the trailing numeric field is treated as the request time:
		# 100.64.40.7 - - [05/Mar/2018:09:15:19 +0800] "GET /wpsad.php HTTP/1.0" 200 0 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.98 Safari/537.36 LBBROWSER" "114.255.44.143" 0.369
		result_list = []
		time_ns = datetime.datetime.now().microsecond
		time_stamp = int(time.time())
		host_name = self.hostname()
		# Read the new log chunk once; LogRead() advances the saved offset,
		# so calling it inside the loop below would return an empty string
		# for every URL after the first.
		log_lines = self.LogRead().split('\n')
		
		# Loop over the URLs we want statistics for
		for interface_item in self.interface_list:
			# Every metric dict shares the same timestamp and host fields
			common = {'ns': time_ns, 'clock': time_stamp, 'host': host_name}
			interface_item_dict_count = dict(common, key=interface_item + '_count', value=0)
			interface_item_dict_avg_request_time = dict(common, key=interface_item + '_avg_request_time', value=0)
			interface_item_dict_2xx = dict(common, key=interface_item + '_2xx', value=0)
			interface_item_dict_4xx = dict(common, key=interface_item + '_4xx', value=0)
			interface_item_dict_5xx = dict(common, key=interface_item + '_5xx', value=0)
			hit_url_count = 0

			for line in log_lines:
				fields = line.split()
				# Skip blank lines and lines too short to hold the expected fields
				if len(fields) < 9:
					continue
				remote_addr = fields[0]        # client IP (not used further, kept for reference)
				request_url = fields[6]        # request path
				status_code = fields[8]        # HTTP status code
				request_time = fields[-1]      # response time, last field of the line
				if interface_item == request_url:
					hit_url_count += 1
					interface_item_dict_count['value'] += 1
					# Accumulate response time; averaged after the loop
					interface_item_dict_avg_request_time['value'] += float(request_time)
					status = status_code.strip('"')
					if status.startswith('2'):
						interface_item_dict_2xx['value'] += 1
					elif status.startswith('4'):
						interface_item_dict_4xx['value'] += 1
					elif status.startswith('5'):
						interface_item_dict_5xx['value'] += 1
			# Average response time over the matched requests
			if hit_url_count:
				interface_item_dict_avg_request_time['value'] = interface_item_dict_avg_request_time['value'] / hit_url_count
			
			# Append the per-URL metrics to the result list
			result_list.append(interface_item_dict_count)
			result_list.append(interface_item_dict_avg_request_time)
			result_list.append(interface_item_dict_2xx)
			result_list.append(interface_item_dict_4xx)
			result_list.append(interface_item_dict_5xx)
		return result_list
		#return self.jsonFormat(result_list)


def pushFalcon(return_data):
	"""Push the collected metrics to the local Open-Falcon agent.

	Note: the metric keys below are hard-coded for the /wpsad.php URL
	configured in main().
	"""
	all_data = []
	all_request_dic = {}
	wpsad_2xx_request_dic = {}
	wpsad_4xx_request_dic = {}
	wpsad_5xx_request_dic = {}
	wpsad_response_time_dic = {}
	for i in return_data:
		if 'wpsad.php_count' in i['key']:
			all_request_dic['value'] = i['value']
			all_request_dic['key'] = 'wpsad.php_count'
			all_request_dic['host_name'] = i['host']
			all_data.append(all_request_dic)
		if 'wpsad.php_2xx' in i['key']:
			wpsad_2xx_request_dic['value'] = i['value']
			wpsad_2xx_request_dic['key'] = 'wpsad.php_2xx_count'
			wpsad_2xx_request_dic['host_name'] = i['host']
			all_data.append(wpsad_2xx_request_dic)
		if 'wpsad.php_4xx' in i['key']:
			wpsad_4xx_request_dic['value'] = i['value']
			wpsad_4xx_request_dic['key'] = 'wpsad.php_4xx_count'
			wpsad_4xx_request_dic['host_name'] = i['host']
			all_data.append(wpsad_4xx_request_dic)
		if 'wpsad.php_5xx' in i['key']:
			wpsad_5xx_request_dic['value'] = i['value']
			wpsad_5xx_request_dic['key'] = 'wpsad.php_5xx_count'
			wpsad_5xx_request_dic['host_name'] = i['host']
			all_data.append(wpsad_5xx_request_dic)
		if 'wpsad.php_avg_request_time' in i['key']:
			wpsad_response_time_dic['key'] = 'wpsad_response_time'
			wpsad_response_time_dic['value'] = i['value']
			#all_data.append(wpsad_response_time_dic)
	# Per-URL request and status-code metrics
	ts = int(time.time())
	payload = []
	for i in all_data:
		temp_dic = {
			#"endpoint": i['host_name'],
			"endpoint": "vm172-31-32-13.ksc.com",
			"metric": i['key'],
			"timestamp": ts,
			"step": 60,
			"value": i['value'],
			"counterType": "GAUGE",
			"tags": "url="+i['key']
		}
		payload.append(temp_dic)
	#print(payload)

	# Response-time metric
	response_time_dic = {
		"endpoint": "vm172-31-32-13.ksc.com",
		"metric": wpsad_response_time_dic['key'],
		"timestamp": ts,
		"step": 60,
		"value": wpsad_response_time_dic['value'],
		"counterType": "GAUGE",
		"tags": "",
	}
	payload.append(response_time_dic)
	# nginx concurrent-connection statistics
	estab_data = {
		"endpoint": "vm172-31-32-13.ksc.com",
		"metric": "nginx_estab_num",
		"timestamp": ts,
		"step": 60,
		"value": 0,
		"counterType": "GAUGE",
		"tags": "",
	}
	time_wait = {
		"endpoint": "vm172-31-32-13.ksc.com",
		"metric": "nginx_timewait_num",
		"timestamp": ts,
		"step": 60,
		"value": 0,
		"counterType": "GAUGE",
		"tags": "",
	}
	# TIME_WAIT
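	# Note: grep -i '80' matches any netstat line containing "80" (e.g. port 8080 or part of an IP),
	# not strictly port 80; tighten the pattern if exact port matching is required.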
	time_wait_cmd = "netstat -ant|grep -i '80'|grep 'TIME_WAIT'|wc -l"
	time_wait_p = subprocess.Popen(time_wait_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
	timewait_out, timewait_err = time_wait_p.communicate()
	if not timewait_err:
		time_wait['value'] = int(timewait_out.strip())
		payload.append(time_wait)

	# ESTABLISHED
	estab_cmd = "netstat -ant|grep -i '80'|grep 'ESTABLISHED'|wc -l"
	estab_p = subprocess.Popen(estab_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
	estab_out, estab_err = estab_p.communicate()
	if not estab_err:
		estab_data['value'] = int(estab_out.strip())
		payload.append(estab_data)
	
	# Memory used by the nginx processes
	mem_dic = {
		"endpoint": "vm172-31-32-13.ksc.com",
		"metric": "nginx_mem",
		"timestamp": ts,
		"step": 60,
		"value": 0,
		"counterType": "GAUGE",
		"tags": "",
	}
	#mem_cmd = "top -b -n1|grep nginx|gawk '{if($6~/m$/) {sum+=$6*1024} else {sum+=$6} }; END {print int(sum/1024)}'"
	#mem_p = subprocess.Popen(time_wait_cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
	#mem_out = mem_p.stdout.read()
	#print(mem_out)
	#mem_err = mem_p.stderr.read()
	#if not mem_err:
	#	mem_dic['value'] = int(mem_out.strip())
	#	print(mem_dic['value'])
	#

	# Sum resident memory (VmRSS, in kB) across all nginx master/worker processes
	nginxpid = subprocess.Popen(["pidof", "nginx"], stdout=subprocess.PIPE)
	nginx_pids = nginxpid.stdout.read().decode().split()
	memsum = 0
	for pid in nginx_pids:
		pidfile = os.path.join("/proc", pid, "status")
		with open(pidfile) as f:
			for mem in f:
				if mem.startswith("VmRSS"):
					memsum += int(mem.split()[1])
	memsum = memsum // 1024    # kB -> MB
	#print("%d %s" %(memsum,"M"))
	mem_dic['value'] = memsum
	payload.append(mem_dic)
	
	# Push everything to the local falcon-agent
	#print(payload)
	r = requests.post("http://127.0.0.1:1988/v1/push", data=json.dumps(payload))
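	# The agent's reply can be checked via r.status_code / r.text if the push needs to be verified.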
		
	
def main():
	# URLs (request paths) to analyse
	interface_list = ['/wpsad.php']

	# Location of the nginx access log
	log_file = "/data/logs/nginx/ads.access.log"
	# Temp file that stores the read offset between runs
	seek_file = "/data/logs/nginx/ads_log_check_seek.tmp"
	nginx_log = NginxLog(log_file, interface_list, seek_file)
	return_data = nginx_log.LogStatistics()

	#print(return_data)
	pushFalcon(return_data)

if __name__ == '__main__':
	main()
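
For reference, LogStatistics() relies on fixed whitespace-split field positions: index 0 is the client IP, index 6 the request path, index 8 the status code, and the last field the request time. Below is a minimal standalone sketch (the sample line is the one from the comments above, with the user agent shortened) to confirm those positions against your own log format before deploying:

# Sanity check of the field positions assumed by NginxLog.LogStatistics()
sample = ('100.64.40.7 - - [05/Mar/2018:09:15:19 +0800] "GET /wpsad.php HTTP/1.0" '
          '200 0 "-" "Mozilla/5.0" "114.255.44.143" 0.369')
fields = sample.split()
print(fields[0])     # 100.64.40.7  -> client IP
print(fields[6])     # /wpsad.php   -> request path
print(fields[8])     # 200          -> status code
print(fields[-1])    # 0.369        -> request time

Since every metric is pushed with step set to 60, the script is presumably meant to be run once a minute, for example from cron.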

