python 用于导出kibana实体的Python脚本

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 用于导出kibana实体的Python脚本相关的知识,希望对你有一定的参考价值。

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Script providing solution for backup Kibana entities to *.json files
"""

from __future__ import print_function

import argparse
import base64
import json
import os
import re
import shutil
import tarfile
import time
import urllib2 as urllib


class KibanaBackup(object):
    """
    Backups defined objects from Kibana
    """
    def __init__(self, **kwargs):
        self.__backup_dir = kwargs.get('backup_dir', '.')
        self.__verbose = kwargs.get('verbose', False)
        self.__headers_set = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }

    def __get_items_list(self, **kwargs):
        elastic_url = kwargs.get('elastic_url', 'http://lcoalhost:9200')
        elastic_user = kwargs.get('elastic_user')
        elastic_pass = kwargs.get('elastic_pass')

        elastic_headers = self.__headers_set
        if elastic_user and elastic_pass:
            elastic_headers.setdefault('Authorization', 'Basic {}'.format(
                base64.b64encode('{}:{}'.format(elastic_user, elastic_pass))))

        objects_for_backup = [
            'dashboard'
        ]

        items = dict()

        print('Getting available objects from elastic: {}'.format(elastic_url))
        request_object = urllib.Request('{url}/.kibana/_search?q=*'.format(url=elastic_url),
                                        headers=elastic_headers)
        response = urllib.urlopen(request_object)
        data = json.loads(response.read())

        for item in data['hits']['hits']:
            object_type = item.get('_id').split(':')[0]
            object_id = item.get('_id').split(':')[1]

            if object_type == 'url':
                if self.__verbose:
                    print('Processing data from url object')
                data = self.__get_data_from_url_entity(item)
                if data:
                    object_type = data.group('type')
                    object_id = data.group('id')
                    item.setdefault('_source', {})
                    item['_source'].setdefault(object_type, {})
                    item['_source'][object_type].setdefault('title', '')
                    item['_source'][object_type]['title'] = data.group('title')

            if object_type in objects_for_backup:
                items.setdefault(object_type, list())

                items[object_type].append({'id': object_id, 'name': item.get('_source', {}).get(
                    object_type, {}).get('title', 'undefined')})
            else:
                if self.__verbose:
                    print('Object "{}" is not in list of objects for backup'.format(object_type))

        return items

    def __get_items(self, items_list, **kwargs):
        kibana_url = kwargs.get('kibana_url', 'http://localhost:5601')
        kibana_user = kwargs.get('kibana_user')
        kibana_pass = kwargs.get('kibana_pass')

        kibana_headers = self.__headers_set
        if kibana_user and kibana_pass:
            kibana_headers.setdefault('Authorization',
                                      'Basic {}'.format(
                                          base64.b64encode('{}:{}'.format(
                                              kibana_user, kibana_pass))))

        for kibana_object, items in items_list.items():
            if self.__verbose:
                print('Backup {} from {}'.format(kibana_object, kibana_url))

            try:
                api_uri = self.__set_kibana_uri(kibana_object)
            except EmptyUriException as error:
                if self.__verbose:
                    print(error.message)
                continue

            for item in items:
                if self.__verbose:
                    print('Downloading {}: {} from kibana API: {}'.format(kibana_object,
                                                                          item.get('name'),
                                                                          kibana_url))
                request_object = urllib.Request('{url}{uri}'.format(url=kibana_url,
                                                                    uri='{}{}'.format(api_uri,
                                                                                      item.get('id')
                                                                                     )),
                                                headers=kibana_headers)
                response = urllib.urlopen(request_object)
                data = json.loads(response.read())
                item.setdefault('data', data)

        return items_list

    def backup_kibana_items(self, **kwargs):
        """
        Starts backup objects and saves to archive
        :param kwargs:
        """
        backup_path = '{}/{}'.format(self.__backup_dir.rstrip('/'), 'backup_{}'.format(
            time.strftime('%d_%m_%Y_%H_%M_%S')))

        file_ops = FileOperations(base_path=backup_path, verbose=self.__verbose)
        file_ops.save_items(self.__get_items(self.__get_items_list(
            elastic_url=kwargs.get('elastic_url'),
            elastic_user=kwargs.get('elastic_user'),
            elastic_pass=kwargs.get('elastic_pass')),
                                             kibana_url=kwargs.get('kibana_url'),
                                             kibana_user=kwargs.get('kibana_user'),
                                             kibana_pass=kwargs.get('kibana_pass')))

    @staticmethod
    def __set_kibana_uri(object_type):
        if object_type == 'dashboard':
            uri = '/api/kibana/dashboards/export?dashboard='
        else:
            raise EmptyUriException('{} has not defined kibana api uri'.format(object_type))

        return uri

    @staticmethod
    def __get_data_from_url_entity(item):
        parser = re.compile(
            r'/app/kibana#/(?P<type>\S+)/(?P<id>\S+)\?.*title:(?P<title>[^,]+),.*',
            re.IGNORECASE)
        groups = parser.search(item.get('_source', {}).get('url', {}).get('url'))

        return groups


class FileOperations(object):
    """
    Separated class for file operations on objects from Kibana
    """
    def __init__(self, **kwargs):
        self.__base_path = kwargs.get('base_path', './backup')
        self.__verbose = kwargs.get('verbose', False)

    def save_items(self, items_list):
        """
        Save items to file in directory structure and make tar archive
        :param items_list: list of objects to save in files
        """
        if self.__verbose:
            print('Saving objects to {}'.format(self.__base_path))
        for kibana_object, items in items_list.items():
            path = '{base_path}/{sub_path}'.format(base_path=self.__base_path.rstrip('/'),
                                                   sub_path=kibana_object)

            try:
                self.__create_path(path)
            except BaseException as error:
                raise FailedSaveException('Can\'t create dir, error: {}'.format(error.message))

            for item in items:
                if item.get('data'):
                    with open('{path}/{file}'.format(path=path,
                                                     file='{}.json'.format(
                                                         item.get('name', 'unnamed_{}'.format(
                                                             kibana_object)))), 'w') as outfile:
                        json.dump(item['data'], outfile)

        self.__compress_files(archive_name=self.__base_path.rstrip('/').split('/')[-1],
                              path=self.__base_path.rstrip('/'))

        shutil.rmtree(self.__base_path)

    @staticmethod
    def __create_path(path):
        if not os.path.exists(path):
            os.makedirs(path)

    @staticmethod
    def __compress_files(archive_name, path):
        parent_path = os.sep.join(path.split(os.sep)[:-1])
        archive_path = '{}/{}.tar.gz'.format(parent_path, archive_name)
        print('Creating {}'.format(archive_path))
        with tarfile.open(archive_path, mode='w:gz') as archive:
            archive.add(path, recursive=True)


class EmptyUriException(BaseException):
    """
    Exception fro empty Kibana URI definition
    """
    pass


class FailedSaveException(BaseException):
    """
    Exception for failure on saving object
    """
    pass


def run_backup():
    """
    Parse arguments and run backup procedure
    :return:
    """

    parser = argparse.ArgumentParser(description='Backup Kibana items to files')
    parser.add_argument('--elasticUrl', help='Url of elasticsearch handling .kibana index',
                        dest='elastic_url', type=str, default=None)
    parser.add_argument('--elasticPassword',
                        help='Auth password of elasticsearch handling .kibana index',
                        dest='elastic_password', type=str, default=None)
    parser.add_argument('--elasticUser', help='Auth user of elasticsearch handling .kibana index',
                        dest='elastic_user', type=str, default=None)
    parser.add_argument('--kibanaUrl', help='Url of Kibana', dest='kibana_url', type=str,
                        default=None)
    parser.add_argument('--kibanaPassword', help='Auth password of kibana', dest='kibana_password',
                        type=str, default=None)
    parser.add_argument('--kibanaUser', help='Auth user of kibana', dest='kibana_user',
                        type=str, default=None)
    parser.add_argument('--output_dir', help='Path to save dashboards (default: ./backup)',
                        default='./backup', type=str)
    parser.add_argument('--verbose', '-v', help='Add more verbosity to script', action='store_true')

    args = parser.parse_args()

    backup = KibanaBackup(backup_dir=args.output_dir, verbose=args.verbose)

    backup.backup_kibana_items(elastic_url=args.elastic_url,
                               elastic_user=args.elastic_user,
                               elastic_pass=args.elastic_password,
                               kibana_url=args.kibana_url,
                               kibana_user=args.kibana_user,
                               kibana_pass=args.kibana_password)


if __name__ == '__main__':
    run_backup()

以上是关于python 用于导出kibana实体的Python脚本的主要内容,如果未能解决你的问题,请参考以下文章

Nginx 结合Python Ldap认证用于Kibana权限登陆

python 快速和肮脏的查克诺里斯python脚本。用于我们的SVN post commit钩子。如果您没有请求库:yum install pytho

Nginx 结合Python Ldap认证用于Kibana权限登陆

使用 Kibana API 导出的仪表板无法在 Kibana UI 中手动导入

ES(elasticsearch) - kibana导出csv

同一实体的两个存储库,一个已导出,一个未导出