python多进程数据库储存问题?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python多进程数据库储存问题?相关的知识,希望对你有一定的参考价值。

如下面代码所示,我利用多进程map执行main方法,main里面就是爬取数据保存到mysql的代码,如果数据库查询查询不到相同字段就insert插入,现在问题是,代码执行完毕,数据库空空如也,我注释掉多进程,执行for循环的代码,数据库有数据了,但是id不是从1自增的,1000条数据,多进程每执行一次,id增加1000多,数据却没保存,清空数据表,再执行for代码,id就是1了,还有我在main方法里增加commit()也就正常了,前面我是进程结束执行一次commit()。
if __name__ == "__main__":
print('开始了!')
current = time.time()
total_page_count = get_json(1)["result"]["query"]["totlePageCount"]
# for index in range(1, total_page_count+1):
# mian(index)
pool = Pool()
index = ([x for x in range(1, total_page_count+1)])
pool.map(main, index)
pool.close()
pool.join()
cur.close()
conn.commit()
conn.close()
print('执行结束')
end = time.time()
print(f'执行时间一共为end - current秒!')

粗看一下,估计pool.map里开启了多进程。
问题是,每个进程访问数据库,要有各自的cursor,要各自去commit才可以。
参考技术A 最关键的Pool()是什么你应该贴一下

Python-多进程

1、多进程:

  由于Python的GIL,多线程未必是CPU密集型程序的最好选择。

  多进程可以完全独立的进程环境中运行程序,可以充分的利用多处理器。

  但是进程本身的隔离带来的数据不共享也是一个问题,而且线程比进程轻量级。

2、multiprocessing:

  Process类遵循了Thread类的API 

  测试:开启三个进程,计算一个CPU密集型程序。

技术分享图片
 1 import multiprocessing
 2 import datetime
 3 
 4 #   次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右
 5 def calc(i):
 6     sum = 0
 7     for _ in range(100000000):
 8         sum += 1
 9     print(i, sum)
10 
11 if __name__ == __main__:
12     start = datetime.datetime.now()
13 
14     ps = []
15     for i in range(3):# 启动三个进程。
16         p = multiprocessing.Process(target=calc, args=(i,), name=cac={}.format(i))
17         ps.append(p)
18         p.start() # 分别启动三个进程。
19     for p in ps:
20         p.join() # 当前主进程 的主线程等待所有的子进程结束后,再退出。
21 
22     delta = (datetime.datetime.now() - start).total_seconds()
23     print(delta)
24     print(end---)
测试:多进程

 

1 1 100000000
2 0 100000000
3 2 100000000
4 10.221419
5 end---

 

  测试:同时CPU 密集型 ,单线程,执行3次:

技术分享图片
 1 import multiprocessing
 2 import datetime
 3 import logging
 4 
 5 
 6 logging.basicConfig(level=logging.INFO, format=%(thread)s %(message)s)
 7 start = datetime.datetime.now()
 8 def calc(i):
 9     sum = 0
10     for _ in range(100000000):
11         sum += 1
12     print(i, sum)
13 
14 calc(0)
15 calc(1)
16 calc(2)
17 
18 delta = (datetime.datetime.now() - start).total_seconds()
19 logging.info(delta)
测试:单线程执行三次
1 0 100000000
2 1 100000000
3 6088 23.520346
4 2 100000000

  测试:同上CPU 密集型,多线程,执行;

技术分享图片
 1 import multiprocessing
 2 import datetime
 3 import logging
 4 import threading
 5 #   次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右
 6 
 7 logging.basicConfig(level=logging.INFO, format=%(thread)s %(message)s)
 8 start = datetime.datetime.now()
 9 def calc(i):
10     sum = 0
11     for _ in range(100000000):
12         sum += 1
13     print(i, sum)
14 
15 t1 = threading.Thread(target=calc, args=(1,))
16 t2 = threading.Thread(target=calc, args=(2,))
17 t3 = threading.Thread(target=calc, args=(3,))
18 
19 t1.start()
20 t2.start()
21 t3.start()
22 t1.join()
23 t2.join()
24 t3.join()
25 
26 
27 delta = (datetime.datetime.now() - start).total_seconds()
28 logging.info(delta)
测试:多线程开启三个子线程
2 100000000
3 100000000
9908 22.107265
1 100000000

 

  总结:

    1、可以看到:CPU密集型,多线程和单线程执行的时间是差不多的,也就是说,由于GIL(全局解释器锁,进程锁)的存在,导致,即便是多线程,也是串行执行

    2、可以看到,多进程执行,时间要很短,而且充分利用CPU资源,每个进程调度到一个CPU上,但是并没有绑定。但是如果绑定的话,就分别在自己的绑定的CPU上运行,此外CPU绑定,是为了利用cpu缓存。

    3、注意的一点,多进程代码一定要在 __name__ == ‘__main__‘ 中执行。

                                  线程是没有如下属性或方法:

名称 说明
pid 进程ID
exitcode 进程的退出状态
terminate() 终止指定的进程

3、进程间同步:(不怎么用,不过注意,方法,类相同,但是来自不同的模块下,线程是线程的,进程是进程的)

  Python在进程间同步提供了和线程同步一样的类,使用的方法一样,使用的效果也类似。

  不过,进程间代价要高于线程间,而且 底层实现是不同的,只不过Python屏蔽了这些不同之处,让用户简单使用多进程。

  multiprocessing 还提供共享内存,服务器进程来共享数据,还提供了用于 进程间通讯的Queue队列,Pipe管道

  通信方式不同:

    1、多进程就是启动多个解释器进程,进程间通信必须序列化,反序列化

    2、数据的线程安全性问题

      如果 每个进程中没有实现多线程,GIL 可以说没什么用了

  注:

    queue模块是给线程用的,但是进程的Queue是multiprocessing提供的,一般不用,是本机使用

    进程和进程之间通信:使用第三方Queue,如:kfakfa,而且基本都是网络通信(进程间通讯)

    多进程,就是多解释器进程

    进程间通讯:一般都要序列话,反序列化。

    进程间通讯的锁:分布式锁, zookeper

4、进程池举例:

  multiprocessing.Pool 是进程池类

 

 

名称 说明
apply(self,func,args=(),kwds={}) 阻塞执行,导致主进程执行其他子进程就像一个执行

apply_async(self,func, args=(),kwds={},

callback=None, error_callback=None)

与apply方法用法一致,非阻塞异步执行,得到结果后执行回调
cose() 关闭池,池不能再接受新的任务
terminate() 结束工作进程,不再处理未处理的任务
join() 主进程阻塞等待子进程的退出,join方法要在close或terminate之后使用

  测试:进程池的使用 apply_async

技术分享图片
 1 import multiprocessing
 2 import datetime
 3 import logging
 4 import threading
 5 #   次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右
 6 start = datetime.datetime.now()
 7 
 8 def calc(i):
 9     sum = 0
10     for _ in range(100000000):
11         sum += 1
12     print(i, sum)
13 
14 if __name__ == __main__:
15 
16 
17     ps = []
18 
19     pool = multiprocessing.Pool(3)
20     for i in range(3):
21         pool.apply_async(calc, args=(i,))
22     pool.close()
23     pool.join()
24 
25     delta = (datetime.datetime.now() - start).total_seconds()
26     print(delta)
27     print(end---)
4核心CPU处理三个进程

 

2 100000000
10.15358
end---

  总结:

    可以看出,利用进程池得出的结果和上面的结果差不多的。

      有个前提要注意:

        首先测试的电脑是4核心cpu,处理三个进程不会出现不够用,如果处理的进程如果大于CPU 核心数,就会出现等待,

        但是这样,利用进程池,可以有效的使用有限资源,剩余的资源可以做别的事,但是,像上面的情况,就会出现明显的资源抢占,资源不够用。

        所以,一般推荐使用线程池。

  测试:进程池的使用 apply,可以看到,一个一个执行完,没有多进程优势存在

技术分享图片
 1 import multiprocessing
 2 import datetime
 3 import logging
 4 import threading
 5 #   次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右
 6 start = datetime.datetime.now()
 7 
 8 def calc(i):
 9     sum = 0
10     for _ in range(100000000):
11         sum += 1
12     print(i, sum)
13 
14 if __name__ == __main__:
15 
16 
17     ps = []
18 
19     pool = multiprocessing.Pool(3)
20     for i in range(3):
21         pool.apply(calc, args=(i,))
22     pool.close()
23     pool.join()
24 
25     delta = (datetime.datetime.now() - start).total_seconds()
26     print(delta)
27     print(end---)
apply

 

0 100000000
1 100000000
2 100000000
22.069262
end---

 

  测试:回调

技术分享图片
 1 import multiprocessing
 2 import datetime
 3 import logging
 4 import threading
 5 #   次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右
 6 logging.basicConfig(level=logging.INFO, format=%(process)s %(processName)s %(thread)s %(message)s)
 7 start = datetime.datetime.now()
 8 
 9 def calc(i):
10     sum = 0
11     for _ in range(100000000):
12         sum += 1
13     return i
14 
15 if __name__ == __main__:
16     ps = []
17 
18     pool = multiprocessing.Pool(3)
19     for i in range(3):
20         pool.apply_async(calc, args=(i,), callback=lambda x: logging.info(x ={}.format(x)))
21     pool.close()
22     pool.join()
23 
24     delta = (datetime.datetime.now() - start).total_seconds()
25     print(delta)
26     print(end---)
callback
3848 MainProcess 7764 x =0
3848 MainProcess 7764 x =2
3848 MainProcess 7764 x =1
13.521773
end---

 

    回调:自己去主动调用,这里的回调就是,每个任务结束后的返回值,作为回调的参数传入。

    而使用多线程,线程的返回值 是不能被用到的。结束就结束了。

5、多进程,多线程的选择:

  1、CPU密集型

  CPython 中使用到了GIL ,多线程的时候锁相互竞争,且多核优势不能发挥。Python多进程效率更高。

  2、IO密集型

  适合是用多线程,可以减少多进程间IO 的序列化开销,且在IO等待的视乎,切换大其他线程继续执行,效率不错。

6、应用

  请求/ 应答 模型:WEB应用中常见的处理模型

  master 启动朵儿worker工作进程,一般 额CPU 数目相同,发挥多核优势。

  worker工作进程中,往往需要操作网络IO 和 磁盘 IO ,启动多线程,提高并发处理能力worker处理用户的请求,往往需要等待数据,处理完请求还要通过网络IO 返回响应。

  这就是Nginx 工作模式。

  worker进程开启多线程,因为要进行IO ,所以多线程相对合适。

 

 

 

 

 

 

 

 

 

 

 

 

  
  



        apply 是同步阻塞,不推荐使用
        apply_async :是异步非阻塞

        回调就是调用你的函数。调用空闲函数


        自己的任务结束后,才会执行,calback必须有一个参数,接受前面函数的返回值


        concurrent :

        多进程和多线程,注意是cpu密集型还是IO 密集型



logging模块:
    

 

以上是关于python多进程数据库储存问题?的主要内容,如果未能解决你的问题,请参考以下文章

python多进程存数据不改变顺序

python多线程和多进程

python多线程

python 使用多进程实现并发编程/使用queue进行进程间数据交换

Python 多处理:如何启动相互依赖的进程?

python多处理子进程未正常退出