爬虫多进程

Posted wl443587

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了爬虫多进程相关的知识,希望对你有一定的参考价值。

multiprocessing
python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock

 

 

.Process

创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。

 

1.创建函数并将其作为单个进程

import multiprocessing
import time

def test(i):
    print(i)

if __name__ == "__main__":
    p = multiprocessing.Process(target = test, args = (3,))
    p.start()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())

结果:
p.pid: 6716
p.name: Process-1
p.is_alive: True
3

 

 

2.创建函数并将其作为多个进程

import multiprocessing
import time

def test_1(i):
    print(i)

def test_2(interval):
    print(i)

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = test_1, args = (2,))
    p2 = multiprocessing.Process(target = test_2, args = (3,))
    p1.start()
    p2.start()

结果:2 3

 

3.将进程定义为类

import multiprocessing
import time

class TestProcess(multiprocessing.Process):
    def __init__(self, i):
        multiprocessing.Process.__init__(self)
        self.i = i

def run(self):
    print(self.i)

if __name__ == __main__:
    p = TestProcess(3)
    p.start()     

ps:进程p调用start()时,自动调用run()
结果: 3

 

4.daemon属性

加上属性(p.daemon = True), 主进程结束,它们就随着结束了

import multiprocessing
import time

def test(s):
    print("开始:",time.ctime());
    time.sleep(s)
    print("结束:",time.ctime());

if __name__ == "__main__":
    p = multiprocessing.Process(target = test, args = (3,))
    p.daemon = True  # 主进程结束,它们就随着结束了.
    p.start()
    # p.join() 加上此方法主进程完成也执行test方法
    print(lol)
 
结果
开始: Wed Nov  7 19:55:35 2018
结束: Wed Nov  7 19:55:38 2018
lol

 

 

二. 进程间通信

进程之间的数据时不同享的,进程之间通过以下几种方式来通信

1. Event

Event用来实现进程间同步通信。

 

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))
 
if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",target = wait_for_event,args = (e,))
    w2 = multiprocessing.Process(name = "non-block",target = wait_for_event_timeout,args = (e, 2))
    w1.start()
    w2.start()
    time.sleep(3)
    e.set()
    print("main: event is set")

结果:
wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True 

 

2. Queue

使用方式和进程中的差不多

import multiprocessing

def foo1(q):
    q.put([33,wl])

def foo2(q):
    print(foo2:,q.get())

if __name__ == __main__:
    q = multiprocessing.Queue(1500)
    p1 = multiprocessing.Process(target=foo1, args=(q,))
    p2 = multiprocessing.Process(target=foo2, args=(q,))
    p1.start()
    p2.start()
    print(end)
 
结果
end
foo2: [33, wl]

 

3. Pipe

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

import multiprocessing

def foo1(pipe):
    pipe.send([33,wl])
    pipe.close()

def foo2(pipe):
    print(foo2:,pipe.recv())

if __name__ == __main__:
    p, c = multiprocessing.Pipe(100)
    p1 = multiprocessing.Process(target=foo1, args=(p,))
    p2 = multiprocessing.Process(target=foo2, args=(c,))
    p1.start()
    p2.start()
    print(end)

结果
end
foo2: [33, wl]

 

 

. 进程锁

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。

 

from multiprocessing import Process,Lock

def run(L,i):
    L.acquire()
    print(hello world, i)
    L.release()

if __name__ == __main__:
    L = Lock()
    for i in range(10):
        Process(target=run,args=(L,i)).start()

结果(输出文件)
hello world 7
hello world 3
hello world 6
hello world 5
hello world 0
hello world 2
hello world 4
hello world 9
hello world 8
hello world 1

 

2. Semaphore

Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release
");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

 

 

.进程池

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

 函数解释:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,并行
  • apply(func[, args[, kwds]])是阻塞的, 串行
  • close()    关闭pool,使其不在接受新的任务。
  • terminate()    结束工作进程,不在处理未完成的任务。
  • join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用

1.使用进程池(非阻塞)

from multiprocessing import Process,Pool
import time

def foo(i):
    print(foo:,i)

def Bar(a):
    print(done,a)

if  __name__ ==  __main__:
    pool = Pool(5)  # 进程池中允许放入5个线程
    for i in range(10):
        pool.apply_async(func = foo,args=(i,))
    print(end)
    pool.close()
    pool.join()# 主线程等待子进程执行完成,必须在close()或者terminate()之后

 

2.使用多个进程池

import multiprocessing
import os, time, random

def Lee():
    print("
Run task Lee-%s" % (os.getpid()))  # os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10)  # random.random()随机生成0-1之间的小数
    end = time.time()
    print(Task Lee, runs %0.2f seconds. % (end - start))

def Marlon():
    print("
Run task Marlon-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print(Task Marlon runs %0.2f seconds. % (end - start))

def Allen():
    print("
Run task Allen-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print(Task Allen runs %0.2f seconds. % (end - start))

def Frank():
    print("
Run task Frank-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print(Task Frank runs %0.2f seconds. % (end - start))

if __name__ == __main__:
    function_list = [Lee, Marlon, Allen, Frank]
    print("parent process %s" % (os.getpid()))
    pool = multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func = func)  # Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
    print(Waiting for all subprocesses done...)
    pool.close()
    pool.join()  # 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print(All subprocesses done.)

 

 

.进程与爬虫

1.多进程爬虫

from multiprocessing import Process,Queue,Pool,Manager
import time,requests

link_list = []
with open(url.txt,r) as f:
    for u in f.readlines():
        url = u.split()[0].replace(
,‘‘)
        link_list.append(url)

class myProcess(Process):
    def __init__(self,name,q):
        Process.__init__(self)
        self.name = name
        self.userName = 0
        self.q = q

    def run(self):
        print(开始:,self.name)
        while not self.q.empty():
            try:
                self.craw(self.name,self.q)
            except Exception as e:
                break
        print(结束:,self.name)

    def writeImages(self, ThreadName, url):
        print("正在存储文件 %s ..." % ThreadName+str(self.userName))
        path = rD:zhappian + \\ + ThreadName+str(self.userName) + .png
        file = open(path, wb)
        images = requests.get(url,timeout = 20).content
        file.write(images)
        file.close()
        self.userName += 1

    def craw(self,name,q):
        url = q.get(timeout = 2)
        try:
            self.writeImages(name, url)
        except Exception as e:
            print(q.qsize(),url,e)

if __name__ == __main__:
    work_queue = Queue(1500)

    # 填充队列
    for url in link_list:
        work_queue.put(url)

    start_time = time.time()

    # 创建新进程
    for i in range(5):
        t = myProcess(进程- + str(i + 1), work_queue)
        t.daemon = True
        t.start()
        t.join()

    end_time = time.time()
    print(爬虫的运行时间为:,end_time - start_time)

 

 

 

2.Pool+Queue的爬虫

from multiprocessing import Process, Queue, Pool, Manager
import time, requests

link_list = []
with open(url.txt, r) as f:
    for u in f.readlines():
        url = u.split()[0].replace(
, ‘‘)
        link_list.append(url)

def writeImages(ThreadName, url, userName):
    print("正在存储文件 %s ..." % ThreadName + str(userName))
    path = rD:zhappian + \\ + ThreadName + str(userName) + .png
    file = open(path, wb)
    images = requests.get(url, timeout=20).content
    file.write(images)
    file.close()

def craw(name, q):
    url = q.get(timeout=2)
    userName = 0
    while not q.empty():
        try:
            writeImages(name, url, userName)
            userName += 1
        except Exception as e:
            print(q.qsize(), url, e)

if __name__ == __main__:
    m = Manager()
    work_queue = m.Queue(1500)
    pool = Pool(5)

    # 填充队列
    for url in link_list:
        work_queue.put(url)
    start_time = time.time()

    # 创建新进程
    for i in range(5):
        pool.apply_async(func=craw,args=(Process+str(i+1),work_queue))
    print(开始。。。。)
    pool.close()
    pool.join()

    end_time = time.time()
    print(爬虫的运行时间为:, end_time - start_time)

 




以上是关于爬虫多进程的主要内容,如果未能解决你的问题,请参考以下文章

代码片段:Shell脚本实现重复执行和多进程

Python爬虫案例演示:Python多线程多进程协程

爬虫:多进程爬虫

有没有易懂的 Python 多线程爬虫代码

Python爬虫实例多进程下载金庸网小说

Python爬虫编程思想(138):多线程和多进程爬虫--从Thread类继承