scrapy 爬取不全?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了scrapy 爬取不全?相关的知识,希望对你有一定的参考价值。
class GaoxiaoSpider(scrapy.Spider):
name = 'gaoxiao'
# allowed_domains = ['www.xxx.com']
start_urls = ['https://www.xiaohua.com/duanzi/']
url = 'https://www.xiaohua.com/duanzi?page=%d'
page_num = 2
def xiang_xi(self,response):
# item = response.meta['item']
xiang = response.xpath('/html/body/div[1]/div[8]/div[2]/div[2]/div/div/p//text() | /html/body/div[1]/div[8]/div[2]/div[2]/div/div/p/text()').extract_first()
# xiang = ''.join(xiang)
# item['xiang'] = xiang
# yield item
print(xiang + '\n')
def parse(self, response):
div_list = response.xpath('/html/body/div/div[8]/div[2]/div[2]/div[position()<21]')
for div in div_list:
# item = XiaohuawangproItem()
name = div.xpath('./div[1]/div[1]/a/i/text()').extract_first()
# item['name'] = name
dizhi = 'https://www.xiaohua.com' + div.xpath('./p/a/@href').extract_first()
# print(dizhi)
yield scrapy.Request(url=dizhi,callback=self.xiang_xi)
# break
if self.page_num <= 2:
new_url = format(self.url%self.page_num)
self.page_num += 1
yield scrapy.Request(url=new_url,callback=self.parse)
print(xiang)总是少几个信息,其他都完整显示
试着加上headers请求头
Scrapy爬取照片
一、基础概念
二、multiprocessing模块
三、queue模块
四、multiprocessing.dummy模块
五、threading模块
------------------------------------------------------------------
一、基础概念
进程是资源分配的最小单位,线程是程序执行的最小单位(资源调度的最小单位)
进程有自己的独立地址空间,每启动一个进程,系统就会为它分配地址空间,建立数据表来维护代码段、堆栈段和数据段。线程是共享进程中的数据,使用相同的地址空间,因此CPU切换一个线程的花费远比进程要小很多,同时创建一个线程的开销也比进程要小很多。线程的引入减小了程序并发执行时的开销。
线程之间的通信更方便,同一进程下的线程共享全局变量、静态变量等数据,而进程之间的通信需要以通信的方式(IPC)进行。不过如何处理好同步与互斥是编写多线程程序的难点。
进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。一个程序至少有一个进程,一个进程至少有一个线程。多线程程序只要有一个线程死掉,整个进程也死掉了,而一个进程死掉并不会对另外一个进程造成影响,多进程程序更健壮。
正常来讲,多线程要比多进程效率更高,因为进程间的切换需要的资源和开销更大,但在python中执行计算密集型任务时多线程实际只能是单线程,而且由于线程之间切换的开销导致多线程往往比实际的单线程还要慢,所以在python中计算密集型任务通常使用多进程。IO密集型任务中,如读写文件,在网络间通信等,适用于多线程。
计算密集型任务的速度:多进程 >多线程> 单进程/线程
IO密集型任务速度: 多线程 > 多进程 > 单进程/线程。
线程是系统级别的它们由操作系统调度,而协程则是程序级别的由程序根据需要自己调度。在一个线程中会有很多函数,我们把这些函数称为子程序,在子程序执行过程中可以中断去执行别的子程序,而别的子程序也可以中断回来继续执行之前的子程序,这个过程就称为协程。协程是一中多任务实现方式,它不需要多个进程或线程就可以实现多任务。
multiprocessing是python的多进程管理包。
threading threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。
greenlet、gevent(第三方模块)可以实现协程
并行与并发 并行:并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核CPU) 并发:并行是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A,交替使用,目的是提高效率。 同步(Sync)/异步(Async) 同步:一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。
简单来说,同步就是必须一件一件事做,等前一件做完了才能做下一件事。 异步:异步和同步是相对的,同步就是顺序执行,执行完一个再执行下一个,需要等待、协调运行。异步就是彼此独立,在等待某事件的过程中继续做自己的事,不需要等待这一事件完成后再工作。
线程就是实现异步的一个方式。 阻塞/非阻塞 阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的 区别 并发/并行:即能够开启多个任务,多个任务交替执行为并发,多个任务同时执行为并行 同步/异步:关注的是消息通知的机制,主动等候消息则为同步、被动听消息则为异步 阻塞/非阻塞:关注的是等候消息的过程中有没有干其他事。 例子 举个简单的例子来描述这四种情况,老张要做两件事,用水壶烧开水,看电视,两件事情即两个任务,两个函数。 同步阻塞:老张把水壶放到火上,就坐在那里等水开,开了之后我再去看电视。 同步非阻塞:老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。 异步阻塞:老张把响水壶放到火上,然后就坐在旁边等着听那个烧开的提示音 异步非阻塞:老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。
二、multiprocessing模块
python中的多线程无法利用多核优势,如果想要充分的使用CPU资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。
multiprocess中几乎包含了和进程有关的所有子模块。大致分为四个部分:创建进程部分、进程同步部分、进程池部分、进程之间数据共享。multiprocessing常用组件及功能:
管理进程模块:
- Process(用于创建进程模块)
- Pool(用于创建管理进程池)
- Queue(用于进程通信,资源共享)
- Value,Array(用于进程通信,资源共享)
- Pipe(用于管道通信)
- Manager(用于资源共享,Manager模块常与Pool模块一起使用)
同步子进程模块:
- Condition
- Event
- Lock
- RLock
- Semaphore
1、运行方式
windows系统下,想要启动一个子进程,必须加上*if __name__=="__main__":*,linux则不需要。
2、父进程中的全局变量能被子进程共享吗?
不行,因为每个进程享有独立的内存数据,如果想要共享资源,可以使用Manage类,或者Queue等模块。
3、子进程中还能再创建子进程吗?
可以,子进程可以再创建进程,线程中也可以创建进程。
1、multiprocess.Process模块
Process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。
Process([group [, target [, name [, args [, kwargs]]]]]) 1 group参数未使用,值始终为None 2 target表示调用对象,即子进程要执行的任务 3 args为传给target函数的位置参数,是一个元组形式,必须有逗号,args=(1,2,\'egon\',) 4 kwargs表示调用对象的字典,kwargs={\'name\':\'egon\',\'age\':18}
5 name为子进程的名称
方法: 1 p.start():启动进程,并调用该子进程中的p.run() 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会
被释放,进而导致死锁 4 p.is_alive():如果p仍然运行,返回True 5 p.join([timeout]):主线程等待p终止。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
属性: 1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程。设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 2 p.name:进程的名称 3 p.pid:进程的pid 4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) 5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性(了解即可)
Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),因此用process()直接创建子进程会无限递归创建。所以必须在创建子进程的部分使用
if __name__ ==‘__main__’ 判断保护起来,import 的时候就不会递归运行了。
1)Process创建子进程
#单独创建子进程,方法一; import os from multiprocessing import Process def func1(name): print(\'hello\', name) print(f"我是子进程:{name},我的ID是:{os.getpid()},我的父进程id是:{os.getppid()}") def func2(): print(\'hello\') if __name__ == \'__main__\': p1 = Process(target=func1, args=(\'测试\',)) # 此处传参必须是元组数据类型 p1.start() print(f"我是父进程:{os.getpid()}") p2 = Process(target=func2) p2.start() # 执行结果 \'\'\' 我是父进程:52164 hello 我是子进程:测试,我的ID是:70088,我的父进程id是:52164 \'\'\' #单独创建子进程,方法二: # 通过继承Process类的形式开启进程的方式 import os from multiprocessing import Process class MyProcess(Process): #需要的传参必须写入到__init__方法里面且必须加上super().__init__();因为父类Process里面也有__init__方法。 def __init__(self, name): super().__init__() self.name = name def run(self): #固定名字run ! print(os.getpid()) print(\'%s 正在聊天\' % self.name) if __name__ == \'__main__\': p1 = MyProcess(\'xiaobai_1\') p2 = MyProcess(\'xiaohei_2\') p1.start() p2.start() #结果 \'\'\' 10688 xiaobai_1 正在聊天 72912 xiaohei_2 正在聊天 \'\'\' #多进程1 #默认情况下,创建好的子进程是随机启动的,子进程的执行顺序不是根据自动顺序决定的 import time from multiprocessing import Process def func(name): print("hello 进程 %d" % name ) time.sleep(1) if __name__ == \'__main__\': for i in range(10): p = Process(target=func, args=(i,)) p.start() #结果 \'\'\' hello 进程 0 hello 进程 5 hello 进程 3 hello 进程 8 hello 进程 1 hello 进程 7 hello 进程 6 hello 进程 2 hello 进程 9 hello 进程 4 \'\'\' #多进程2 #创建好子进程后将其放入链表中,按顺序启动 import time from multiprocessing import Process def func(name): print("hello 进程 %d" % name ) time.sleep(0.1) if __name__ == \'__main__\': p_lst = [] for i in range(10): p = Process(target=func, args=(i,)) p_lst.append(p) p.start() p.join() # 加上join方法后,父进程就会阻塞等待子进程结束而结束。 print("父进程执行中") #结果 \'\'\' hello 进程 0 hello 进程 1 hello 进程 2 hello 进程 3 hello 进程 4 hello 进程 5 hello 进程 6 hello 进程 7 hello 进程 8 hello 进程 9 父进程执行中 \'\'\'
2)进程之间的数据隔离问题
from multiprocessing import Process n = 100 #在windows系统中把全局变量定义在if __name__ == \'__main__\'之上就可以了 def work(): global n n = 0 print("子进程内:", n) if __name__ == \'__main__\': p = Process(target=work) p.start() print("主进程内:", n) #结果 \'\'\' 主进程内: 100 子进程内: 0 \'\'\'
3)socket聊天并发实例
from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind((\'127.0.0.1\',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == \'__main__\': #windows下start进程一定要写到这下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start() 使用多进程实现socket聊天并发-server
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect((\'127.0.0.1\',8080)) while True: msg=input(\'>>: \').strip() if not msg:continue client.send(msg.encode(\'utf-8\')) msg=client.recv(1024) print(msg.decode(\'utf-8\')) 使用多进程实现socket聊天并发-client
2、multiprocessing.Pool模块(进程池)
Pool类描述了一个工作进程池,进程池内部维护一个进程序列,当使用时就去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
如果需要多个子进程时可以考虑使用进程池(pool)来管理。pool创建子进程的方法与Process不同,是通过p.apply_async(func,args=(args))实现,一个池子里能同时运行的任务是取决你电脑的cpu数量,如我的电脑现在是有4个cpu,那会子进程task0,task1,task2,task3可以同时启动,task4则在之前的一个某个进程结束后才开始。
Pool(processes: Optional[int] = ...,initializer: Optional[Callable[..., Any]] = ..., initargs: Iterable[Any] = ..., maxtasksperchild: Optional[int] = ...) processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。 initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。 maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个心的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只 要Pool存在工作进程就会一直存活。 方法: apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,apply_async()更适合并发执行。 apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :会返回一个结果对象。callback可以接收一个参数然后被调用,调用失败时则用error_callback替换callback。 close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。 terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。 join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。 map(func, iterable[, chunksize]) map_async(func, iterable[, chunksize[, callback[, error_callback]]]) imap(func, iterable[, chunksize]) imap_unordered(func, iterable[, chunksize]) starmap(func, iterable[, chunksize]) starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
#apply同步执行:阻塞式 from multiprocessing import Pool import time,os def f1(i): time.sleep(0.5) print("%s开始执行,进程号为%d"%(i,os.getpid())) return i + 100 if __name__ == "__main__": pool = Pool(2) #开启2个进程 for i in range(1,10): #会循环调用创建的2个进程来执行任务 pool.apply(func=f1,args=(i,)) pool.close() pool.join() \'\'\' 2开始执行,进程号为5832 4开始执行,进程号为5832 6开始执行,进程号为5832 8开始执行,进程号为5832 1开始执行,进程号为6848 3开始执行,进程号为6848 5开始执行,进程号为6848 7开始执行,进程号为6848 9开始执行,进程号为6848 \'\'\' #apply_async异步执行:非阻塞式 from multiprocessing import Pool import os,time,random def worker(msg): print("%s开始执行,进程号为%d"%(msg,os.getpid())) time.sleep(random.random()*2) # random.random()随机成0-1的浮点数 print(msg,"执行完毕") return \'worker\'+str(msg) def f2(arg): print(arg) if __name__ == \'__main__\': po = Pool(3) # 最大的进程数为3 for i in range(0,10): #每次循环将会用空闲出来的子进程去调用目标 po.apply_async(worker,(i,),callback=f2) print("----start----") po.close() # 关闭进程池,关闭后po不再接受新的请求 po.join() # 等待po中的所有子进程执行完成,必须放在close语句之后,如果没有添加join(),会导致有的代码没有运行就已经结束了 print("-----end-----") \'\'\' ----start---- worker2 worker1 worker0 worker4 worker6 worker3 worker5 worker8 worker9 worker7 2开始执行,进程号为3156 2 执行完毕 3开始执行,进程号为3156 3 执行完毕 8开始执行,进程号为3156 8 执行完毕 0开始执行,进程号为716 0 执行完毕 5开始执行,进程号为716 5 执行完毕 9开始执行,进程号为716 9 执行完毕 1开始执行,进程号为6896 1 执行完毕 4开始执行,进程号为6896 4 执行完毕 6开始执行,进程号为6896 6 执行完毕 7开始执行,进程号为6896 7 执行完毕 -----end----- \'\'\'
from multiprocessing.pool import Pool def hhh(i): return i * 2 def job1(z): return z[0]* z[1] if __name__ == \'__main__\': pool = Pool(2) hh = pool.map(hhh, [i for i in range(16)]) #map()会将第二个参数列表元素一个个的传入第一个参数的函数中 res = pool.map(job1, [(2, 3), (3, 4)]) print(hh) print(res) from multiprocessing.pool import Pool import time def add_test (i): time.sleep(1) print(i * i) if __name__ == "__main__": pool = Pool(3) pool.map_async(add_test, [i for i in range(16)]) pool.close() pool.join()
3、Lock
作用:当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
具体场景:所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。使用Lock同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。
from multiprocessing import Process, Lock def l(lock, num): lock.acquire() print "Hello Num: %s" % (num) lock.release() if __name__ == \'__main__\': lock = Lock() #这个一定要定义为全局 for num in range(20): Process(target=l, args=(lock, num)).start() #这个类似多线程中的threading,但是进程太多了,控制不了。
4、Semaphore
信号量,是在进程同步过程中一个比较重要的角色。可以控制临界资源的数量,保证各个进程之间的互斥和同步。可以用来控制对共享资源的访问数量,例如池的最大连接数。
实例:演示一下进程之间利用Semaphore做到同步和互斥,以及控制临界资源数量
from multiprocessing import Process, Semaphore, Lock, Queue import time buffer = Queue(10) empty = Semaphore(2) full = Semaphore(0) lock = Lock() class Consumer(Process): def run(self): global buffer, empty, full, lock while True: full.acquire() lock.acquire() buffer.get() print(\'Consumer pop an element\') time.sleep(1) lock.release() empty.release() class Producer(Process): def run(self): global buffer, empty, full, lock while True: empty.acquire() lock.acquire() buffer.put(1) print(\'Producer append an element\') time.sleep(1) lock.release() full.release() if __name__ == \'__main__\': p = Producer() c = Consumer() p.daemon = c.daemon = True p.start() c.start() p.join() c.join() print \'Ended!\'
如上代码实现了注明的生产者和消费者问题,定义了两个进程类,一个是消费者,一个是生产者。
定义了一个共享队列,利用了Queue数据结构,然后定义了两个信号量,一个代表缓冲区空余数,一个表示缓冲区占用数。
生产者Producer使用empty.acquire()方法来占用一个缓冲区位置,然后缓冲区空闲区大小减小1,接下来进行加锁,对缓冲区进行操作。然后释放锁,然后让代表占用的缓冲区位置数量+1,消费者则相反。
运行结果如下:
Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element
可以发现两个进程在交替运行,生产者先放入缓冲区物品,然后消费者取出,不停地进行循环。
5、Pipe通信
Pipe常用来在两个进程间通信,两个进程分别位于管道的两端。管道是可以同时发送和接受消息的。Queue适用于绝大多数场景,为满足普遍性而不得不多方考虑,它因此显得“重”。Pipe更为轻巧,速度更快
multiprocessing.Pipe([duplex])
(con1, con2) = Pipe()
con1管道的一端,负责存储,也可以理解为发送信息
con2管道的另一端,负责读取,也可以理解为接受信息
from multiprocessing import Process, Pipe def send(pipe): pipe.send([\'spam\'] + [42, \'egg\']) # send 传输一个列表 pipe.close() if __name__ == \'__main__\': (con1, con2) = Pipe() # 创建两个 Pipe 实例 sender = Process(target=send, args=(con1, )) # 函数的参数,args 一定是实例化之后的 Pipe 变量,不能直接写 args=(Pip(),) sender.start() # Process 类启动进程 print("con2 got: %s" % con2.recv()) # 管道的另一端 con2 从send收到消息 con2.close()
管道是可以同时发送和接受消息的:
from multiprocessing import Process, Pipe def talk(pipe): pipe.send(dict(name=\'Bob\', spam=42)) # 传输一个字典 reply = pipe.recv() # 接收传输的数据 print(\'talker got:\', reply) if __name__ == \'__main__\': (parentEnd, childEnd) = Pipe() # 创建两个 Pipe() 实例,也可以改成 conf1, conf2 child = Process(target=talk, args=(childEnd,)) # 创建一个 Process 进程,名称为 child child.start() # 启动进程 print(\'parent got:\', parentEnd.recv()) # parentEnd 是一个 Pip() 管道,可以接收 child Process 进程传输的数据 parentEnd.send({x * 2 for x in \'spam\'}) # parentEnd 是一个 Pip() 管道,可以使用 send 方法来传输数据 child.join() # 传输的数据被 talk 函数内的 pip 管道接收,并赋值给 reply print(\'parent exit\') \'\'\' parent got: {\'name\': \'Bob\', \'spam\': 42} talker got: {\'ss\', \'mm\', \'pp\', \'aa\'} parent exit \'\'\'
6、进程间通信/共享:multiprocessing.JoinableQueueh、multiprocessing.Queue和multiprocessing.Manager
与多线程不同,多进程之间不会共享全局变量,所以多进程通信需要借助“外力”。这些常用的外力有Queue,Pipe,Value/Array,Manager。多个进程时,通常使用消息传递来进行进程之间的通信,对于传递消息可以使用Pipe()(用于两个进程之间的连接)或队列Queue(允许多个生产者和消费者)。
1)、Queue
Queue.put()放数据,默认有block=True和timeout两个参数。当block=True时,写入是阻塞式的,阻塞时间由timeout确定。当队列q被(其他线程)写满后,这段代码就会阻塞,直至其他线程取走数据。Queue.put()方法加上 block=False 的参数,即可解决这个隐蔽的问题。但要注意,非阻塞方式写队列,当队列满时会抛出 exception Queue.Full 的异常
Queue.get():取出队列中目前排在最前面的数据(默认阻塞),Queue.get([block[, timeout]])获取队列,timeout等待时间
Queue.full() #判断队列是否满了
Queue.empty() #判断队列是否为空
Queue.qsize() 返回队列的大小 ,不过在 Mac OS 上没法运行。
from multiprocessing import Queue from multiprocessing import Process def new_put(q): q.put(\'你好\') def new_get(q): ret = q.get() print(ret) if __name__ == \'__main__\': q = Queue() p = Process(target=new_put, args=(q, )) p.start() print(p.name) g = Process(target=new_get, args=(q, )) g.start() print(g.name)
from multiprocessing import Process, Queue def producer(q): # 生产 for i in range(1, 6): q.put(i) # 添加一个任务 print("生产%s馒头" % i) def consumer(q): # 消费 while 1: sth = q.get() print("消费%s馒头" % sth) if __name__ == \'__main__\': q = Queue(4) p1 = Process(target=producer, args=(q, )) p2 = Process(target=producer, args=(q, )) p3 = Process(target=producer, args=(q, )) c1 = Process(target=consumer, args=(q, )) c2 = Process(target=consumer, args=(q, )) # 将消费者设置为守护进程,因为消费者里面是死循环 c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() print("stop")
2)、JoinableQueue
JoinableQueue比Queue多了task_done和join方法
q.task_done() 使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。也就是put取出了,计数-1。
q.join() 生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。
from multiprocessing import JoinableQueue, Queue q = JoinableQueue()# 用法和Queue相似 q.put("ocean") # 队列放入一个任务,内存在一个计数机制,+1 print(q.get()) q.task_done() # 完成一次任务,计数机制-1 q.join() # 计数机制不为0的时候,阻塞等待计数器为0后通过
from multiprocessing import Process, JoinableQueue def producer(q): # 生产 for i in range(1, 6): q.put(i) # 添加一个任务 print("生产%s馒头" % i) def consumer(q): # 消费 while 1: sth = q.get() print("消费%s馒头" % sth) name__ == \'__main__\': # 继承了Queue,多了两个功能,join() task_done() q = JoinableQueue(4) p1 = Process(target=producer, args=(q, )) p2 = Process(target=producer, args=(q, )) p3 = Process(target=producer, args=(q, )) c1 = Process(target=consumer, args=(q, )) c2 = Process(target=consumer, args=(q, )) # 将消费者设置为守护进程,因为消费者里面是死循环 c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() print("stop")
3)Array,Value
无论是Value()还是Array(),第一个参数都是typecode或type。typecode表示类型码,在Python中已经预先设计好了,如”c“表示char类型,“i”表示singed int类型等等。这种方式不易
记忆,建议用type表达类型,这里需要借助ctypes模块。
# typecode int_typecode = Value("i", 512) float_typecode = Value("f", 1024.0) char_typecode = Value("c", b"a") # 第二个参数是byte型 # type import ctypes int_type = Value(ctypes.c_int, 512) float_type = Value(ctypes.c_float, 1024.0) char_type = Value(ctypes.c_char, b"a") # 第二个参数是byte型
import multiprocessing def func(num): num.value=10.78 #子进程改变数值的值,主进程跟着改变 if __name__=="__main__": num=multiprocessing.Value("d",10.0) # d表示数值,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value) print(num.value) #10.0 p=multiprocessing.Process(target=func,args=(num,)) p.start() p.join() print(num.value) #10.78 import multiprocessing def func(num): num[2]=9999 #子进程改变数组,主进程跟着改变 if __name__=="__main__": num=multiprocessing.Array("i",[1,2,3,4,5]) #主进程与子进程共享这个数组 print(num[:]) #[1, 2, 3, 4, 5] p=multiprocessing.Process(target=func,args=(num,)) p.start() p.join() print(num[:]) #[1, 2, 9999, 4, 5] from multiprocessing import Process,Array def agk(i,arr): arr[i]=i+100 if __name__=="__main__": target=Array("i",10) #target=Array(\'i\',rang(10)) #[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] print(target[:]) #[0, 0, 0, 0, 0, 0, 0, 0, 0, 0] for i in range(10): t=Process(target=agk,args=(i,target)) t.start() t.join() print(target[:]) #[100, 101, 102, 103, 104, 105, 106, 107, 108, 109] 主进程可以取到子进程运行的数据 from multiprocessing import Process, Value, Array import ctypes def producer(num, string): num.value = 1024 #int型 string[0] = b"z" # 只能一个一个的赋值,byte型 string[1] = b"t" string[2] = b"y" def consumer(num, string): print(num.value) print(b"".join(string)) if __name__ == "__main__": num = Value(ctypes.c_int, 512) string = Array(ctypes.c_char, 3) # 设置一个长度为3的数组,字符串需要通过Array实现,而不是Value。 proProcess = Process(target=producer, args=(num, string)) conProcess = Process(target=consumer, args=(num, string)) proProcess.start() conProcess.start()
4)Manager
Python中进程间共享数据,处理基本的queue,pipe和value+array外,还提供了更高层次的封装。Queue和管道Pipe都仅能在进程之间传递数据,不能修改数据,multiprocessing.Manager可以实现在进程之间同时修改一份数据。Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。Manager支持的类型有:
dict= Manager().dict() # 字典对象
list=Manager().list() #列表
queue = Manager().Queue() # 队列
lock = Manager().Lock() # 普通锁
rlock = Manager().RLock() # 可冲入锁
cond = Manager().Condition() # 条件锁
semaphore = Manager().Semaphore() # 信号锁
event = Manager().Event() # 事件锁
namespace = Manager().Namespace() # 命名空间
from multiprocessing import Process,Manager def f(d,n): d["name"] = "zhangyanlin" d["age"] = 18 d["Job"] = "pythoner" n.reverse() if __name__ == "__main__": with Manager() as man: d = man.dict() n = man.list(range(10)) p = Process(target=f,args=(d,n)) p.start() p.join() print(d) print(n) #结果 \'\'\' {\'name\': \'zhangyanlin\', \'age\': 18, \'Job\': \'pythoner\'} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] \'\'\'
import time from multiprocessing import Process,Queue,Pool,Manager def producer(queue): queue.put("a") time.sleep(2) def consumer(queue): time.sleep(2) data = queue.get() print(data) if __name__ == "__main__": #queue = Queue() queue = Manager().Queue() pool = Pool() #pool中的进程间通信需要使用Manager pool.apply_async(producer,args=(queue,)) pool.apply_async(consumer, args=(queue,)) pool.close()
from multiprocessing import Process,Pool,Manager #通过Queue获取子进程的值,并将他们组成列表 def new_put(q): #放入队列 q.put(\'你好\') #j_list=[] def new_get(q): #从队列中取出 ret = q.get() #在此处加入列表是没用的,每个进程都单独把值加入了列表的拷贝中,主进程中j_list还是[] #j_list.append(ret) return ret j_list=[] def ru(): manager = Manager() q = manager.Queue() for i in range(3): p = Process(target=new_put, args=(q, )) p.start() p = Pool(3) for j in range(3): g = p.apply_async(func=new_get, args=(q, )) j_list.append(g.get())#通过get()获取进程函数的返回值并加入列表 if __name__ == \'__main__\': ru() print(j_list) \'\'\' [\'你好\', \'你好\', \'你好\'] \'\'\' #获取进程函数返回值 from multiprocessing import Pool def sayHi(num): print(\'hi\') return num def mycallback(x): print(x) list1.append(x) if __name__ == \'__main__\': pool = Pool(4) list1 = [] for i in range(4): pool.apply_async(sayHi, (i,), callback=mycallback) pool.close() pool.join() print(list1)#[0, 1, 2, 3] from multiprocessing import Pool def test(p): return p if __name__=="__main__": pool = Pool(processes=5) result=[] for i in range(10): pp=pool.apply_async(test, args=(i,))#维持执行的进程总数为10,当一个进程执行完后添加新进程 result.append(pp) pool.close() pool.join() result_r=[] for i in result: result_r.append(i.get()) print(result_r) #[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] import multiprocessing from multiprocessing import Manager def worker(procnum, returns): print(str(procnum) + \' represent!\') returns.append(procnum) return returns if __name__ == \'__main__\': manager = Manager() return_list = manager.list() #也可以使用dict jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i, return_list)) jobs.append(p) p.start() for proc in jobs: proc.join() print(return_list) \'\'\' 3 represent! 2 represent! 1 represent! 0 represent! 4 represent! [2, 3, 1, 0, 4] \'\'\' from multiprocessing import Process,Manager import os def func1(shareList,shareValue,shareDict,lock,i): with lock: shareValue.value+=1 shareDict[1]=\'1\' shareDict[2]=\'2\' shareList[i%5]=i+12 print("%s开始执行,进程号为%d"%(i,os.getpid())) if __name__ == \'__main__\': manager=Manager() list1=manager.list([1,2,3,4,5]) dict1=manager.dict() array1=manager.Array(\'i\',range(10)) value1=manager.Value(\'i\',1) lock=manager.Lock() proc=[] for i in range(10): #生成10个进程 m=Process(target=func1,args=(list1,value1,dict1,lock,i)) m.start() proc.append(m) for p in proc: p.join() print(list1) print(dict1) print(array1) print(value1) \'\'\' 6开始执行,进程号为11532 2开始执行,进程号为7604 4开始执行,进程号为11596 0开始执行,进程号为8392 1开始执行,进程号为16564 3开始执行,进程号为2904 5开始执行,进程号为14280 7开始执行,进程号为12152 8开始执行,进程号为13252 9开始执行,进程号为13120 [17, 13, 19, 20, 21] {1: \'1\', 2: \'2\'} array(\'i\', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) Value(\'i\', 11) \'\'\' from selenium import webdriver from multiprocessing.dummy import Process, Manager import time def worker(i, return_list): browser=webdriver.Chrome() return_list.append(browser) time.sleep(1) print(i) return_list[i].get(\'http://selenium-python.readthedocs.io\') if __name__ == \'__main__\': manager = Manager() return_list = manager.list() #也可以使用dict jobs = [] for i in range(3): p = Process(target=worker, args=(i, return_list)) jobs.append(p) p.start() for proc in jobs: proc.join() print(return_list)
7、进程同步
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
# 多进程抢占输出资源 import os import time import random from multiprocessing import Process def work(n): print(\'%s: %s is running\' % (n, os.getpid())) time.sleep(random.random()) print(\'%s: %s is done\' % (n, os.getpid())) if __name__ == \'__main__\': for i in range(3): p = Process(target=work, args=(i,)) p.start() # 执行结果 """ 0: 14316 is running 1: 9900 is running 2: 10056 is running 1: 9900 is done 2: 10056 is done 0: 14316 is done """
# 使用锁维护执行顺序 import os import time import random from multiprocessing import Process, Lock def work(lock, n): lock.acquire() print(\'%s: %s is running\' % (n, os.getpid())) time.sleep(random.random()) print(\'%s: %s is done\' % (n, os.getpid())) lock.release() if __name__ == \'__main__\': lock = Lock() for i in range(3): p = Process(target=work, args=(lock, i)) p.start() # 执行结果 """ 0: 15276 is running 0: 15276 is done 1: 6360 is running 1: 6360 is done 2: 14776 is running 2: 14776 is done """
上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了。加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行修改,速度是慢了,但牺牲了速度却保证了数据的安全性。
因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题,这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。队列和管道都是将数据存放于内存中,队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。
8、守护进程
主进程创建守护进程,守护进程会随着主进程的结束而结束。被置为守护进程的子进程加了join()(起到阻塞作用),那么主进程会等子进程都运行完。
守护进程是一种在系统后台执行的程序,它独立于控制终端并且执行一些周期任务或触发事件。假如你编写了一个python服务程序,并且在命令行下启动,而你的命令行会话又被终端所控制,python服务成了终端程序的一个子进程。因此如果你关闭了终端,这个命令行程序也会随之关闭。要使你的python服务不受终端影响而常驻系统,就需要将它变成守护进程。
import time from multiprocessing import Process def foo(): print(123) time.sleep(3) print("end123") def bar(): print(456) time.sleep(2) print("end456") if __name__ == \'__main__\': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True #守护进程 p1.start() # p1.join() p2.start() p2.join() #打印该行则主进程代码结束,则守护进程p1应该被终止.可能p1执行的打印信息任务会因为主进程打印(main----)被终止. print("-----main-----") #结果: #如果foo的用时小于bar,那么foo可以正常完成执行 """ 456 end456 -----main----- """
三、queue模块
queue(队列)是python的标准库,在python2中模块名为Queue,python3中改为了queue。
在python中,多个线程之间的数据是共享的,多个线程进行数据交换的时候不能够保证数据的安全性和一致性,所以当多个线程需要进行数据交换的时候,队列就出现了,队列可以完美解决线程间的数据交换,保证线程间数据的安全性和一致性(简单的来说就是多线程需要加锁,很可能会造成死锁,而queue自带锁。所以多线程结合queue会好的很多)。所以多线程下queue可以随意使用,不会出现写冲突。
multiprocessing中有一个Queue类(multiprocessing.Queue), queue中也有一个Queue类(queue.Queue)。queue.Queue是进程内非阻塞队列。队列是多个进程各自私有的。multiprocess.Queue是跨进程通信队列,队列是是各子进程共有,它支持多进程之间的交互,比如master-worker模式下,master进程写入,work进程消费的模式,支持进程之间的通信。
1、常见队列
""" queue.Queue(maxsize) 先进先出队列 queue.LifoQueue(maxsize) 后进先出队列 queue.PriorityQueue(maxsize) 优先级队列 queue.deque双线队列
maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。 """
#单向队列 ,先进先出 import queue q = queue.Queue() q.put(123) q.put(456) q.put(789) print(q.get()) # 123 #单向队列,后进先出 import queue q = queue.LifoQueue() q.put(123) q.put(456) q.put(789) print(q.get()) # 789 #优先级队列 ,参数是元组,值越小优先级越高 import queue q = queue.PriorityQueue() q.put((4,123)) q.put((3,456)) q.put((2,789)) q.put((1,111)) q.put((0,666)) print(q.get()) # (0,666) #双向队列 import queue q = queue.deque() q.append(123) q.append(456) q.appendleft(666) print(q.pop()) # 456 print(q.popleft()) # 666
2、Queue 常见方法
import queue q = queue.Queue(maxsize=0) # 构造一个先进显出队列,maxsize指定队列长度,为0时,表示队列长度无限制。
q.join() # 等到队列为空的时候,在执行别的操作 q.qsize() # 返回队列的大小 (不可靠) q.empty() # 当队列为空的时候,返回True 否则返回False (不可靠) q.full() # 当队列满的时候,返回True,否则返回False (不可靠),与 maxsize 大小对应 q.put(item, block=True, timeout=None) # 将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置, #为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后, #如果队列无法给出放入item的位置,则引发 queue.Full 异常 q.get(block=True, timeout=None) # 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞, #若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。 q.put_nowait(item) # 等效于 put(item,block=False) q.get_nowait() # 等效于 get(item,block=False)
3、生产者与消费者
import threading import time import queue def producer(i): q.put(i) def consumer(): print(q.get()) time.sleep(1) q = queue.Queue() for i in range(10): t = threading.Thread(target=producer,args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer) t.start()
四、multiprocessing.dummy模块
multiprocessing是Python的标准模块,它既可以用来编写多进程,也可以用来编写多线程。如果是多线程的话,用multiprocessing.dummy即可,multiprocessing.dummy 是multiprocessing的一个子库,用法与multiprocessing基本相同。multiprocessing.dummy是对threading的封装。
getpid打印自身进程号,getppid打印父进程进程号。
multiprocessing.current_process().name 获取进程名
multiprocessing.current_process().ident 获取进程号
threading.currentThread().name 获取线程名
threading.currentThread().ident 获取线程号
multiprocessing.dummy.current_process().name 获取线程名
multiprocessing.dummy.current_process().ident 获取线程号
1、子线程无返回值
Multiprocessing.dummy.Pool() 与Multiprocessing.Pool() 的用法一样
非阻塞方法--线程并发执行
multiprocessing.dummy.Pool.apply_async() 和 multiprocessing.dummy.Pool.imap()
阻塞方法---线程顺序执行
multiprocessing.dummy.Pool.apply()和 multiprocessing.dummy.Pool.map()
from multiprocessing.dummy import Pool as Pool import time def func(msg): print(\'msg:\', msg) time.sleep(2) print(\'end:\') pool = Pool(processes=3) for i in range(1, 5): msg = \'hello %d\' % (i) pool.apply_async(func, (msg,)) # 非阻塞 # pool.apply(func,(msg,)) # 阻塞,apply()源自内建函数,用于间接的调用函数,并且按位置把元祖或字典作为参数传入。 # pool.imap(func,[msg,]) # 非阻塞, 注意与apply传的参数的区别 # pool.map(func, [msg, ]) # 阻塞 print(\'Mark~~~~~~~~~~~~~~~\') pool.close() pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 print(\'sub-process done\')
2、子进程有返回值
与多进程一样,只有multiprocessing.dummy.Pool.apply_async()可以有返回值,apply,map,imap不可以设置返回值.
五、threading模块
threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。
- threading.currentThread(): 返回当前的线程变量。
- threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
- threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
1、threading.Thread
Thread(group: None = ..., target: (*args: Any, **kwargs: Any) -> Any | None = ..., name: str | None = ..., args: Iterable = ..., kwargs:
Mapping[str, Any] | None = ..., *, daemon: bool | None = ...) Thread的参数: * group *应该为None; 当实现ThreadGroup类时保留给以后的扩展。 * target *是要由run()方法调用的可调用对象。 默认为无,表示什么都不会被调用。 * name *是线程名称。 默认情况下,唯一名称的格式为“ Thread-N”,其中N是一个小十进制数字。 * args *是目标调用的参数元组。 默认为()。 * kwargs *是用于目标调用的关键字参数字典。 默认为{}。 如果子类覆盖了该构造函数,则必须确保在对线程执行其他任何操作之前,先调用基类的构造函数(Thread .__ init __())。 Thread的方法: t.start() : 激活线程, t.getName() : 获取线程的名称 t.setName() : 设置线程的名称 t.name : 获取或设置线程的名称 t.is_alive() : 判断线程是否为激活状态 t.isAlive() :判断线程是否为激活状态 t.setDaemon() 设置为后台线程或前台线程(即是否为守护线程,默认:False)必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行, 主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止 t.isDaemon() : 判断是否为守护线程 t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。 t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义 t.run() :线程被cpu调度后自动执行线程对象的run方法
#线程开启方法1 from threading import Thread # 创建线程的模块 def task(name): print(name) if __name__ == \'__main__\': # 开启线程 参数1:方法名(不要带括号) 参数2:参数(元祖) 返回对象 p = Thread(target=task, args=(\'线程1\',)) p.start() # 只是给操作系统发送了一个就绪信号,并不是执行。操作系统接收信号后安排cpu运行 print(\'主\') #线程开启方法2 - 类的方法 from threading import Thread # 创建线程的模块 class MyThread(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): # 固定名字run !!!必须用固定名 print(self.name) if __name__ == \'__main__\': # 必须要这样启动 p = MyThread(\'子线程1\') p.start() print(\'主)
2、线程锁threading.RLock和threading.Lock
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。为了保证数据的准确性,引入了锁的概念。所以可能出现如下问题:
例:假设列表A的所有元素就为0,当一个线程从前向后打印列表的所有元素,另外一个线程则从后向前修改列表的元素为1,那么输出的时候,列表的元素就会一部分为0,一部分为1,这就导致了数据的不一致。锁的出现解决了这个问题。
import threading import time globals_num = 0 lock = threading.RLock() def Func(): lock.acquire() # 获得锁 global globals_num globals_num += 1 time.sleep(1) print(globals_num) lock.release() # 释放锁 for i in range(10): t =threading.Thread(target=Func) t.start()
RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。
3、threading.Event
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
- clear:将“Flag”设置为False
- set:将“Flag”设置为True
- Event.isSet() :判断标识位是否为Ture。
import threading def do(event): print(\'为啥我们可以取不完整类型的地址?Node.js Mongoose 更新嵌入式文档,拉取不持久