第9课:备份mysql数据库重写父类unittest框架多线程

Posted 茄子子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第9课:备份mysql数据库重写父类unittest框架多线程相关的知识,希望对你有一定的参考价值。

1. 写代码备份mysql数据库:

 1)Linux下,备份mysql数据库,在shell下执行命令:mysqldump -uroot -p123456 -A >db_bak.sql即可

import os
import datetime


class BakDb(object):
    def __init__(self, ip, username, passwd, port=3306, path=\'/tmp/db_bak\'):
        self.ip = ip
        self.username = username
        self.passwd = passwd
        self.port = port
        self.path = path

    def check_path_exist(self):
        if not os.path.isdir(self.path):
            os.mkdir(self.path)

    def bak_db(self):
        # mysqldump -u%s -p%s -h%s -A > **.sql
        filename = str(datetime.date.today()) + \'.sql\'
        self.check_path_exist()
        abs_file = os.path.join(self.path, filename)  # 变为绝对路径
        command = \'\'\'
        mysqldump -u{username} -p{passwd} -P{port} -h{ip} -A > {filename}
        \'\'\'.format(username=self.username,
                   passwd=self.passwd,
                   port=self.port,
                   ip=self.ip,
                   filename=abs_file)
        print(command)
        os.system(command)
        print("数据库备份完成")


obj = BakDb(\'**.**.**.**\', \'user\', \'passwd\')
obj.bak_db()

2. 重写父类的方法:核心思想就是先调用父类的方法,然后加新的代码就ok了,目的是扩展父类的一些功能。

class Coon(object):
    # 基类
    def __init__(self, host, passwd, port):
        self.host = host
        self.passwd = passwd
        self.port = port
        print(self.port)
        print(self.host, self.passwd, self.port)


class ConnMysql(Coon):
    def __init__(self, host, passwd, port, username, db, charset=\'utf-8\'):
        Coon.__init__(self, host, passwd, port)  # 调用父类的构造方法,得自己手动找到父类
        # super(ConnMysql).__init__(self,host,passwd,port)  # super会自动找到父类,然后调用父类的方法
        self.username = username
        self.db = db
        self.charset = charset
        print(self.host, self.passwd, self.port, self.username, self.db, self.charset)


sObj = Coon(\'1\', \'1\', 3306)
obj = ConnMysql(\'1\', \'2\', 3306, \'q\', \'qxy\')

运行结果:
3306
1 1 3306
3306
1 2 3306
1 2 3306 q qxy utf-8

3. 单元测试-unittest框架:

import unittest                                                              
import htmlTestRunner                                                        
from BeautifulReport import BeautifulReport                                  
                                                                             
                                                                             
def calc(x, y):                                                              
    return x + y                                                             
                                                                             
                                                                             
class TestCalc(unittest.TestCase):                                           
    def setUp(self):                                                         
        print(\'我是setUp\')                                                     
                                                                             
    def test_pass_case(self):                                                
        \'\'\'这个是通过的测试用例\'\'\'                                                     
        res = calc(1, 2)                                                     
        self.assertEqual(3, res)                                             
                                                                             
    def test_fail_case(self):                                                
        \'\'\'这个是失败的测试用例\'\'\'                                                     
        res = calc(9, 8)                                                     
        self.assertEqual(98, res)                                            
                                                                             
    def test_a(self):                                                        
        \'\'\'这是个普通的测试用例\'\'\'                                                     
        pass                                                                 
                                                                             
    def test_haha(self):                                                     
        \'\'\'这是哈哈哈测试用例\'\'\'                                                      
                                                                             
    def tearDown(self):                                                      
        print(\'我是tearDown\')                                                  
                                                                             
    @classmethod                                                             
    def setUpClass(cls):                                                     
        # 所有的用例执行前运行一次                                                       
        print(\'我是setUpClass\')                                                
                                                                             
    @classmethod                                                             
    def tearDownClass(cls):                                                  
        # 所有的用例运行完成后运行一次                                                     
        print(\'我是tearDownClass\')                                             
                                                                             
                                                                             
if __name__ == \'__main__\':                                                   
    unittest.main()  # 会运行当前python文件中的所有测试用例                                 
                                                                             
    # suite = unittest.TestSuite()   # 定义一个测试套件                              
    # suite.addTest(TestCalc(\'test_pass_case\'))# addTest的参数是TestCase实例或TestSu
    # suite.addTest(TestCalc(\'test_fail_case\'))                              
    # suite.addTest(TestCalc(\'test_a\'))  # 单个添加测试用例                          
                                                                             
    # suite.addTests(unittest.makeSuite(TestCalc)) # addTests的参数是由测试用例或测试套件组成
                                                                             
    # f = open(\'report.html\', \'wb\')                                          
    # runner = HTMLTestRunner.HTMLTestRunner(stream=f,title=\'测试报告\',descriptio
    # runner.run(suite)                                                      
                                                                             
    # BeautifulReport包能够生成界面更好看的测试报告                                         
    # result = BeautifulReport(suite)                                        
    # result.report(filename=\'reportB.html\',description=\'测试报告\',log_path=\'.\') 

  2)还有一种写法,将所有的case写在一个目录下,然后写一个运行所有case的代码:

import unittest
from BeautifulReport import BeautifulReport
import xmlrunner   # pip inttall xmlrunner
suite = unittest.TestSuite()
# TestLoader是用来加载TestCase到TestSuite中的
all_case = unittest.defaultTestLoader.discover(\'cases\', \'test*.py\')  # 第一个参数是目录,第二个参数是以test开头的是用例文件
for case in all_case:
    suite.addTests(case)  # case的类型是<class \'unittest.suite.TestSuite\'>
print(suite)
# 列表生成式
# [ suite.addTests(case) for case in all_case ]

result = BeautifulReport(suite)
result.report(filename=\'report_all_case.html\', description=\'测试报告\', log_path=\'.\')

# runner = xmlrunner.XMLTestRunner(\'.\')  # 为了产生xml格式的报告给Jenkins用,在当前目录生成报告
# runner.run(suite)  # 运行用例
test_buy.py

import unittest


class TestBuy(unittest.TestCase):
    def test_a(self):
        self.assertEqual(1, 1)

    def test_b(self):
        self.assertEqual(1, 2)

4. 多线程:

 1)咱们打开的程序都是进程,进程中至少有个一个线程

    2)线程包含在进程里,线程是最小的执行单元,线程之间是相互独立的

    3)主线程起了n个子线程之后,继续执行至结束,子线程可能仍在执行,子线程干活时间未统计上,

      所以要告诉主线程要等子线程执行完成后再结束程序
import threading
import time
import requests


def sayHi(name):
    time.sleep(2)
    print(name)


def downHtml(url, name):
    content = requests.get(url).content
    f = open(name, \'wb\')
    f.write(content)
    f.close()


urls = [
    [\'nnzhp\', \'http://www.nnzhp.cn\'],
    [\'dsx\', \'http://www.imdsx.cn\'],
    [\'besttest\', \'http://www.besttest.cn\']
]
# 单线程运行
# start_time = time.time()
# for url in urls:
#     downHtml(url[1], url[0])
# end_time = time.time()
# print(end_time-start_time)

# 多线程运行
start_time = time.time()
threads = []
for url in urls:
    t = threading.Thread(target=downHtml, args=(url[1], url[0]))  # 启动一个线程
    t.start()  # 运行
    # 等待子线程干完活
    # t.join()  # 主线程起了一个子线程后,等待子线程运行结束;再循环下一次起一个新的线程
    threads.append(t)
for t in threads:  # 主线程一直循环等待3个子线程 直到它们都干完活
    t.join()  # 主线程等待子线程
end_time = time.time()
print(end_time - start_time)

# for i in range(10):
#     t = threading.Thread(target=sayHi, args=(\'小黑\',))  # 启动一个线程,这里小黑后面要加个逗号
#     t.start() # 运行

  4) 多进程模块

import multiprocessing
import time

def run():
    time.sleep(2)
    print("多进程")
    for i in range(5):
        p = multiprocessing.Process(target=run2)
        p.start()

def run2():
    print("多进程启动")


if __name__ == \'__main__\':
    for i in range(5):
        p = multiprocessing.Process(target=run)
        p.start()

  5) 守护进程

import threading
import time


def pz():
    time.sleep(2)
    print(\'跟班\')
    

threads = []
for i in range(50):
    t = threading.Thread(target=pz)
    t.setDaemon(True)  # 设置子线程为守护线程,守护线程:一旦主线程立刻结束,那么子线程立刻结束,不管子线程有没有运行完,
    t.start()
    threads.append(t)
# for t in threads:
#     t.join()  # 如果线程调用t.join(),守护线程就不起作用了
time.sleep(3)
print(\'done\')   # 不加t.join(), 先打印done,后打印50个跟班

  6) 线程锁

import threading
from threading import Lock

num = 0
lock = Lock()  # 申请一把锁


def run():
    global num
    lock.acquire()  # 加锁
    num += 1
    lock.release()  # 解锁


lis = []
for i in range(5):
    t = threading.Thread(target=run)
    t.start()
    lis.append(t)
for t in lis:
    t.join()
print(\'over\', num)
# 加锁是为了防止多线程时同时修改数据,可能会导致数据不正确。
# python3中不加锁也无所谓,

   7) cpu是几核的,就只能同时运行几个进程,python的多线程是利用不了多核cpu的,GIL 全局解释器锁

    在python上开启多个线程,由于GIL的存在,每个单独线程都会在竞争到GIL后才运行,这样就干预OS内部的进程(线程)调度,

    所以在多核CPU上:python的多线程实际是串行执行的,并不会同一时间多个线程分布在多个CPU上运行。     
8) IO密集型任务:使用io比较多,如大批量网络请求,大量的输入输出,可以使用多线程。
   cpu密集型任务:使用cpu比较多,如一些逻辑算法类的任务。

以上是关于第9课:备份mysql数据库重写父类unittest框架多线程的主要内容,如果未能解决你的问题,请参考以下文章

C++--第17课 - 继承与多态 - 上

第48课 同名覆盖引发的问题------子类中函数重写遇上赋值兼容

第49课 多态的概念和意义

第49课.多态的概念和意义

20175305张天钰 《java程序设计》第四周课下测试总结

第0课 - 数据结构引言