Python Multithreaded Crawler: Design and Implementation Examples


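A crawler works in three basic steps: fetch, parse, and store. Assume here that fetching and storing are I/O-bound (network access and data storage) and that parsing is CPU-bound. There are then two main ways to design a multithreaded crawler. In the first scheme, each thread performs all three steps, and several such threads run in parallel. In the second scheme, each step gets its own group of threads: for example, N threads for fetching, one thread for parsing (switching between several parsing threads would lower efficiency), and N threads for storing.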
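As a rough illustration of the first scheme, the sketch below lets every worker thread carry a keyword through all three steps. The `fetch` and `parse` functions and the in-memory `results` list are hypothetical stand-ins for the network request, the HTML parsing, and the database write; they are assumptions made so that the sketch runs without a network or a database.

```python
import queue
import threading

def fetch(keyword):
    # Hypothetical stand-in for the HTTP request (I/O-bound).
    return '<html>%s</html>' % keyword

def parse(html):
    # Hypothetical stand-in for HTML parsing (CPU-bound).
    return html[len('<html>'):-len('</html>')].upper()

def worker(tasks, results, results_lock):
    # Scheme 1: one thread performs fetch, parse and store for each keyword.
    while True:
        try:
            keyword = tasks.get_nowait()
        except queue.Empty:
            return  # no keywords left
        item = parse(fetch(keyword))
        with results_lock:
            results.append(item)  # stand-in for the storage step

def crawl(keywords, num_threads=4):
    tasks = queue.Queue()
    for keyword in keywords:
        tasks.put(keyword)
    results = []
    results_lock = threading.Lock()
    threads = [threading.Thread(target=worker, args=(tasks, results, results_lock))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(results)

print(crawl(['a', 'b', 'c', 'd', 'e']))
```

Taking keywords with `get_nowait()` avoids the gap between checking that a shared container is non-empty and removing an element from it.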

Below we try to scrape the chemical stock information from http://www.chembridge.com/.

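First, the search URL is http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1, where line is the search keyword (the keywords are stored in a local txt file). The requests library performs the HTTP request; the code that fetches the page is: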

url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
response = requests.get(url,headers=self.headers[0],timeout=20)
html_doc=response.text
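The search URL above is built by string concatenation, which breaks if a keyword contains spaces or other reserved characters. A minimal sketch of an alternative, assuming the same query parameters, uses `urllib.parse.urlencode` from the standard library; the helper name `build_search_url` and the sample keyword are hypothetical.

```python
from urllib.parse import urlencode

SEARCH_URL = 'http://www.chembridge.com/search/search.php'

def build_search_url(line):
    # Same parameters as the hand-built URL, with the keyword escaped.
    params = {
        'searchType': 'MFCD',
        'query': line,
        'type': 'phrase',
        'results': 10,
        'search': 1,
    }
    return SEARCH_URL + '?' + urlencode(params)

# 'MFCD00000001' is only a sample keyword for illustration.
print(build_search_url('MFCD00000001'))
```

requests can also do the encoding itself when the dictionary is passed as `params=` to `requests.get`.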

The page is parsed with the BeautifulSoup library; part of the code is:

soup = BeautifulSoup(html_doc, 'lxml')
div=soup.find(id='BBResults')
if div:
    links=div.select('a.chemical')
    for link in links:
        try:
            self.get_page_link(link,line)
        except Exception as e:
            # wait, then try once more to fetch and save this link
            print('%s: failed to save:'%line,e)
            time.sleep(self.relay*2)
            print('%s: saving again'%line)
            self.get_page_link(link,line)
print('%s: search finished'%line)

def get_page_link(self,link,line):
    res=[]
    href=link.get('href')
    print(href)
    time.sleep(self.relay*2*random.randint(5,15)/10)
    r=requests.get(href,headers=self.headers[1],timeout=20)
    if r.status_code==200:
        parse_html=r.text
        soup1=BeautifulSoup(parse_html, 'lxml')
        catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')] # catalog names
        # print(catalogs)
        table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
        if 'AmountPriceQty.' in table_headers:
            index=table_headers.index('AmountPriceQty.')
            catalog=catalogs[0]
            trs=soup1.select('.form tbody tr')
            if len(catalogs)>1:
                catalog=catalogs[index]
            for tr in trs:
                if len(tr.select('td'))>1:
                    # one row: catalog followed by the text of every cell
                    row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                    res.append(row)

Finally, res is saved to a MySQL database:

conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
cursor = conn.cursor()
sql = 'INSERT INTO chembridge VALUES(%s,%s,%s,%s)'
cursor.executemany(sql,res)
print('Saved')
conn.commit()
cursor.close()
conn.close()
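The batch insert can be tried without a MySQL server. The sketch below swaps in the standard library's sqlite3 module purely so that it runs anywhere; it is not the article's MySQL code. Note that sqlite3 uses `?` placeholders where mysql.connector uses `%s`. The four column names follow the catalog, amount, price and qty fields used later in the article, and the sample rows are made up.

```python
import sqlite3

def save_rows(conn, rows):
    # Create the table on first use; all four columns are kept as text.
    conn.execute('CREATE TABLE IF NOT EXISTS chembridge '
                 '(catalog TEXT, amount TEXT, price TEXT, qty TEXT)')
    # "with conn" commits on success and rolls back if an insert fails.
    with conn:
        conn.executemany('INSERT INTO chembridge VALUES (?, ?, ?, ?)', rows)

conn = sqlite3.connect(':memory:')
# Made-up rows in the same shape as res: (catalog, amount, price, qty).
rows = [('BB-1', '1 g', 'USD|10', '1'),
        ('BB-1', '5 g', 'USD|40', '1')]
save_rows(conn, rows)
saved = conn.execute('SELECT COUNT(*) FROM chembridge').fetchone()[0]
print(saved)
```

Passing the rows as parameters, rather than formatting them into the SQL string, leaves quoting to the database driver.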

 

I. Complete code for the single-threaded crawler:

# -*- coding:utf-8 -*-
import requests,random,time
from bs4 import BeautifulSoup
import mysql.connector

class Spider:
    def __init__(self):
        self.headers=[{
            'Host':'www.chembridge.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate',
            'Referer':'http://www.chembridge.com/search/search.php?search=1',
            'Connection':'keep-alive',
            'Upgrade-Insecure-Requests':'1'
        },
        {
            'Host':'www.hit2lead.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate, br'
        }]
        self.filename='MDL.txt'

    def get_page_link(self,link):
        res=[]
        href=link.get('href')
        print(href)
        parse_html=requests.get(href,headers=self.headers[1],timeout=20).text
        soup1=BeautifulSoup(parse_html, 'lxml')
        catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')] # catalog names
        print(catalogs)
        table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
        print(table_headers)
        # skip pages that have no price table
        if 'AmountPriceQty.' not in table_headers:
            return
        index=table_headers.index('AmountPriceQty.')
        catalog=catalogs[0]
        trs=soup1.select('.form tbody tr')
        # print(trs)
        if len(catalogs)>1:
            catalog=catalogs[index]
        for tr in trs:
            if len(tr.select('td'))>1:
                row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                res.append(row)
        print(res)
        conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
        cursor = conn.cursor()
        sql = 'INSERT INTO chembridge_test2 VALUES(%s,%s,%s,%s)'
        cursor.executemany(sql,res)
        conn.commit()
        cursor.close()
        conn.close()

    def get_page(self,line):
        url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
        try:
            response = requests.get(url,headers=self.headers[0],timeout=20)
            print(response.status_code)
            html_doc=response.text
            # print(html_doc)
            soup = BeautifulSoup(html_doc, 'lxml')
            div=soup.find(id='BBResults')
            if div:
                links=div.select('a.chemical')
                for link in links:
                    self.get_page_link(link)
            # short random pause between searches
            relay=random.randint(2,5)/10
            print(relay)
            time.sleep(relay)
        except Exception as e:
            print('except:', e)

    def get_file(self,filename):
        i=0
        # one search keyword per line
        with open(filename,'r') as f:
            for line in f:
                line=line.strip()
                print(line)
                self.get_page(line)
                i=i+1
                print('Item %s'%i)

    def run(self):
        self.get_file(self.filename)

spider=Spider()
starttime=time.time()
spider.run()
print('Elapsed: %f s'%(time.time()-starttime))

II. Multithreaded crawler code

1. Implementation of the first scheme:

 

# -*- coding:utf-8 -*-
from threading import Thread
import threading
from queue import Queue, Empty
import os,time,random
import requests,mysql.connector
from bs4 import BeautifulSoup
from openpyxl.workbook import Workbook
from openpyxl.styles import Font

class ThreadCrawl(Thread):
    def __init__(self,tname,relay):
        Thread.__init__(self)
        # per-thread base delay: relay times a random factor between 0.5 and 1.5
        self.relay=relay*random.randint(5,15)/10
        self.tname=tname
        self.num_retries=3  # number of retries for a failed search
        self.headers=[{
            'Host':'www.chembridge.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate',
            'Referer':'http://www.chembridge.com/search/search.php?search=1',
            'Connection':'keep-alive',
            'Upgrade-Insecure-Requests':'1'
        },
        {
            'Host':'www.hit2lead.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate, br'
        }]

    def run(self):
        print('%s started crawling'%self.tname)
        while True:
            # check and pop under the lock so that two threads cannot race for the last word
            with lock:
                if not words:
                    break
                line=words.pop(0)
            self.get_page(line,self.num_retries)
            time.sleep(self.relay*random.randint(5,15)/10)

        # second pass: search again the words that failed in the first pass
        while True:
            try:
                # get_nowait() cannot block if another thread took the last item
                line=my_queue.get_nowait()
            except Empty:
                break
            print('Re-crawling %s...'%line)
            self.get_page(line,num_retries=1)
        print('%s finished'%self.tname)


    # fetch the search result page
    def get_page(self,line,num_retries=2):
        global count
        print('%s is searching %s...'%(self.tname,line))
        url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
        try:
            response = requests.get(url,headers=self.headers[0],timeout=20)
            status=response.status_code
            if status==200:
                html_doc=response.text
                # print(html_doc)
                soup = BeautifulSoup(html_doc, 'lxml')
                div=soup.find(id='BBResults')
                if div:
                    links=div.select('a.chemical')
                    for link in links:
                        try:
                            self.get_page_link(link,line)
                        except Exception as e:
                            # wait, then try once more to fetch and save this link
                            print('%s: failed to save:'%line,e)
                            time.sleep(self.relay*2)
                            print('%s: saving again'%line)
                            self.get_page_link(link,line)
                print('%s: search finished'%line)
                # the progress counter is shared by all threads
                with lock:
                    count=count+1
                    print('%s finished so far'%count)
            else:
                print('%s: network error searching %s, status code: %s'%(self.tname,line,status))
                if num_retries>0:
                    print('%s is retrying %s'%(self.tname,line))
                    time.sleep(self.relay*random.randint(5,15)/10)
                    self.get_page(line,num_retries-1)
                else:
                    print('%s: search failed after all retries!!!'%line)
                    my_queue.put(line)

        except Exception as e:
            print('%s: error searching %s, error:'%(self.tname,line), e)
            if num_retries>0:
                print('%s is retrying %s'%(self.tname,line))
                time.sleep(self.relay*random.randint(5,15)/10)
                self.get_page(line,num_retries-1)
            else:
                print('%s: search failed after all retries!!!'%line)
                my_queue.put(line)

    # follow the link to the product page, parse it, and save the rows
    def get_page_link(self,link,line):
        res=[]
        href=link.get('href')
        print(href)
        time.sleep(self.relay*2*random.randint(5,15)/10)
        r=requests.get(href,headers=self.headers[1],timeout=20)
        if r.status_code==200:
            parse_html=r.text
            soup1=BeautifulSoup(parse_html, 'lxml')
            catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')] # catalog names
            # print(catalogs)
            table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
            if 'AmountPriceQty.' in table_headers:
                index=table_headers.index('AmountPriceQty.')
                catalog=catalogs[0]
                trs=soup1.select('.form tbody tr')
                if len(catalogs)>1:
                    catalog=catalogs[index]
                for tr in trs:
                    if len(tr.select('td'))>1:
                        row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                        res.append(row)
                # print(res)
                # serialize database writes; "with" releases the lock even if connect() raises
                with lock:
                    conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
                    cursor = conn.cursor()
                    try:
                        print('%s: saving %s...'%(line,catalog))
                        sql = 'INSERT INTO chembridge VALUES(%s,%s,%s,%s)'
                        cursor.executemany(sql,res)
                        conn.commit()
                    except Exception as e:
                        print(e)
                    finally:
                        cursor.close()
                        conn.close()

def writeToExcel(datas,filename):
    # create a workbook in memory
    result_wb = Workbook()
    # the first sheet is ws1
    ws1 = result_wb.worksheets[0]
    # set the sheet title
    ws1.title = "Crawl results"
    row0 = ['catalog', 'amount', 'price', 'qty']
    ft = Font(name='Arial', size=11, bold=True)
    # header row in bold
    for k in range(len(row0)):
        ws1.cell(row=1,column=k+1).value=row0[k]
        ws1.cell(row=1,column=k+1).font=ft
    # data rows start at row 2
    for i in range(1,len(datas)+1):
        for j in range(1,len(row0)+1):
            ws1.cell(row=i+1,column=j).value=datas[i-1][j-1]
    # save the workbook to disk
    result_wb.save(filename = filename)

if __name__ == '__main__':
    starttime=time.time()
    lock = threading.Lock()

    words=[] # search keywords read from the file
    basedir=os.path.abspath(os.path.dirname(__file__))
    filename='MDL.txt'
    file=os.path.join(basedir,filename) # file path
    with open(file,'r') as f:
        for line in f:
            words.append(line.strip())

    count=0  # progress counter
    my_queue = Queue() # FIFO queue of keywords whose first search failed; thread-safe
    error_list=[] # keywords that still failed at the end
    threads=[]

    # empty the chembridge table before the crawl starts
    conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
    cursor = conn.cursor()
    print('Emptying table...')
    cursor.execute('delete from chembridge')
    conn.commit()
    cursor.close()
    conn.close()

    num_threads=10  # number of crawler threads
    relay=10  # base delay; actual delay = relay * (random factor between 0.5 and 1.5)
    threadList = []
    for i in range(1,num_threads+1):
        threadList.append('crawler-%s'%i)
    # start the threads
    for tName in threadList:
        thread = ThreadCrawl(tName,relay)
        thread.daemon=True
        thread.start()
        threads.append(thread)
        time.sleep(1)
    # the main thread blocks until every worker thread has finished
    for t in threads:
        t.join()

    # export the data to Excel
    conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
    cursor = conn.cursor()
    cursor.execute('select * from chembridge')
    datas=cursor.fetchall()
    conn.commit()
    cursor.close()
    conn.close()
    writeToExcel(datas,'result.xlsx')

    # summarize the result
    while not my_queue.empty():
        error_line=my_queue.get()
        error_list.append(error_line)
    print('Crawl finished!\n')
    if len(error_list)==0:
        print('Failed keywords: 0')
    else:
        print('%s keywords failed in total:'%len(error_list),','.join(error_list))
    print('Elapsed: %f s'%(time.time()-starttime))

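words is the list of search keywords. When a search fails it is retried immediately; num_retries is the number of retries allowed per keyword, so each keyword is searched at most num_retries+1 times. If a keyword still fails after that, it is added to the my_queue queue.

Once everything in words has been searched, the threads search the keywords in my_queue again, looping until my_queue is empty (that is, until every keyword has been searched successfully).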
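The retry rule described above can be isolated in a few lines. This is a sketch under stated assumptions, not the article's code: it uses a loop where the article's get_page calls itself recursively, and `search_with_retries`, `flaky_fetch` and `broken_fetch` are hypothetical names that simulate network failures.

```python
import queue
import time

def search_with_retries(line, fetch, num_retries, failed, delay=0):
    # Call fetch(line) once, then retry up to num_retries more times.
    for attempt in range(num_retries + 1):
        try:
            return fetch(line)
        except Exception as e:
            print('%s: attempt %d failed: %s' % (line, attempt + 1, e))
            time.sleep(delay)
    # Every attempt failed: remember the keyword for a later pass.
    failed.put(line)
    return None

calls = {'n': 0}

def flaky_fetch(line):
    # Simulated fetcher that fails twice and then succeeds.
    calls['n'] += 1
    if calls['n'] < 3:
        raise IOError('simulated network error')
    return 'page for ' + line

def broken_fetch(line):
    # Simulated fetcher that never succeeds.
    raise IOError('simulated network error')

failed = queue.Queue()
first = search_with_retries('A', flaky_fetch, 3, failed)
second = search_with_retries('B', broken_fetch, 3, failed)
print(first, second, failed.qsize())
```

With num_retries=3 a keyword is attempted four times before it lands in the failure queue, which is what the "num_retries+1" count above refers to.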

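Note: the GIL does not make an update such as count=count+1 atomic, so threads that modify the same global variable must hold a lock while doing so.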
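A minimal sketch of the locking pattern, with a thread count and loop size chosen only for illustration:

```python
import threading

count = 0
lock = threading.Lock()

def add_many(n):
    global count
    for _ in range(n):
        # The read, the addition and the write happen as one unit under the lock.
        with lock:
            count = count + 1

threads = [threading.Thread(target=add_many, args=(10000,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(count)  # 80000
```

Using `with lock:` rather than paired acquire() and release() calls also guarantees that the lock is released if the guarded code raises.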


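2. Implementation of the second scheme:

Three queues, urls_queue, html_queue and item_queue, hold the URLs to visit, the pages to parse, and the scraped results, respectively. Three classes are built around them: Fetcher fetches the page for each URL, Parser parses the fetched content and produces the items to save, and Saver stores the items. When urls_queue, html_queue and item_queue are all empty at the same time, every worker thread is stopped and the task ends.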

# coding=utf-8
import threading
import queue,requests
import time,random
import mysql.connector
from bs4 import BeautifulSoup

class Fetcher(threading.Thread):
    def __init__(self,urls_queue,html_queue):
        threading.Thread.__init__(self)
        self.__running=threading.Event()
        self.__running.set()
        self.urls_queue = urls_queue
        self.html_queue = html_queue
        self.num_retries=3  # number of retries for a failed search
        self.headers={
            'Host':'www.chembridge.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate',
            'Referer':'http://www.chembridge.com/search/search.php?search=1',
            'Connection':'keep-alive',
            'Upgrade-Insecure-Requests':'1'
        }

    def run(self):
        while self.__running.is_set():
            try:
                # a timeout lets the loop notice stop() instead of blocking forever on get()
                line=self.urls_queue.get(timeout=1)
            except queue.Empty:
                continue
            print(line)
            time.sleep(2*random.randint(5,15)/10)
            self.get_page(line,self.num_retries)

    def get_page(self,line,num_retries=2):
        url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
        try:
            response = requests.get(url,headers=self.headers,timeout=20)
            status=response.status_code
            if status==200:
                html_doc=response.text
                # print(html_doc)
                # hand the page over to the parser thread
                self.html_queue.put(html_doc)
                print('%s: search finished'%line)
            else:
                print('Network error searching %s, status code: %s'%(line,status))
                if num_retries>0:
                    print('Retrying %s'%(line))
                    time.sleep(2*random.randint(5,15)/10)
                    self.get_page(line,num_retries-1)
                else:
                    # put the keyword back so that it is searched again later
                    print('%s: search failed after all retries!!!'%line)
                    self.urls_queue.put(line)

        except Exception as e:
            print('%s: search error, error:'%line,e)
            if num_retries>0:
                print('Retrying %s'%(line))
                time.sleep(2*random.randint(5,15)/10)
                self.get_page(line,num_retries-1)
            else:
                print('%s: search failed after all retries!!!'%line)
                self.urls_queue.put(line)

    def stop(self):
        self.__running.clear()

class Parser(threading.Thread):
    def __init__(self, html_queue,item_queue):
        threading.Thread.__init__(self)
        self.__running=threading.Event()
        self.__running.set()
        self.html_queue = html_queue
        self.item_queue = item_queue
        self.num_retries=3  # number of retries for a failed request
        self.headers={
            'Host':'www.hit2lead.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate, br'
        }
    def run(self):
        while self.__running.is_set():
            try:
                # a timeout lets the loop notice stop() instead of blocking forever on get()
                html_doc=self.html_queue.get(timeout=1)
            except queue.Empty:
                continue
            print('html_queue size: ',self.html_queue.qsize())
            try:
                soup = BeautifulSoup(html_doc, 'lxml')
                div=soup.find(id='BBResults')
                if div:
                    links=div.select('a.chemical')
                    for link in links:
                        self.get_page_link(link,self.num_retries)
                relay=random.randint(20,50)/10
                # print(relay)
                time.sleep(relay)
            except Exception as e:
                # put the page back so that it is parsed again later
                print('parse error:',e)
                self.html_queue.put(html_doc)

    def get_page_link(self,link,num_retries=2):
        time.sleep(2*random.randint(5,15)/10)
        res=[]
        href=link.get('href')
        print(href)
        response=requests.get(href,headers=self.headers,timeout=20)
        status=response.status_code
        if status==200:
            parse_html=response.text
            soup1=BeautifulSoup(parse_html, 'lxml')
            catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')] # catalog names
            # print(catalogs)
            table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
            # print(table_headers)
            if 'AmountPriceQty.' in table_headers:
                index=table_headers.index('AmountPriceQty.')
                catalog=catalogs[0]
                trs=soup1.select('.form tbody tr')
                # print(trs)
                if len(catalogs)>1:
                    catalog=catalogs[index]
                for tr in trs:
                    if len(tr.select('td'))>1:
                        row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                        res.append(row)
                # print(res)
                # hand the rows over to the saver threads
                self.item_queue.put(res)
        else:
            print('Network error fetching %s, status code: %s'%(href,status))
            if num_retries>0:
                print('Retrying %s'%(href))
                time.sleep(random.randint(5,15)/10)
                self.get_page_link(link,num_retries-1)
            else:
                print('%s: failed after all retries!!!'%href)
    def stop(self):
        self.__running.clear()

class Saver(threading.Thread):
    def __init__(self, item_queue):
        threading.Thread.__init__(self)
        self.__running=threading.Event()
        self.__running.set()
        self.item_queue = item_queue

    def run(self):
        while self.__running.is_set():
            try:
                # a timeout lets the loop notice stop() instead of blocking forever on get()
                res=self.item_queue.get(timeout=1)
            except queue.Empty:
                continue
            print('item_queue size: ',self.item_queue.qsize())
            print(res)
            conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
            cursor = conn.cursor()
            sql = 'INSERT INTO chembridge_test2 VALUES(%s,%s,%s,%s)'
            cursor.executemany(sql,res)
            print('Saved')
            conn.commit()
            cursor.close()
            conn.close()
    def stop(self):
        self.__running.clear()


if __name__ == '__main__':
    starttime=time.time()
    lock = threading.Lock()
    urls_queue = queue.Queue()
    html_queue = queue.Queue()
    item_queue = queue.Queue()

    # empty the chembridge_test2 table before the crawl starts
    conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
    cursor = conn.cursor()
    print('Emptying table...')
    cursor.execute('delete from chembridge_test2')
    conn.commit()
    cursor.close()
    conn.close()

    print('start...')

    # one search keyword per line
    with open('MDL1.txt','r') as f:
        for line in f:
            urls_queue.put(line.strip())

    # 8 fetcher threads, 1 parser thread, 2 saver threads
    threads=[]
    for j in range(8):
        thread1 = Fetcher(urls_queue,html_queue)
        thread1.daemon=True
        thread1.start()
        threads.append(thread1)
    for j in range(1):
        thread1 = Parser(html_queue,item_queue)
        thread1.daemon=True
        thread1.start()
        threads.append(thread1)
    for j in range(2):
        thread1 = Saver(item_queue)
        thread1.daemon=True
        thread1.start()
        threads.append(thread1)

    # poll until all three queues are empty; an item that a thread has already taken
    # from a queue is not counted, so this check can fire before the work is done
    while True:
        time.sleep(0.5)
        if urls_queue.empty() and html_queue.empty() and item_queue.empty():
            break

    print('Done!')
    for t in threads:
        t.stop()
    for t in threads:
        t.join()
    print('end')
    print('Elapsed: %f s'%(time.time()-starttime))

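Choose the number of threads according to network conditions, so that threads are not left blocked while requests waits on the network.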
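The "all three queues are empty" condition used in the listing above does not account for an item that a thread has already taken from a queue and is still processing. One common alternative is to shut the stages down in order with a sentinel value. The sketch below illustrates that technique and is not the article's method; `STOP`, `stage`, `run_pipeline` and the stand-in work functions are hypothetical names.

```python
import queue
import threading

STOP = object()  # sentinel meaning "no more input for this stage"

def stage(inbox, outbox, work):
    while True:
        item = inbox.get()
        if item is STOP:
            inbox.put(STOP)  # pass the sentinel on to sibling threads
            return
        result = work(item)
        if outbox is not None:
            outbox.put(result)

def run_pipeline(lines, n_fetchers=3, n_savers=2):
    urls_queue, html_queue, item_queue = queue.Queue(), queue.Queue(), queue.Queue()
    saved = []
    saved_lock = threading.Lock()

    def fetch(line):   # stand-in for the HTTP request
        return '<html>%s</html>' % line

    def parse(html):   # stand-in for HTML parsing
        return html[len('<html>'):-len('</html>')]

    def save(item):    # stand-in for the database write
        with saved_lock:
            saved.append(item)

    def start(n, inbox, outbox, work):
        group = [threading.Thread(target=stage, args=(inbox, outbox, work))
                 for _ in range(n)]
        for t in group:
            t.start()
        return group

    fetchers = start(n_fetchers, urls_queue, html_queue, fetch)
    parsers = start(1, html_queue, item_queue, parse)
    savers = start(n_savers, item_queue, None, save)

    for line in lines:
        urls_queue.put(line)
    # Stop each stage only after the stage before it has fully finished.
    for group, inbox in ((fetchers, urls_queue), (parsers, html_queue), (savers, item_queue)):
        inbox.put(STOP)
        for t in group:
            t.join()
    return sorted(saved)

print(run_pipeline(['a', 'b', 'c', 'd']))
```

Because the sentinel enters each queue only after the previous stage has been joined, no stage can exit while work destined for it is still in flight.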

A Scrapy implementation of the same crawler is included below.

items.py

import scrapy

class ChemItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    catalog=scrapy.Field()
    amount=scrapy.Field()
    price=scrapy.Field()
    qty=scrapy.Field()

quotes_spider.py

# -*- coding: utf-8 -*-
import scrapy
from scrapy.selector import Selector
from tutorial.items import ChemItem

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    # allowed_domains = ["chembridge.com"]
    headers=[{
            'Host':'www.chembridge.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate',
            'Referer':'http://www.chembridge.com/search/search.php?search=1',
            'Connection':'keep-alive',
            'Upgrade-Insecure-Requests':'1'
        },
        {
            'Host':'www.hit2lead.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate, br'
        }]
    def start_requests(self):
        start_urls = []
        # one search keyword per line
        with open('MDL.txt','r') as f:
            for line in f:
                line=line.strip()
                print(line)
                start_urls.append('http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1')
        for url in start_urls:
            yield scrapy.Request(url=url, callback=self.parse,headers=self.headers[0])

    def parse(self, response):
        # follow every product link on the search result page
        links=response.css('#BBResults a.chemical::attr(href)').extract()
        for link in links:
            yield scrapy.Request(url=link,callback=self.parse_dir_contents,headers=self.headers[1])

    def parse_dir_contents(self, response):
        catalogs=response.css('form div.matter h2::text').extract()
        table_headers=[''.join(res.re(r'>(.*)</td>')) for res in response.css('form div.matter thead tr')]
        print(table_headers)
        # skip pages that have no price table
        if 'AmountPriceQty.' not in table_headers:
            return
        index=table_headers.index('AmountPriceQty.')
        catalog=catalogs[0]
        trs=response.css('.form tbody tr')
        if len(catalogs)>1:
            catalog=catalogs[index]
        for tr in trs:
            if len(tr.css('td'))>1:
                item=ChemItem()
                item['catalog']=catalog
                item['amount']=tr.css('td')[0].css('::text').extract()[0]
                item['price']='|'.join(tr.css('td')[1].css('::text').extract())
                print(len(tr.css('td::text')))
                # qty is either plain text or the value attribute of an input field
                item['qty']=tr.css('td')[2].css('::text').extract()[0] if len(tr.css('td')[2].css('::text').extract())==1 else tr.css('td')[2].css('::attr(value)').extract()[0]
                # self.log('Saved result %s' % item)
                yield item

pipelines.py

# store the data in a MySQL database
import logging
from twisted.enterprise import adbapi
import MySQLdb
import MySQLdb.cursors

# scrapy.log was deprecated in favor of the standard logging module
logger = logging.getLogger(__name__)

class MySQLStorePipeline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    # database parameters, read from settings.py
    @classmethod
    def from_settings(cls, settings):
        dbargs = dict(
            host=settings['MYSQL_HOST'],
            db=settings['MYSQL_DBNAME'],
            user=settings['MYSQL_USER'],
            passwd=settings['MYSQL_PASSWD'],
            charset='utf8',
            cursorclass = MySQLdb.cursors.DictCursor,
            use_unicode= True,
        )
        # ConnectionPool takes the name of the DB-API module as a string
        dbpool = adbapi.ConnectionPool('MySQLdb', **dbargs)
        return cls(dbpool)


    # # alternative: hard-coded database parameters
    # def __init__(self):
    #     dbargs = dict(
    #          host = 'localhost',
    #          db = 'test',
    #          user = 'root',
    #          passwd = 'password',
    #          cursorclass = MySQLdb.cursors.DictCursor,
    #          charset = 'utf8',
    #          use_unicode = True
    #         )
    #     self.dbpool = adbapi.ConnectionPool('MySQLdb',**dbargs)

    # the default pipeline entry point
    def process_item(self, item,spider):
        res = self.dbpool.runInteraction(self.insert_into_table,item)
        res.addErrback(self.handle_error)
        return item
    # the target table must be created beforehand
    def insert_into_table(self,conn,item):
        conn.execute('insert into chembridge(catalog, amount, price,qty) values(%s,%s,%s,%s)', (
            item['catalog'],
            item['amount'],
            item['price'],
            item['qty']
            ))
    def handle_error(self,e):
        logger.error(e)

settings.py

FEED_EXPORTERS = {
    'csv': 'tutorial.spiders.csv_item_exporter.MyProjectCsvItemExporter',
} # tutorial is the project name

FIELDS_TO_EXPORT = [
    'catalog',
    'amount',
    'price',
    'qty'
]

LINETERMINATOR='\n'


ITEM_PIPELINES = {
   'tutorial.pipelines.MySQLStorePipeline': 300,
}

# start MySQL database configure setting
MYSQL_HOST = 'localhost'
MYSQL_DBNAME = 'test'
MYSQL_USER = 'root'
MYSQL_PASSWD = 'password'
# end of MySQL database configure setting

main.py

# -*- coding: utf-8 -*-
from scrapy import cmdline
cmdline.execute("scrapy crawl quotes -o items.csv -t csv".split())

Finally, run main.py; the results are saved to both a CSV file and the MySQL database.

 
