使用 Python 并发导出 MySQL 全量离线数据

Posted _雪辉_

tags:

篇首语：本文由小常识网（cha138.com）小编为大家整理，主要介绍了使用 Python 并发导出 MySQL 全量离线数据的相关知识，希望对你有一定的参考价值。

#!/usr/bin/python3
import argparse
import sys
import subprocess
import logging
import time
from multiprocessing import Pool
from connect_db_forbatch import connect_mysql

def db_log():
    """Attach a DEBUG-level file handler to the root logger and return it.

    The log file is opened with mode ``'w'``, so any previous content of
    the file is discarded each time this function runs.

    Returns:
        The root ``logging.Logger`` with the new file handler attached.
    """
    log_dir = '/data/ansible/logs/'
    log_name = 'catfulldump.log'

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    # Both the logger and the handler are set to DEBUG so nothing is filtered.
    file_handler = logging.FileHandler(log_dir + log_name, mode='w')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(
        logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
    )

    root.addHandler(file_handler)
    return root

# Fetch the full id range of a table.
def get_part(tablename):
    """Return the smallest and largest ``report_id`` found in ``tablename``.

    Two queries are issued, each through its own ``connect_mysql`` object:
    one ordered ascending and one ordered descending, both limited to a
    single row.

    Args:
        tablename: Table name interpolated directly into the SQL text.

    Returns:
        A ``(min_report_id, max_report_id)`` tuple taken from the first
        column of the first row of each result. If anything raises, the
        error is printed and ``None`` is returned implicitly.
    """
    try:
        queries = (
            "select report_id  from %s order by report_id limit 1;" %(tablename),
            "select report_id  from %s order by report_id desc limit 1;" %(tablename),
        )
        # Run both queries before indexing so an empty result only fails
        # after both lookups have been made.
        results = [
            connect_mysql('xxx', xxx,'cat', 'hourly_report_content').select_db(sql)
            for sql in queries
        ]
        lowest_rows, highest_rows = results
        return lowest_rows[0][0],highest_rows[0][0]
    except Exception as err:
        print(err)
# Export one id range of the table, reload it, then remove the dump file.
def dump_data(tablename,batch,id):
    """Dump, clean, reload and delete one ``report_id`` range of a table.

    The range handled is ``report_id >= id - batch and report_id < id``.
    The steps run in order: ``mysqldump`` into a file under
    ``/data/dump/catdump/``, ``sed`` to strip blank lines and lines that
    start with ``--`` or ``/``, ``mysql`` to load the file, and finally
    ``rm -f`` on the file.

    ``subprocess.getstatusoutput`` does not raise when a command exits
    with a non-zero status, so each status is checked explicitly. The
    first failing step is logged together with the id range and the
    remaining steps are skipped, so a broken dump is never loaded. The
    dump file is removed in every case.

    Args:
        tablename: Table passed to ``mysqldump --tables`` and used in the
            dump file name.
        batch: Width of the id range.
        id: Exclusive upper bound of the id range.

    Returns:
        None. Failures are logged rather than raised.
    """
    # Fetch the root logger here instead of relying on a module-level
    # `logger` global, which only exists when the file runs as a script.
    log = logging.getLogger()
    # Pre-bind so the except handler can always report the range.
    minid = maxid = None
    try:
        minid = id - batch
        maxid = id
        # Export the data.
        dump_data_sql = "mysqldump -uxxx -p'xxx' -hxxx -Pxxx -B cat --tables %s --where=\"report_id>=%s and report_id<%s\"  --skip-add-locks --skip-lock-tables --no-create-db --no-create-info --max_allowed_packet=1024000000 --complete-insert --skip-extended-insert >/data/dump/catdump/%s_%s_%s_full.sql" %(tablename,minid,maxid,tablename,minid,maxid)
        # Post-process the file: drop empty lines, `--` comment lines and
        # lines beginning with `/`.
        sed_cmd = "sed -e /^$/d -e /^--/d -e /^\\\\\\//d  -i /data/dump/catdump/%s_%s_%s_full.sql" %(tablename,minid,maxid)
        # Load the data.
        load_data_sql = "mysql -uxxx -p'xxx' -hxxx -Pxxx --max-allowed-packet=1024000000 cat < /data/dump/catdump/%s_%s_%s_full.sql" %(tablename,minid,maxid)
        # Delete the file.
        delete_file = "rm -f /data/dump/catdump/%s_%s_%s_full.sql" %(tablename,minid,maxid)

        steps = (
            ('dump', dump_data_sql),
            ('clean', sed_cmd),
            ('load', load_data_sql),
        )
        try:
            for step_name, command in steps:
                status, details = subprocess.getstatusoutput(command)
                if status != 0:
                    # Record the id range so the batch can be re-run later.
                    log.error(
                        '%s step failed with status %s, minid:%s,maxid:%s, output: %s',
                        step_name, status, minid, maxid, details,
                    )
                    break
        finally:
            # The temporary dump file is removed whether or not a step failed.
            subprocess.getstatusoutput(delete_file)
    except Exception as err:
        # On an unexpected error, record the id range.
        print(err)
        log.error('minid:%s,maxid:%s',minid,maxid,exc_info=True)
def get_var():
    """Parse the command-line options of the script.

    Both ``--tablename`` (string) and ``--batch`` (integer) are required
    and are read from ``sys.argv[1:]``.

    Returns:
        A ``(tablename, batch)`` tuple. If an ``Exception`` is raised while
        parsing, it is printed and ``None`` is returned implicitly.
    """
    try:
        cli = argparse.ArgumentParser(description='同步表信息')
        option_specs = (
            ('--tablename', str, '表名'),
            ('--batch', int, '每批数量'),
        )
        for flag, value_type, help_text in option_specs:
            cli.add_argument(flag, type=value_type, required=True, help=help_text)
        parsed = cli.parse_args(sys.argv[1:])
    except Exception as err:
        print(err)
    else:
        return parsed.tablename, parsed.batch

def batch_upper_bounds(min_id, max_id, batch):
    """Return the exclusive upper bounds of the id batches to export.

    Each bound ``b`` stands for the half-open range ``[b - batch, b)``,
    which is how ``dump_data`` interprets its ``id`` argument. Taken
    together the ranges cover every id from ``min_id`` to ``max_id``
    inclusive.

    Args:
        min_id: Smallest id to export (integer).
        max_id: Largest id to export (integer).
        batch: Number of ids per batch; must be positive.

    Returns:
        A ``range`` of upper bounds, empty when ``max_id < min_id``.

    Raises:
        ValueError: If ``batch`` is not positive.
    """
    if batch <= 0:
        raise ValueError('batch must be a positive integer, got %r' % (batch,))
    # The last bound must be strictly greater than max_id because the
    # upper end of each range is exclusive.
    return range(min_id + batch, max_id + batch + 1, batch)


if __name__ == "__main__":
    try:
        tablename,batch = get_var()
        # Kept as a module-level name because worker code may refer to it.
        logger = db_log()
        min_id,max_id = get_part(tablename)
        logger.info('minid:%s,maxid:%s',min_id,max_id)
        # Validate and compute the batches before any worker is started.
        bounds = batch_upper_bounds(min_id, max_id, batch)
        p = Pool(4)
        for upper in bounds:
            p.apply_async(dump_data,args=(tablename,batch,upper,))
        p.close()
        p.join()
    except Exception as err:
        print(err)

以上是关于使用 Python 并发导出 MySQL 全量离线数据的主要内容。如果未能解决你的问题，请参考以下文章：

网易考拉数据仓库构建实践

离线数据全量导入与增量导入方案

MySQL5.7.18 备份Mysqldump,mysqlpump,xtrabackup,innobackupex 全量,增量备份,数据导入导出

Kafka Connect JDBC Source MySQL 全量同步

mysql单表导入数据,全量备份导入单表

[Spring cloud 一步步实现广告系统] 14. 全量索引代码实现