python 用于导出kibana实体的Python脚本
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 用于导出kibana实体的Python脚本相关的知识,希望对你有一定的参考价值。
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Script providing solution for backup Kibana entities to *.json files
"""
from __future__ import print_function
import argparse
import base64
import json
import os
import re
import shutil
import tarfile
import time
import urllib2 as urllib
class KibanaBackup(object):
"""
Backups defined objects from Kibana
"""
def __init__(self, **kwargs):
self.__backup_dir = kwargs.get('backup_dir', '.')
self.__verbose = kwargs.get('verbose', False)
self.__headers_set = {
'Content-Type': 'application/json',
'Accept': 'application/json',
}
def __get_items_list(self, **kwargs):
elastic_url = kwargs.get('elastic_url', 'http://lcoalhost:9200')
elastic_user = kwargs.get('elastic_user')
elastic_pass = kwargs.get('elastic_pass')
elastic_headers = self.__headers_set
if elastic_user and elastic_pass:
elastic_headers.setdefault('Authorization', 'Basic {}'.format(
base64.b64encode('{}:{}'.format(elastic_user, elastic_pass))))
objects_for_backup = [
'dashboard'
]
items = dict()
print('Getting available objects from elastic: {}'.format(elastic_url))
request_object = urllib.Request('{url}/.kibana/_search?q=*'.format(url=elastic_url),
headers=elastic_headers)
response = urllib.urlopen(request_object)
data = json.loads(response.read())
for item in data['hits']['hits']:
object_type = item.get('_id').split(':')[0]
object_id = item.get('_id').split(':')[1]
if object_type == 'url':
if self.__verbose:
print('Processing data from url object')
data = self.__get_data_from_url_entity(item)
if data:
object_type = data.group('type')
object_id = data.group('id')
item.setdefault('_source', {})
item['_source'].setdefault(object_type, {})
item['_source'][object_type].setdefault('title', '')
item['_source'][object_type]['title'] = data.group('title')
if object_type in objects_for_backup:
items.setdefault(object_type, list())
items[object_type].append({'id': object_id, 'name': item.get('_source', {}).get(
object_type, {}).get('title', 'undefined')})
else:
if self.__verbose:
print('Object "{}" is not in list of objects for backup'.format(object_type))
return items
def __get_items(self, items_list, **kwargs):
kibana_url = kwargs.get('kibana_url', 'http://localhost:5601')
kibana_user = kwargs.get('kibana_user')
kibana_pass = kwargs.get('kibana_pass')
kibana_headers = self.__headers_set
if kibana_user and kibana_pass:
kibana_headers.setdefault('Authorization',
'Basic {}'.format(
base64.b64encode('{}:{}'.format(
kibana_user, kibana_pass))))
for kibana_object, items in items_list.items():
if self.__verbose:
print('Backup {} from {}'.format(kibana_object, kibana_url))
try:
api_uri = self.__set_kibana_uri(kibana_object)
except EmptyUriException as error:
if self.__verbose:
print(error.message)
continue
for item in items:
if self.__verbose:
print('Downloading {}: {} from kibana API: {}'.format(kibana_object,
item.get('name'),
kibana_url))
request_object = urllib.Request('{url}{uri}'.format(url=kibana_url,
uri='{}{}'.format(api_uri,
item.get('id')
)),
headers=kibana_headers)
response = urllib.urlopen(request_object)
data = json.loads(response.read())
item.setdefault('data', data)
return items_list
def backup_kibana_items(self, **kwargs):
"""
Starts backup objects and saves to archive
:param kwargs:
"""
backup_path = '{}/{}'.format(self.__backup_dir.rstrip('/'), 'backup_{}'.format(
time.strftime('%d_%m_%Y_%H_%M_%S')))
file_ops = FileOperations(base_path=backup_path, verbose=self.__verbose)
file_ops.save_items(self.__get_items(self.__get_items_list(
elastic_url=kwargs.get('elastic_url'),
elastic_user=kwargs.get('elastic_user'),
elastic_pass=kwargs.get('elastic_pass')),
kibana_url=kwargs.get('kibana_url'),
kibana_user=kwargs.get('kibana_user'),
kibana_pass=kwargs.get('kibana_pass')))
@staticmethod
def __set_kibana_uri(object_type):
if object_type == 'dashboard':
uri = '/api/kibana/dashboards/export?dashboard='
else:
raise EmptyUriException('{} has not defined kibana api uri'.format(object_type))
return uri
@staticmethod
def __get_data_from_url_entity(item):
parser = re.compile(
r'/app/kibana#/(?P<type>\S+)/(?P<id>\S+)\?.*title:(?P<title>[^,]+),.*',
re.IGNORECASE)
groups = parser.search(item.get('_source', {}).get('url', {}).get('url'))
return groups
class FileOperations(object):
"""
Separated class for file operations on objects from Kibana
"""
def __init__(self, **kwargs):
self.__base_path = kwargs.get('base_path', './backup')
self.__verbose = kwargs.get('verbose', False)
def save_items(self, items_list):
"""
Save items to file in directory structure and make tar archive
:param items_list: list of objects to save in files
"""
if self.__verbose:
print('Saving objects to {}'.format(self.__base_path))
for kibana_object, items in items_list.items():
path = '{base_path}/{sub_path}'.format(base_path=self.__base_path.rstrip('/'),
sub_path=kibana_object)
try:
self.__create_path(path)
except BaseException as error:
raise FailedSaveException('Can\'t create dir, error: {}'.format(error.message))
for item in items:
if item.get('data'):
with open('{path}/{file}'.format(path=path,
file='{}.json'.format(
item.get('name', 'unnamed_{}'.format(
kibana_object)))), 'w') as outfile:
json.dump(item['data'], outfile)
self.__compress_files(archive_name=self.__base_path.rstrip('/').split('/')[-1],
path=self.__base_path.rstrip('/'))
shutil.rmtree(self.__base_path)
@staticmethod
def __create_path(path):
if not os.path.exists(path):
os.makedirs(path)
@staticmethod
def __compress_files(archive_name, path):
parent_path = os.sep.join(path.split(os.sep)[:-1])
archive_path = '{}/{}.tar.gz'.format(parent_path, archive_name)
print('Creating {}'.format(archive_path))
with tarfile.open(archive_path, mode='w:gz') as archive:
archive.add(path, recursive=True)
class EmptyUriException(BaseException):
"""
Exception fro empty Kibana URI definition
"""
pass
class FailedSaveException(BaseException):
"""
Exception for failure on saving object
"""
pass
def run_backup():
"""
Parse arguments and run backup procedure
:return:
"""
parser = argparse.ArgumentParser(description='Backup Kibana items to files')
parser.add_argument('--elasticUrl', help='Url of elasticsearch handling .kibana index',
dest='elastic_url', type=str, default=None)
parser.add_argument('--elasticPassword',
help='Auth password of elasticsearch handling .kibana index',
dest='elastic_password', type=str, default=None)
parser.add_argument('--elasticUser', help='Auth user of elasticsearch handling .kibana index',
dest='elastic_user', type=str, default=None)
parser.add_argument('--kibanaUrl', help='Url of Kibana', dest='kibana_url', type=str,
default=None)
parser.add_argument('--kibanaPassword', help='Auth password of kibana', dest='kibana_password',
type=str, default=None)
parser.add_argument('--kibanaUser', help='Auth user of kibana', dest='kibana_user',
type=str, default=None)
parser.add_argument('--output_dir', help='Path to save dashboards (default: ./backup)',
default='./backup', type=str)
parser.add_argument('--verbose', '-v', help='Add more verbosity to script', action='store_true')
args = parser.parse_args()
backup = KibanaBackup(backup_dir=args.output_dir, verbose=args.verbose)
backup.backup_kibana_items(elastic_url=args.elastic_url,
elastic_user=args.elastic_user,
elastic_pass=args.elastic_password,
kibana_url=args.kibana_url,
kibana_user=args.kibana_user,
kibana_pass=args.kibana_password)
if __name__ == '__main__':
run_backup()
以上是关于python 用于导出kibana实体的Python脚本的主要内容,如果未能解决你的问题,请参考以下文章
Nginx 结合Python Ldap认证用于Kibana权限登陆
python 快速和肮脏的查克诺里斯python脚本。用于我们的SVN post commit钩子。如果您没有请求库:yum install pytho
Nginx 结合Python Ldap认证用于Kibana权限登陆
使用 Kibana API 导出的仪表板无法在 Kibana UI 中手动导入