Thread是python中的一个多线程类,我们可以通过给它传递target函数或者创建一个自己的类来继承Thread来使用他。queue是python中的一个消息队列,它实现了python中线程数据的共享,并解决了传统多线程需要对共享数据上锁,解锁的问题,极大的方便了我们的多线程编程。通过Thread+queue我们可以实现一个基于生产者+消费者模式的多线程爬虫,以爬取一个文章论坛为例,生产者负责提取文章的url,并把其保存在queue队列中。消费者也就是我们的多线程爬虫,它由多个线程组成,共享queue的数据,主要负责从queue中get文章链接,每get完一个url后会告知queue,任务数减一,直到queue为空,退出线程并结束进程!
生产者提取文章url,由于文章较少,我选择将提取文章url的类放在主线程中,当所有url提取并放入queue之后才开始启动多线程的消费者爬虫(即解析文章字段的爬虫),生产者的代码如下:
class CrawlUrls: total_urls = [] # 随机User-Agent headers = { "UserAgent": UserAgent().random } def __init__(self, queue): self.queue = queue def run(self): self.get_urls() print(str(self.queue.qsize()) + " " + "urls is put!") def get_urls(self, url="http://python.jobbole.com/all-posts/"): results = requests.get(url, headers=self.headers, timeout=30) soup = BeautifulSoup(results.text, "lxml") links = soup.find_all("a", class_="archive-title") for link in links: link = link.attrs["href"] self.queue.put(link) self.total_urls.append(link) next_urls = soup.select(‘a[class="next page-numbers"]‘) for next_url in next_urls: next_url = next_url.attrs["href"] if next_url: self.get_urls(next_url) pass
此时80多页的python专栏的文章url已经put进queue队列了,此时只需启动多线程的消费者爬虫来解析我们的queue队列里面的url,消费者的代码如下:
class ParseUrls(threading.Thread): def __init__(self, queue, t_name): self.queue = queue self.conn = mysqldb.connect(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DBNAME, charset="utf8", use_unicode=True) self.cursor = self.conn.cursor() threading.Thread.__init__(self, name=t_name) pass def run(self): self.parse_urls() def parse_urls(self): while True: try: url = self.queue.get(block=False) self.queue.task_done() result = requests.get(url=url, timeout=10) selector = etree.html(result.text) title = selector.xpath(r‘//*[@class="entry-header"]/h1/text()‘) title = title[0] if title is not None else None author = selector.xpath(r‘//*[@class="copyright-area"]/a/text()‘) author = author[0] if author is not None else None items = dict(title=title, author=author, url=url) self.insert_mysql(items) except queue.Empty: print("crawl done!") break def insert_mysql(self, value): insert_sql = ‘‘‘ insert into article(title, author, url) VALUES (%s, %s, %s) ‘‘‘ self.cursor.execute(insert_sql, (value["title"], value["author"], value["url"])) self.conn.commit()
最后只需要在main函数中启动我们的线程!
if __name__ == ‘__main__‘: q = queue.Queue() cw = CrawlUrls(q) cw.run() threads = [] thread_nums = 10 for i in range(0, thread_nums+1): bt = ParseUrls(q, "thread" + str(i)) threads.append(bt) for i in range(0, thread_nums+1): threads[i].start() for i in range(0, thread_nums+1): threads[i].join()
以上就是一个简单的基于Thread+queue的多线程爬虫!