Multiprocessing for Web Crawlers
Posted by wl443587
multiprocessing
Multithreading in Python is not true parallel execution: because of the Global Interpreter Lock, only one thread runs Python bytecode at a time, so to fully use a multi-core CPU you usually need multiple processes. Python provides the very convenient multiprocessing package: you only need to define a function and Python takes care of everything else, making it easy to go from a single process to concurrent execution. multiprocessing supports child processes, communication and shared data, and several forms of synchronization, providing classes such as Process, Queue, Pipe, and Lock.
I. Process
The class that creates a process: Process([group [, target [, name [, args [, kwargs]]]]]). target is the callable to run, args is its positional-argument tuple, kwargs is its keyword-argument dict, and name is an alias for the process. group is effectively unused and should be left as None.
Methods: is_alive(), join([timeout]), run(), start(), terminate(). A process is launched by calling start().
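Of these methods, terminate() and join() with a timeout are not demonstrated in the examples below, so here is a minimal sketch of the full lifecycle (the worker function and sleep time are made up for illustration):

```python
import multiprocessing
import time

def slow_worker():
    time.sleep(10)  # simulate a long-running task

if __name__ == "__main__":
    p = multiprocessing.Process(target=slow_worker)
    p.start()
    p.join(timeout=1)   # wait at most 1 second for the process to finish
    if p.is_alive():    # still running after the timeout
        p.terminate()   # forcibly stop the worker
        p.join()        # join after terminate to reap the process
    print("exitcode:", p.exitcode)  # a negative exit code means killed by a signal
```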
1. Create a function and run it as a single process
```python
import multiprocessing

def test(i):
    print(i)

if __name__ == "__main__":
    p = multiprocessing.Process(target=test, args=(3,))
    p.start()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())
```

Result:

```
p.pid: 6716
p.name: Process-1
p.is_alive: True
3
```
2. Create functions and run them as multiple processes
```python
import multiprocessing

def test_1(i):
    print(i)

def test_2(interval):
    print(interval)

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=test_1, args=(2,))
    p2 = multiprocessing.Process(target=test_2, args=(3,))
    p1.start()
    p2.start()
```

Result:

```
2
3
```
3. Define the process as a class
```python
import multiprocessing

class TestProcess(multiprocessing.Process):
    def __init__(self, i):
        multiprocessing.Process.__init__(self)
        self.i = i

    def run(self):
        print(self.i)

if __name__ == '__main__':
    p = TestProcess(3)
    p.start()
```

Note: when p calls start(), run() is invoked automatically.

Result:

```
3
```
4. The daemon attribute
With the attribute set (p.daemon = True), child processes are terminated as soon as the main process exits.
```python
import multiprocessing
import time

def test(s):
    print("start:", time.ctime())
    time.sleep(s)
    print("end:", time.ctime())

if __name__ == "__main__":
    p = multiprocessing.Process(target=test, args=(3,))
    p.daemon = True  # daemon children are killed when the main process exits
    p.start()
    # p.join()  # with join(), the main process waits and test() runs to completion
    print('lol')
```

Result (as printed with p.join() enabled; with daemon = True and no join, only lol appears because the child is killed when the main process exits):

```
start: Wed Nov  7 19:55:35 2018
end: Wed Nov  7 19:55:38 2018
lol
```
II. Inter-process communication
Data is not shared between processes; processes communicate through the following mechanisms.
1. Event
Event implements synchronized communication between processes: one process blocks on an event until another process sets it.
```python
import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()  # block until the event is set
    print("wait_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout: starting")
    e.wait(t)  # block for at most t seconds
    print("wait_for_event_timeout: e.is_set()->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name="block", target=wait_for_event, args=(e,))
    w2 = multiprocessing.Process(name="non-block", target=wait_for_event_timeout, args=(e, 2))
    w1.start()
    w2.start()
    time.sleep(3)
    e.set()
    print("main: event is set")
```

Result:

```
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()->False
main: event is set
wait_for_event: e.is_set()->True
```
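An Event can also be reset with clear(), so a single event can gate repeated rounds of work. A minimal sketch (the round count and sleep are my own choices for illustration):

```python
import multiprocessing
import time

def gated_worker(e):
    for round_no in range(2):
        e.wait()   # block until the controller sets the event
        print("worker: running round", round_no)
        e.clear()  # reset so the next round blocks again

if __name__ == "__main__":
    e = multiprocessing.Event()
    w = multiprocessing.Process(target=gated_worker, args=(e,))
    w.start()
    for _ in range(2):
        time.sleep(1)
        e.set()  # release the worker for one round
    w.join()
```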
2. Queue
Usage is much the same as the Queue used with threads, but multiprocessing.Queue is safe to share between processes.
```python
import multiprocessing

def foo1(q):
    q.put([33, 'wl'])

def foo2(q):
    print('foo2:', q.get())

if __name__ == '__main__':
    q = multiprocessing.Queue(1500)  # maximum queue size
    p1 = multiprocessing.Process(target=foo1, args=(q,))
    p2 = multiprocessing.Process(target=foo2, args=(q,))
    p1.start()
    p2.start()
    print('end')
```

Result:

```
end
foo2: [33, 'wl']
```
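A Queue usually drives a producer/consumer pattern: producers put work items and consumers loop on get() until told to stop. A minimal sketch; using None as a stop sentinel is my own convention here, not part of the library:

```python
import multiprocessing

def producer(q):
    for item in range(5):
        q.put(item)
    q.put(None)  # sentinel: tells the consumer no more items are coming

def consumer(q):
    while True:
        item = q.get()
        if item is None:  # sentinel received, stop consuming
            break
        print('consumed:', item)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    c = multiprocessing.Process(target=consumer, args=(q,))
    p.start()
    c.start()
    p.join()
    c.join()
```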
3. Pipe
Pipe() returns a pair (conn1, conn2) representing the two ends of a pipe. Pipe takes a duplex parameter: if duplex is True (the default), the pipe is full-duplex and both conn1 and conn2 can send and receive. If duplex is False, conn1 can only receive and conn2 can only send.
send() and recv() send and receive messages, respectively. In full-duplex mode, for example, you can call conn1.send() to send a message and conn1.recv() to receive one. If there is no message to receive, recv() blocks; if the other end of the pipe has been closed, recv() raises EOFError.
```python
import multiprocessing

def foo1(pipe):
    pipe.send([33, 'wl'])
    pipe.close()

def foo2(pipe):
    print('foo2:', pipe.recv())

if __name__ == '__main__':
    p, c = multiprocessing.Pipe()  # Pipe takes only the duplex flag, not a size
    p1 = multiprocessing.Process(target=foo1, args=(p,))
    p2 = multiprocessing.Process(target=foo2, args=(c,))
    p1.start()
    p2.start()
    print('end')
```

Result:

```
end
foo2: [33, 'wl']
```
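To illustrate the duplex=False mode described above (one end receive-only, the other send-only), here is a minimal sketch:

```python
import multiprocessing

def sender(conn):
    conn.send('hello from the send-only end')
    conn.close()

if __name__ == '__main__':
    # with duplex=False, the first connection can only recv(), the second only send()
    recv_conn, send_conn = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=sender, args=(send_conn,))
    p.start()
    print(recv_conn.recv())  # only this end may call recv()
    p.join()
```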
III. Process locks

1. Lock
When multiple processes need to access a shared resource, a Lock avoids conflicting accesses.
```python
from multiprocessing import Process, Lock

def run(L, i):
    L.acquire()
    print('hello world', i)
    L.release()

if __name__ == '__main__':
    L = Lock()
    for i in range(10):
        Process(target=run, args=(L, i)).start()
```

Result (written to an output file):

```
hello world 7
hello world 3
hello world 6
hello world 5
hello world 0
hello world 2
hello world 4
hello world 9
hello world 8
hello world 1
```
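The lock above only serializes print calls. The more typical use is protecting shared state; here is a minimal sketch using multiprocessing.Value as the shared resource (the counter type and loop counts are my own choices for illustration):

```python
from multiprocessing import Process, Lock, Value

def increment(counter, lock):
    for _ in range(1000):
        with lock:              # Lock supports the context-manager protocol
            counter.value += 1  # read-modify-write must be protected

if __name__ == '__main__':
    counter = Value('i', 0)  # a shared integer living in shared memory
    lock = Lock()
    workers = [Process(target=increment, args=(counter, lock)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)  # 4000 with the lock; without it, often less
```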
2. Semaphore
A Semaphore limits how many processes may access a shared resource at once, for example capping the maximum number of connections to a pool.
```python
import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + " acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + " release")
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)  # at most 2 workers hold the semaphore at once
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i * 2))
        p.start()
```
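Like Lock, Semaphore supports the context-manager protocol, which guarantees the release even if the worker raises. A minimal rewrite of the worker above, as a sketch:

```python
import multiprocessing
import time

def worker(s, i):
    with s:  # acquires on entry, releases on exit, even on exceptions
        print(multiprocessing.current_process().name + " acquired")
        time.sleep(i)

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        multiprocessing.Process(target=worker, args=(s, i)).start()
```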
IV. Process pools
When using Python for system administration, especially when operating on many files and directories at once or controlling many remote hosts, parallel execution saves a great deal of time. When the number of targets is small, you can spawn processes directly with multiprocessing.Process; a dozen or so is fine, but with hundreds or thousands of targets, manually capping the process count becomes far too tedious. This is where a process pool shines.
A Pool provides a specified number of worker processes for the caller. When a new request is submitted to the pool, a new process is created to run it if the pool is not yet full; if the pool has already reached its maximum size, the request waits until a process in the pool finishes and a worker becomes free to run it.
Method reference:
- apply_async(func[, args[, kwds[, callback]]]): non-blocking; tasks run in parallel. It returns an AsyncResult (see the sketch after this list).
- apply(func[, args[, kwds]]): blocking; tasks run serially.
- close(): closes the pool so that it accepts no new tasks.
- terminate(): ends the worker processes immediately; unfinished tasks are not processed.
- join(): the main process blocks until the child processes have exited; join() must be called after close() or terminate().
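apply_async returns an AsyncResult whose get() retrieves the function's return value (and re-raises any exception from the worker). A minimal sketch; the square function and the callback are made up for illustration:

```python
from multiprocessing import Pool

def square(x):
    return x * x

def on_done(result):
    print('callback got:', result)  # runs in the main process when the task finishes

if __name__ == '__main__':
    pool = Pool(3)
    res = pool.apply_async(square, args=(7,), callback=on_done)
    print('blocking apply:', pool.apply(square, args=(4,)))  # blocks until 16 is ready
    print('async result:', res.get(timeout=5))  # waits at most 5 seconds for 49
    pool.close()
    pool.join()
```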
1. Using a process pool (non-blocking)
```python
from multiprocessing import Pool

def foo(i):
    print('foo:', i)
    return i

def Bar(a):
    print('done', a)

if __name__ == '__main__':
    pool = Pool(5)  # a pool of 5 worker processes
    for i in range(10):
        pool.apply_async(func=foo, args=(i,), callback=Bar)
    print('end')
    pool.close()
    pool.join()  # the main process waits for the workers; must come after close() or terminate()
```
2. Running multiple tasks in a process pool
```python
import multiprocessing
import os, time, random

def Lee():
    print("Run task Lee-%s" % (os.getpid()))  # os.getpid() returns the current process ID
    start = time.time()
    time.sleep(random.random() * 10)  # random.random() yields a float in [0, 1)
    end = time.time()
    print('Task Lee runs %0.2f seconds.' % (end - start))

def Marlon():
    print("Run task Marlon-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print('Task Marlon runs %0.2f seconds.' % (end - start))

def Allen():
    print("Run task Allen-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' % (end - start))

def Frank():
    print("Run task Frank-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' % (end - start))

if __name__ == '__main__':
    function_list = [Lee, Marlon, Allen, Frank]
    print("parent process %s" % (os.getpid()))
    pool = multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func=func)  # when one task finishes, a free worker picks up the next
    print('Waiting for all subprocesses done...')
    pool.close()  # must be called before join(); after close(), no new tasks may enter the pool
    pool.join()   # waits for all child processes to finish
    print('All subprocesses done.')
```
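The examples above submit tasks one at a time with apply_async. When the same function must be applied to a whole list of inputs, Pool.map is usually simpler; a minimal sketch (the worker function is made up for illustration):

```python
from multiprocessing import Pool

def double(x):
    return x * 2

if __name__ == '__main__':
    with Pool(4) as pool:                      # the pool is terminated on exit; map() has finished by then
        results = pool.map(double, range(10))  # blocks until every input is processed
    print(results)  # [0, 2, 4, ..., 18]
```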
V. Processes and crawlers
1. A multiprocess crawler
```python
from multiprocessing import Process, Queue
import time
import requests

link_list = []
with open('url.txt', 'r') as f:
    for u in f.readlines():
        url = u.split()[0].replace(' ', '')
        link_list.append(url)

class myProcess(Process):
    def __init__(self, name, q):
        Process.__init__(self)
        self.name = name
        self.userName = 0
        self.q = q

    def run(self):
        print('start:', self.name)
        while not self.q.empty():
            try:
                self.craw(self.name, self.q)
            except Exception:
                break
        print('end:', self.name)

    def writeImages(self, ThreadName, url):
        print("saving file %s ..." % (ThreadName + str(self.userName)))
        path = r'D:\zhappian' + '\\' + ThreadName + str(self.userName) + '.png'
        with open(path, 'wb') as file:
            images = requests.get(url, timeout=20).content
            file.write(images)
        self.userName += 1

    def craw(self, name, q):
        url = q.get(timeout=2)
        try:
            self.writeImages(name, url)
        except Exception as e:
            print(q.qsize(), url, e)

if __name__ == '__main__':
    work_queue = Queue(1500)
    # fill the queue
    for url in link_list:
        work_queue.put(url)
    start_time = time.time()
    # create the worker processes
    workers = []
    for i in range(5):
        t = myProcess('Process-' + str(i + 1), work_queue)
        t.daemon = True
        t.start()
        workers.append(t)
    for t in workers:
        t.join()  # wait for every worker, not just the last one started
    end_time = time.time()
    print('Crawler run time:', end_time - start_time)
```
2. A Pool + Queue crawler
Note that a plain multiprocessing.Queue cannot be shared with Pool workers (passing one raises a RuntimeError), which is why the queue is created through a Manager here.

```python
from multiprocessing import Pool, Manager
import time
import requests

link_list = []
with open('url.txt', 'r') as f:
    for u in f.readlines():
        url = u.split()[0].replace(' ', '')
        link_list.append(url)

def writeImages(ThreadName, url, userName):
    print("saving file %s ..." % (ThreadName + str(userName)))
    path = r'D:\zhappian' + '\\' + ThreadName + str(userName) + '.png'
    with open(path, 'wb') as file:
        images = requests.get(url, timeout=20).content
        file.write(images)

def craw(name, q):
    userName = 0
    while not q.empty():
        url = q.get(timeout=2)  # fetch a fresh URL on every iteration
        try:
            writeImages(name, url, userName)
            userName += 1
        except Exception as e:
            print(q.qsize(), url, e)

if __name__ == '__main__':
    m = Manager()
    work_queue = m.Queue(1500)
    pool = Pool(5)
    # fill the queue
    for url in link_list:
        work_queue.put(url)
    start_time = time.time()
    # submit the crawling tasks
    for i in range(5):
        pool.apply_async(func=craw, args=('Process' + str(i + 1), work_queue))
    print('start....')
    pool.close()
    pool.join()
    end_time = time.time()
    print('Crawler run time:', end_time - start_time)
```