python多进程数据库储存问题?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python多进程数据库储存问题?相关的知识,希望对你有一定的参考价值。
如下面代码所示,我利用多进程map执行main方法,main里面就是爬取数据保存到mysql的代码,如果数据库查询查询不到相同字段就insert插入,现在问题是,代码执行完毕,数据库空空如也,我注释掉多进程,执行for循环的代码,数据库有数据了,但是id不是从1自增的,1000条数据,多进程每执行一次,id增加1000多,数据却没保存,清空数据表,再执行for代码,id就是1了,还有我在main方法里增加commit()也就正常了,前面我是进程结束执行一次commit()。
if __name__ == "__main__":
print('开始了!')
current = time.time()
total_page_count = get_json(1)["result"]["query"]["totlePageCount"]
# for index in range(1, total_page_count+1):
# mian(index)
pool = Pool()
index = ([x for x in range(1, total_page_count+1)])
pool.map(main, index)
pool.close()
pool.join()
cur.close()
conn.commit()
conn.close()
print('执行结束')
end = time.time()
print(f'执行时间一共为end - current秒!')
问题是,每个进程访问数据库,要有各自的cursor,要各自去commit才可以。 参考技术A 最关键的Pool()是什么你应该贴一下
Python-多进程
1、多进程:
由于Python的GIL,多线程未必是CPU密集型程序的最好选择。
多进程可以完全独立的进程环境中运行程序,可以充分的利用多处理器。
但是进程本身的隔离带来的数据不共享也是一个问题,而且线程比进程轻量级。
2、multiprocessing:
Process类遵循了Thread类的API
测试:开启三个进程,计算一个CPU密集型程序。
1 import multiprocessing 2 import datetime 3 4 # 次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右 5 def calc(i): 6 sum = 0 7 for _ in range(100000000): 8 sum += 1 9 print(i, sum) 10 11 if __name__ == ‘__main__‘: 12 start = datetime.datetime.now() 13 14 ps = [] 15 for i in range(3):# 启动三个进程。 16 p = multiprocessing.Process(target=calc, args=(i,), name=‘cac={}‘.format(i)) 17 ps.append(p) 18 p.start() # 分别启动三个进程。 19 for p in ps: 20 p.join() # 当前主进程 的主线程等待所有的子进程结束后,再退出。 21 22 delta = (datetime.datetime.now() - start).total_seconds() 23 print(delta) 24 print(‘end---‘)
1 1 100000000 2 0 100000000 3 2 100000000 4 10.221419 5 end---
测试:同时CPU 密集型 ,单线程,执行3次:
1 import multiprocessing 2 import datetime 3 import logging 4 5 6 logging.basicConfig(level=logging.INFO, format=‘%(thread)s %(message)s‘) 7 start = datetime.datetime.now() 8 def calc(i): 9 sum = 0 10 for _ in range(100000000): 11 sum += 1 12 print(i, sum) 13 14 calc(0) 15 calc(1) 16 calc(2) 17 18 delta = (datetime.datetime.now() - start).total_seconds() 19 logging.info(delta)
1 0 100000000 2 1 100000000 3 6088 23.520346 4 2 100000000
测试:同上CPU 密集型,多线程,执行;
1 import multiprocessing 2 import datetime 3 import logging 4 import threading 5 # 次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右 6 7 logging.basicConfig(level=logging.INFO, format=‘%(thread)s %(message)s‘) 8 start = datetime.datetime.now() 9 def calc(i): 10 sum = 0 11 for _ in range(100000000): 12 sum += 1 13 print(i, sum) 14 15 t1 = threading.Thread(target=calc, args=(1,)) 16 t2 = threading.Thread(target=calc, args=(2,)) 17 t3 = threading.Thread(target=calc, args=(3,)) 18 19 t1.start() 20 t2.start() 21 t3.start() 22 t1.join() 23 t2.join() 24 t3.join() 25 26 27 delta = (datetime.datetime.now() - start).total_seconds() 28 logging.info(delta)
2 100000000 3 100000000 9908 22.107265 1 100000000
总结:
1、可以看到:CPU密集型,多线程和单线程执行的时间是差不多的,也就是说,由于GIL(全局解释器锁,进程锁)的存在,导致,即便是多线程,也是串行执行
2、可以看到,多进程执行,时间要很短,而且充分利用CPU资源,每个进程调度到一个CPU上,但是并没有绑定。但是如果绑定的话,就分别在自己的绑定的CPU上运行,此外CPU绑定,是为了利用cpu缓存。
3、注意的一点,多进程代码一定要在 __name__ == ‘__main__‘ 中执行。
线程是没有如下属性或方法:
名称 | 说明 |
pid | 进程ID |
exitcode | 进程的退出状态 |
terminate() | 终止指定的进程 |
3、进程间同步:(不怎么用,不过注意,方法,类相同,但是来自不同的模块下,线程是线程的,进程是进程的)
Python在进程间同步提供了和线程同步一样的类,使用的方法一样,使用的效果也类似。
不过,进程间代价要高于线程间,而且 底层实现是不同的,只不过Python屏蔽了这些不同之处,让用户简单使用多进程。
multiprocessing 还提供共享内存,服务器进程来共享数据,还提供了用于 进程间通讯的Queue队列,Pipe管道
通信方式不同:
1、多进程就是启动多个解释器进程,进程间通信必须序列化,反序列化
2、数据的线程安全性问题
如果 每个进程中没有实现多线程,GIL 可以说没什么用了
注:
queue模块是给线程用的,但是进程的Queue是multiprocessing提供的,一般不用,是本机使用
进程和进程之间通信:使用第三方Queue,如:kfakfa,而且基本都是网络通信(进程间通讯)
多进程,就是多解释器进程
进程间通讯:一般都要序列话,反序列化。
进程间通讯的锁:分布式锁, zookeper
4、进程池举例:
multiprocessing.Pool 是进程池类
名称 | 说明 |
apply(self,func,args=(),kwds={}) | 阻塞执行,导致主进程执行其他子进程就像一个执行 |
apply_async(self,func, args=(),kwds={}, callback=None, error_callback=None) |
与apply方法用法一致,非阻塞异步执行,得到结果后执行回调 |
cose() | 关闭池,池不能再接受新的任务 |
terminate() | 结束工作进程,不再处理未处理的任务 |
join() | 主进程阻塞等待子进程的退出,join方法要在close或terminate之后使用 |
测试:进程池的使用 apply_async
1 import multiprocessing 2 import datetime 3 import logging 4 import threading 5 # 次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右 6 start = datetime.datetime.now() 7 8 def calc(i): 9 sum = 0 10 for _ in range(100000000): 11 sum += 1 12 print(i, sum) 13 14 if __name__ == ‘__main__‘: 15 16 17 ps = [] 18 19 pool = multiprocessing.Pool(3) 20 for i in range(3): 21 pool.apply_async(calc, args=(i,)) 22 pool.close() 23 pool.join() 24 25 delta = (datetime.datetime.now() - start).total_seconds() 26 print(delta) 27 print(‘end---‘)
2 100000000
10.15358
end---
总结:
可以看出,利用进程池得出的结果和上面的结果差不多的。
有个前提要注意:
首先测试的电脑是4核心cpu,处理三个进程不会出现不够用,如果处理的进程如果大于CPU 核心数,就会出现等待,
但是这样,利用进程池,可以有效的使用有限资源,剩余的资源可以做别的事,但是,像上面的情况,就会出现明显的资源抢占,资源不够用。
所以,一般推荐使用线程池。
测试:进程池的使用 apply,可以看到,一个一个执行完,没有多进程优势存在
1 import multiprocessing 2 import datetime 3 import logging 4 import threading 5 # 次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右 6 start = datetime.datetime.now() 7 8 def calc(i): 9 sum = 0 10 for _ in range(100000000): 11 sum += 1 12 print(i, sum) 13 14 if __name__ == ‘__main__‘: 15 16 17 ps = [] 18 19 pool = multiprocessing.Pool(3) 20 for i in range(3): 21 pool.apply(calc, args=(i,)) 22 pool.close() 23 pool.join() 24 25 delta = (datetime.datetime.now() - start).total_seconds() 26 print(delta) 27 print(‘end---‘)
0 100000000
1 100000000
2 100000000
22.069262
end---
测试:回调
1 import multiprocessing 2 import datetime 3 import logging 4 import threading 5 # 次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右 6 logging.basicConfig(level=logging.INFO, format=‘%(process)s %(processName)s %(thread)s %(message)s‘) 7 start = datetime.datetime.now() 8 9 def calc(i): 10 sum = 0 11 for _ in range(100000000): 12 sum += 1 13 return i 14 15 if __name__ == ‘__main__‘: 16 ps = [] 17 18 pool = multiprocessing.Pool(3) 19 for i in range(3): 20 pool.apply_async(calc, args=(i,), callback=lambda x: logging.info(‘x ={}‘.format(x))) 21 pool.close() 22 pool.join() 23 24 delta = (datetime.datetime.now() - start).total_seconds() 25 print(delta) 26 print(‘end---‘)
3848 MainProcess 7764 x =0 3848 MainProcess 7764 x =2 3848 MainProcess 7764 x =1 13.521773 end---
回调:自己去主动调用,这里的回调就是,每个任务结束后的返回值,作为回调的参数传入。
而使用多线程,线程的返回值 是不能被用到的。结束就结束了。
5、多进程,多线程的选择:
1、CPU密集型
CPython 中使用到了GIL ,多线程的时候锁相互竞争,且多核优势不能发挥。Python多进程效率更高。
2、IO密集型
适合是用多线程,可以减少多进程间IO 的序列化开销,且在IO等待的视乎,切换大其他线程继续执行,效率不错。
6、应用
请求/ 应答 模型:WEB应用中常见的处理模型
master 启动朵儿worker工作进程,一般 额CPU 数目相同,发挥多核优势。
worker工作进程中,往往需要操作网络IO 和 磁盘 IO ,启动多线程,提高并发处理能力worker处理用户的请求,往往需要等待数据,处理完请求还要通过网络IO 返回响应。
这就是Nginx 工作模式。
worker进程开启多线程,因为要进行IO ,所以多线程相对合适。
apply 是同步阻塞,不推荐使用
apply_async :是异步非阻塞
回调就是调用你的函数。调用空闲函数
自己的任务结束后,才会执行,calback必须有一个参数,接受前面函数的返回值
concurrent :
多进程和多线程,注意是cpu密集型还是IO 密集型
logging模块:
以上是关于python多进程数据库储存问题?的主要内容,如果未能解决你的问题,请参考以下文章