python riak-bucket-dump.py

This post shares riak-bucket-dump.py, a small Python 2 script that dumps the contents of a Riak bucket to a JSON file over the node's HTTP API: it lists the bucket's keys, then fetches their values in parallel with a pool of worker processes.

#!/usr/bin/env python
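# Dump every key of a Riak bucket to a JSON file over the HTTP API,
# fetching values in parallel with a multiprocessing worker pool.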

import sys
import urllib2
import urllib
import multiprocessing
import json
import logging
import argparse
import urlparse
import collections
import signal
import os
import email
import email.utils

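# Read-only per-worker configuration handed to each pool process at start-up.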
ChildEnv = collections.namedtuple('ChildEnv', ['base_url', 'timeout', 'logger'])

def check_positive(value):
    ivalue = int(value)
    if ivalue <= 0:
        raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value)
    return ivalue

def check_port(value):
    ivalue = int(value)
    if not (0 < ivalue < 65536):
        raise argparse.ArgumentTypeError("%s is not a valid port number" % value)
    return ivalue

def quote(s):
    # Encode a value for use as a URL path segment. urllib.quote_plus is meant
    # for query strings (it turns spaces into '+'), so use urllib.quote and
    # escape '/' as well, since keys may contain it.
    return urllib.quote(s, safe="")

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("bucket", help="name of bucket to dump")
    parser.add_argument("-H", "--host", help="remote address. Default: localhost", metavar="HOST", default="localhost")
    parser.add_argument("-p", "--port", help="remote port. Default: 8098", type=check_port, default=8098)
    parser.add_argument("-b", "--bucket-type", help="type of bucket to dump. Default: no type", default="default")
    parser.add_argument("-t", "--key-timeout", help="timeout for key query. Default: 5", type=check_positive, default=5)
    parser.add_argument("-T", "--list-timeout", help="timeout for key listing. Default: 120", type=check_positive, default=120)
    parser.add_argument("-w", "--workers", help="count of parallel workers used to extract keys. Default: 20", type=check_positive, default=20)
    parser.add_argument("-B", "--batch-size", help="batch size. Default: 100", type=check_positive, default=100)
    parser.add_argument("-C", "--tasks-per-child", help="limit tasks per child to prevent memory leaks. Default: None", type=check_positive, default=None)
    parser.add_argument("-o", "--output-file", help="limit tasks per child to prevent memory leaks. Default: BUCKET.json")
    args = parser.parse_args()
    return args

def setup_logger():
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    log_handler = logging.StreamHandler(sys.stderr)
    log_handler.setFormatter(
        logging.Formatter(
            '%(asctime)s %(levelname)-8s %(processName)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    logger.addHandler(log_handler)
    return logger

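# Per-process ChildEnv, populated in each worker by init_child().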
child_env = None

def init_child(env):
    global child_env
    child_env = env

def handle_result(res, ct):
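    # Normalize a fetched value into a JSON fragment: re-serialize JSON bodies,
    # wrap plain-text bodies as JSON strings, and reject anything else.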
    if ct == 'application/json':
        out = json.dumps(json.loads(res))
    elif ct == 'text/plain':
        out = json.dumps(res)
    else:
        raise ValueError("Unknown content-type in response.")
    return out

def get_key(key, tries=10, attempt=0):
    # Give up once the configured number of attempts is exhausted.
    if attempt >= tries:
        return

    baseurl, timeout, logger = child_env
    url = baseurl + quote(key)

    try:
        resp = urllib2.urlopen(url, timeout=timeout)
        res = resp.read()
        ct = resp.info().getheader('Content-Type')
        res = handle_result(res, ct)

    except urllib2.HTTPError as e:
        if e.code == 300:
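            # 300 Multiple Choices: the object has siblings. Re-request with
            # Accept: multipart/mixed to fetch all sibling values in one response.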
            try:
                req = urllib2.Request(url, headers={"Accept": "multipart/mixed"})
                resp = urllib2.urlopen(req, timeout=timeout)
                assert False, "Exception expected instead of OK response"
            except urllib2.HTTPError as e:
                if e.code == 300:
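                    # Rebuild a MIME document from the 300 response and keep
                    # only the most recently modified sibling.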
                    msgstr = "Content-Type: " + e.info().getheader('Content-Type') + "\r\n\r\n"
                    msgstr += e.read()
                    msg = email.message_from_string(msgstr)
                    last = max(msg.get_payload(),
                        key = lambda m: email.utils.parsedate(m['last-modified']))

                    try:
                        res = handle_result(last.get_payload(), last.get_content_type())
                    except Exception as e:
                        logger.warn("Unable to retrieve subkeys of key %s: %s", repr(key), str(e))
                        return

                elif e.code == 406:
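                    # 406 Not Acceptable for multipart/mixed (e.g. the siblings
                    # were resolved in the meantime): retry with a plain GET.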
                    return get_key(key, tries, attempt + 1)
                else:
                    logger.warn("Unable to retrieve subkeys of key %s: %s", repr(key), str(e))
                    return

        else:
            logger.warn("Unable to retrieve key %s: %s", repr(key), str(e))
            return

    except Exception as e:
        logger.warn("Unable to retrieve key %s: %s", repr(key), str(e))
        return
    return json.dumps(key) + ": " + res

def sig_handler(signum, frame):
    sys.exit(0)

def main():
    signal.signal(signal.SIGINT, sig_handler)
    args = parse_args()
    out_filename = args.output_file if args.output_file else (args.bucket + ".json")
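    # Line-buffered, so partial results reach the file as they are produced.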
    out = open(out_filename, "w", 1)
    logger = setup_logger()

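    # Build the list-keys URL. Listing keys scans the entire bucket, which Riak
    # documents as an expensive operation, hence the separate, longer timeout.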
    keys_url = urlparse.urlunparse(
        ("http",
        "%s:%d" % (args.host, args.port),
        ("/types/" + quote(args.bucket_type) if args.bucket_type != "default" else "") + "/buckets/" + quote(args.bucket) + "/keys",
        "",
        "keys=true",
        ""))

    keys = json.load(urllib2.urlopen(keys_url, timeout=args.list_timeout))["keys"]
    keys_count = len(keys)
    logger.info("Loaded key list from bucket %s/%s: item count = %d",
        args.bucket_type, args.bucket, keys_count)

    bucket_base_url = urlparse.urlunparse(
        ("http",
        "%s:%d" % (args.host, args.port),
        ("/types/" + quote(args.bucket_type) if args.bucket_type != "default" else "") + "/buckets/" + quote(args.bucket) + "/keys/",
        "",
        "",
        ""))

    env = ChildEnv(bucket_base_url, args.key_timeout, logger)
    p = multiprocessing.Pool(args.workers, init_child, (env,),
        maxtasksperchild=args.tasks_per_child)

    logger.info("Starting %d workers...", args.workers)
    out.write("{\n")
    last_res = None
    counter = 0
    for res in p.imap_unordered(get_key, keys, args.batch_size):
        if res is not None:
            if last_res is not None:
                out.write(last_res + ",\n")
            last_res = res
        counter += 1
        if counter % 10000 == 0:
            logger.info(
                "Processed %d records (%.2f%% completed).",
                counter,
                float(counter) / keys_count * 100)

    if last_res is not None:
        out.write(last_res + "\n")
    out.write("}\n")
    logger.info("Finished.")

if __name__ == '__main__':
    main()
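
A minimal usage sketch (the host and bucket name below are hypothetical): dump
the bucket "users" from a node at riak1.example.com using 10 workers, writing
to users.json:

    python riak-bucket-dump.py -H riak1.example.com -w 10 -o users.json users

Since every record is written as a "key": value pair inside one pair of braces,
the resulting file is a single JSON object and can be loaded back directly:

    import json

    with open("users.json") as f:
        dump = json.load(f)  # dict mapping each Riak key to its stored value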
