91家纺网本地版本定稿,thread版本

Posted dog-and-cat

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了91家纺网本地版本定稿,thread版本相关的知识,希望对你有一定的参考价值。

import re
from odps import ODPS
from threading import Thread
import threading
from urllib import parse
from datetime import datetime

import random 
import requests
import time

from scrapy import Selector
from models import *

store_list_urls = []
product_list_urls = []

domain = "http://www.91jf.com/"
store_domain = "http://www.91jf.com/default.php?act=corp&sort=list&page="
store_url_domain = ‘http://www.91jf.com/default.php?act=store_goods&storeid=‘ # 用于拼接商户id和url
category_url = "http://www.91jf.com/default.php?act=categorygoodslist&category_id=" # 用来拼接商品的url
stor_url_aptitude = ‘http://www.91jf.com/default.php?act=corpcert&id=‘ # 用于拼接商户资质的url

def get_nodes_json():
    left_menu_text = requests.get("http://www.91jf.com/").text
    #write_txt(left_menu_text)
    #etree.html(res0.text)
    sel = Selector(text=left_menu_text)
    all_divs = sel.xpath("//div[@class=‘class_child_li‘]//a[@href]").extract()
    if all_divs:
        nodes_lists = []
        for i in range(len(all_divs)):
            nodes_str = all_divs[i]
            nodes_str = nodes_str.replace("&","&") # 此处&由于被转义成&导致需要重新进行处理
            nodes_lists.append(nodes_str)
        return nodes_lists
    return []

# 获取一级目录数据,保存商品系列ID,用来拼接爬虫入口的url
def process_nodes_list(url):    
    menu_text = requests.get(url).text
    sel = Selector(text=menu_text)
    nodes_list = sel.xpath("//div[@class=‘index_g_class‘]/ul/li")
    for item in nodes_list:
        title = item.xpath("./div[@class=‘class_menu‘]/span/text()").extract()
        title = ‘‘.join(title)
        #主目录的名称
        catalogue_name = title
        catalogue = Catalogue()
        catalogue.catalogue_name = catalogue_name # 系列名称
        catalogue.series_level = 0 # 系列等级
        catalogue_id_0 = 0 # 系列catalogue_id
        catalogue.category_id = catalogue_id_0 # 系列catalogue_id
        catalogue.create_time = datetime.now() # 抓取时间
        existed_id = Catalogue.select().where((Catalogue.catalogue_name==catalogue_name) & (Catalogue.category_id == catalogue_id_0))
        if existed_id:
            #catalogue.save()
            pass  
        else:
            catalogue.save(force_insert=True)
            #print("插入商品目录成功")  

        _id = Catalogue.get(Catalogue.catalogue_name==title)._id # 此处获取父节点的id
        series_names = item.xpath(‘.//div[@class="class_child_li"]//li‘)
        for series_name in series_names:
            
            catalogue_0 = Catalogue()
            catalogue_0.catalogue_name = title # 系列名称
            catalogue_0.series_level = 0 # 系列等级

            series_name_0 =  series_name.xpath(‘.//span/text()‘).extract()
            series_name_0 = ‘‘.join(series_name_0)
            
            category_id = series_name.xpath(".//a[@href]").extract()
            category_id = ‘‘.join(category_id)
            category_id = re.search(‘d.?d‘,category_id).group()
            
            catalogue_0.category_id = category_id # 次级产品系列ID
            catalogue_0.catalogue_name = series_name_0 # 次级产品系列的名称
            catalogue_0.catalogue_level = 2 # 次级产品系列的等级
            catalogue_0.father_id = _id # 父节点的ID
            catalogue_0.create_time = datetime.now() # 抓取时间
            
            existed_id = Catalogue.select().where((Catalogue.catalogue_name==series_name_0) & (Catalogue.category_id == category_id))
            if existed_id:
                #catalogue_0.save()
                pass
            else:
                catalogue_0.save(force_insert=True)  

#根据catalogue存储的数据来获取category_id拼接商品最外层的url链接
def get_catalogue_url():
    url_list = []
    #catalogue = Catalogue()
    id_data = Catalogue.select().where(Catalogue.catalogue_level==2)
    for item in id_data:
        url = category_url + str(item.category_id) + "&okey=salenum&order=desc"
        url_list.append(url)
    #id_data = Catalogue.get(Catalogue.series_level_0==1).category_id
    return url_list

#获取商品的信息
def parse_product_data(url):
    res_text = requests.get(url).text 
    sel = Selector(text=res_text)
    res_li = sel.xpath("//div[@class=‘pro_list_div g-clearfix c‘]/ul//li[@class=‘goods_offset‘]")
    flag_num = 0
    goods_list = []
    for item in res_li:
        product_id = item.xpath(‘./div[contains(@class,"pro_pic_box")]/a[@href]‘).extract() # 产品ID
        product_id = re.search(‘id=.*d"‘,‘‘.join(product_id))
        product_id = product_id.group().replace("id=","")
        product_id = product_id.replace(""","")
        product_id = int(product_id)

        name = item.xpath("./div[@class=‘row row-2 title‘]/a/text()").extract() # 产品名字
        name = ‘‘.join(name)
        price = item.xpath(‘./div[@id="goods_detail_b"]/div[@class="row row-1"]/div[@class="g_price fm2"]/strong/text()‘).extract() # 显示价格
        price = ‘‘.join(price)
        try:
            price = float(price)
        except:
            print("价格会员可见|价格请咨询商家")
            continue
        sales_num = item.xpath("./div[@id=‘goods_detail_b‘]/div[2]/p[1]/text()").extract()  # 销售数量
        sales_num= ‘‘.join(sales_num)
        sales_num = sales_num.split(‘销量:‘)[1]
        sales_num = int(sales_num)
        flag_num = sales_num
        if sales_num < 1:
            continue
        
        store_id = item.xpath("./div[@class=‘row row-3 c‘]/a[@href]").extract()
        store_id = re.search(‘id=.*d"‘,‘‘.join(store_id))
        store_id = store_id.group().replace("id=","")
        store_id = store_id.replace(""","")
        store_id = int(store_id)

        #merchant = item.xpath("./div[@id=‘goods_detail_b‘]/div[2]/p[2]/text()").extract() # 商家
        #merchant = ‘‘.join(merchant)

        main_Products = item.xpath("./div[@id=‘goods_detail_b‘]/div[2]/p[3]/text()").extract() # 主营
        main_Products = ‘‘.join(main_Products)

        #merchant_Place = item.xpath("./div[@id=‘goods_detail_b‘]/div[2]/p[4]/text()").extract() # 地址
        #merchant_Place = ‘‘.join(merchant_Place)

        product = Product()
        product.product_id = product_id
        product.name = name
        product.price = price
        product.sales_num = sales_num
        product.store_id = store_id

        create_time = datetime.now()
        product.create_time = create_time

        product_data = product # 存储单条商品信息
        goods_list.append(product_data)

    data = Spider_91JIAFAN() # 创建对象
    data.up_product_to_odps(goods_list)
    
    next_page = sel.xpath("//*[@class=‘pagination2‘]/a[@href]").extract()
    if len(next_page) > 2 and flag_num > 0:
        url_next = re.search(‘".*d"‘,next_page[-1])
        url_next = url_next.group().replace("&amp;","&") # 此处&由于被转义成&amp;导致需要重新进行处理
        url_next = url_next.replace(""","")
        url_next = parse.urljoin(domain,url_next)
        #print(url_next)
        parse_product_data(url_next)
    else:
        pass

#获取商品链接,上一级url为商品详情页
def parse_data_last(url):
    #store_id_list = []
    flag_num = 0
    #获取商品的详情标签
    while True:
        try:
            res_text = requests.get(url).text
        except:
            time.sleep(3)
            print(‘间隔休眠时间,再次处理‘)
        else:
            break
    sel = Selector(text=res_text)
    res_li = sel.xpath("//div[@class=‘pro_list_div g-clearfix c‘]/ul//li[@class=‘goods_offset‘]")
    for item in res_li:
        sales_num = item.xpath("./div[@id=‘goods_detail_b‘]/div[2]/p[1]/text()").extract() # 销售数量
        sales_num= ‘‘.join(sales_num)
        sales_num = sales_num.split(‘销量:‘)[1]
        sales_num = int(sales_num)
        flag_num = int(sales_num)

        data = item.xpath("./div[@class=‘pro_pic_box‘]/a").extract()
        data = re.search(‘".*d"‘,data[0])
        data = data.group().replace("&amp;","&")
        data = data.replace(""","")
        data_url = parse.urljoin(domain,data) # 链接为销量排序之后的单个商品链接,传出链接
        #print("开始获取商品:{}".format(data_url))

        if sales_num > 0:
            r.lpush(‘91jiafan:catalogue_url‘,data_url) # 此处存储商品的url,判断条件为销售数量大于0

    #此处代码用来切到下一页链接数据,商品的详情排布页
    next_page = sel.xpath("//*[@class=‘pagination2‘]/a[@href]").extract()
    if len(next_page) > 2 and flag_num > 0:
        url_next = re.search(‘".*d"‘,next_page[-1])
        url_next = url_next.group().replace("&amp;","&") # 此处&由于被转义成&amp;导致需要重新进行处理
        url_next = url_next.replace(""","") 
        url_next = parse.urljoin(domain,url_next)
        parse_data_last(url_next)  


#获取商品描述
def parse_product_attributes(url):
    #获取商品的详情以及销售数量
    product_id = url.split(‘id=‘)[1] # 对商品id进行切片处理,用来获取ajax数据
    res_text = requests.get(url).text
    sel = Selector(text=res_text)
    #筛选规则,当is_price之后的value属性值为0的时候,说明不需要咨询商家,同时需要注意的是,商品会有打折批次数量的差异导致价格差异,
    #这一点需要根据具体的显示页面来处理,现在忽略,由于可能存在打折段的数据差异,所以暂时不考虑
    Is_price = sel.xpath("//input[contains(@id,‘is_price‘)]").extract()#取到的数据用来判断价格是否需要咨询商家
    if len(Is_price) < 1:
        print("页面数据为空")
    else:    
        is_value = re.search(‘d‘,Is_price[0])
        if is_value.group() == ‘0‘: # 0表示商品价格不需要咨询商户
            datas = sel.xpath("//div[contains(@class,‘show_all‘)]/table[1]//tr")
            price_base = 0.0
            if datas:
                #price_base 商品基准价格
                for item in range(len(datas)):
                    price = datas[item].xpath("./input[last()-1]").extract()
                    price = re.search(‘value=".*"‘,price[0])
                    price = re.search(‘d.*d‘,price[0])
                    price = price.group()
                    price_base = price_base + float(price)
                price_base = price_base  / len(datas) # 商品基准价格计算
            else:
                price_base = sel.xpath("//span[@class=‘price_num fl‘]/text()").extract()[1]
                price_base = price_base.replace(" ","")
                price_base = float(price_base)
                #print(type(price_base))
                #print(price_base)
            #此处获取商品的描述信息
            attributes_list = sel.xpath("//span[contains(@class,‘attributes-list‘)]//li/text()").extract()
            str_attributes = ‘ ‘.join(attributes_list)
            str_attributes = str_attributes.replace("&nbsp;"," ") # 商品信息描述
            #此处发送请求获取商品购买数据
            url_sales = parse.urljoin(domain,‘default.php?act=evallist‘)
            data = {
                ‘id‘: product_id,
                ‘page‘: ‘0‘,
                ‘info_type‘: ‘sale‘
            }
            response = requests.post(url_sales, data=data)
            buyer_num = response.json().get("member") # 购买人数
            sale_num = response.json().get(‘num‘) # 销售数量
            buyer_rate = response.json().get(‘re_buyer_rate‘) # 商品复购率
            product_id = int(product_id) # 此处对商品ID进行转换

            product_attributes = Product_attributes()
            product_attributes.product_id = product_id
            product_attributes.price_base = price_base
            product_attributes.attributes = str_attributes
            product_attributes.buyer_num = buyer_num
            product_attributes.sale_num = sale_num
            product_attributes.buyer_rate = buyer_rate
            product_attributes.create_time = datetime.now()
            
            data_0 = Spider_91JIAFAN() # 创建对象
            data_attributes = [product_attributes]
            data_0.up_product_attributes_to_odps(data_attributes)

        else :
            price = "价格请咨询商家"

#获取商户详细数据,处理逻辑为根据单个商品目录来获取对应的商户id
def parse_store_id(url):
    #print(url) # 打印当前商品页的url用来定位
    res_text = requests.get(url).text
    sel = Selector(text=res_text)
    store_id = 0
    #筛选规则,当is_price之后的value属性值为0的时候,说明不需要咨询商家,同时需要注意的是,商品会有打折批次数量的差异导致价格差异,
    #这一点需要根据具体的显示页面来处理,现在忽略,由于可能存在打折段的数据差异,所以暂时不考虑
    Is_price = sel.xpath("//input[contains(@id,‘is_price‘)]").extract()#取到的数据用来判断价格是否需要咨询商家
    if len(Is_price) < 1:
        print("页面数据为空")
    else:    
        is_value = re.search(‘d‘,Is_price[0])
        if is_value.group() == ‘0‘: # 0表示商品价格不需要咨询商户
            store_id = sel.xpath(‘//span[@class="container_title_span"]/a[@href]‘).extract()
            store_id = ‘‘.join(store_id)
            store_id = re.search(‘storeid=d*"‘,store_id)
            store_id = store_id.group()
            store_id = store_id.split(‘storeid=‘)[1]
            store_id = store_id.replace(""","")
            store_id = int(store_id) # 商户的id
        else :
            pass
    return store_id

#根据store_id拼接的url用来抓取商户的数据
def parse_store_data(url):
    res_text = requests.get(url).text
    sel = Selector(text=res_text)
    if len(res_text) > 10:
        store_name = sel.xpath(‘//span[contains(@class,"container_title_span")]/a[@href]/text()‘).extract()
        store_name = ‘‘.join(store_name) # 商户的名字
        
        store_id = sel.xpath(‘//span[@class="container_title_span"]/a[@href]‘).extract()
        store_id = ‘‘.join(store_id)
        store_id = re.search(‘storeid=d*"‘,store_id)
        store_id = store_id.group()
        store_id = store_id.split(‘storeid=‘)[1]
        store_id = store_id.replace(""","")
        store_id = int(store_id) # 商户的id

        store_level = ‘‘
        store_place = ‘‘
        store_describe = ‘‘
        store_supply = ‘‘
        store_service = ‘‘

        store_data = sel.xpath(‘//ul[contains(@class,"gy_info_list")]/li/text()‘).extract()
        if len(store_data) > 3:
            store_level = store_data[2] # 商户等级
            store_level = store_level.replace(" ","")
            store_level = store_level.replace(" ","")
            store_place = store_data[3] # 商户地址
            store_place = store_place.replace(" ","")

        store_aptitude = stor_url_aptitude + str(store_id) # 商户的资质

        temp_datas = sel.xpath(‘//li[contains(@class,"evaluate")]//div[@style]//text()‘).extract()
        if len(temp_datas) == 6:
            store_describe = temp_datas[0] + ‘:‘ + temp_datas[1] # 商户描述
            store_supply = temp_datas[2] + ‘:‘ + temp_datas[3] # 商户供货
            store_service = temp_datas[4] + ‘:‘ + temp_datas[5] # 商户服务
        
        store = Store()
        store.store_id = store_id
        store.store_name = store_name
        store.store_level = store_level
        store.store_place = store_place
        store.store_aptitude = store_aptitude
        store.store_describe = store_describe
        store.store_supply = store_supply
        store.store_service = store_service
        store.create_time = datetime.now()

        data_0 = Spider_91JIAFAN() # 创建对象
        data_store = [store]
        data_0.up_store_to_odps(data_store)

class ParseproductThread(Thread):
    def run(self):
        while(1):
            try:
                data = r.lpop(‘91jiafan:catalogue_url‘)
                #print("开始处理商品:{}".format(data))
                parse_product_attributes(data)
                store_id = parse_store_id(data)
                store_id_url = store_url_domain + str(store_id)
                r.lpush(‘91jiafan:store_id_url‘,store_id_url)
            except:
                time.sleep(120)
                print("data is null")

class Parse_storedata_Thread(Thread):
    def run(self):
        while(1):
            try:
                data = r.lpop(‘91jiafan:store_id_url‘)
                #print("开始处理商户:{}".format(data))
                parse_store_data(data)
            except:
                time.sleep(120)
                print("data is null")


class parse_91_url_Thread(Thread):
    def run(self):
        #获取最终需要抓取的url
        url_list = get_catalogue_url()
        for url in url_list:
            parse_data_last(url)

class parse_91_productdata_Thread(Thread):     
    def run(self):
        #提取商品列表页的数据
        url_list = get_catalogue_url()
        for url in url_list:
            parse_product_data(url)

class Spider_91JIAFAN():
    def __init__(self):
        self.o = ODPS(
            access_id=ODPS_ID,
            secret_access_key=ODPS_KEY,
            project=ODPS_PROJECT,
            endpoint=ODPS_ENDPOINT
            )
        self.table_name_0 = ‘91_product‘
        self.t0 = self.o.get_table(self.table_name_0)  # 获取odps_91jiafan商品信息
        self.table_name_1 = ‘91_product_attributes‘
        self.t1 = self.o.get_table(self.table_name_1)  # 获取odps_91jiafan商品描述信息
        self.table_name_2 = ‘91_store‘
        self.t2 = self.o.get_table(self.table_name_2)  # 获取odps_91jiafan商户信息

    #上传商品信息
    def up_product_to_odps(self, goods_list):
        """
        上传商品至ODPS表
        :return:
        """
        part_col = datetime.strftime(datetime.now(), ‘%Y%m%d‘)
        record = []
        for goods in goods_list:
            record.append([
                goods.product_id,
                goods.name,
                goods.price,
                goods.sales_num,
                goods.store_id,
                goods.create_time
            ])
        while True:
            try:
                with self.t0.open_writer(partition=f‘partition_date={part_col}‘, create_partition=True) as writer:
                    writer.write(record)
                    print(f‘{self.table_name_0} - 成功上传{len(record)}条数据‘, str(datetime.now()))
            except Exception as e:
                print(e.args)
                print(f‘{self.table_name_0} - 上传ODPS失败,正在重试...‘)
                continue
            else:
                break

    #上传商品描述信息            
    def up_product_attributes_to_odps(self, goods_list):
            """
            上传商品至ODPS表
            :return:
            """
            part_col = datetime.strftime(datetime.now(), ‘%Y%m%d‘)
            record = []
            for goods in goods_list:
                record.append([
                    goods.product_id,
                    goods.price_base,
                    goods.attributes,
                    goods.buyer_num,
                    goods.sale_num,
                    goods.buyer_rate,
                    goods.create_time
                ])
            while True:
                try:
                    with self.t1.open_writer(partition=f‘partition_date={part_col}‘, create_partition=True) as writer:
                        writer.write(record)
                        print(f‘{self.table_name_1} - 成功上传{len(record)}条数据‘, str(datetime.now()))
                except Exception as e:
                    print(e.args)
                    print(f‘{self.table_name_1} - 上传ODPS失败,正在重试...‘)
                    continue
                else:
                    break

    #上传商户信息            
    def up_store_to_odps(self, stores_list):
            """
            上传商品至ODPS表
            :return:
            """
            part_col = datetime.strftime(datetime.now(), ‘%Y%m%d‘)
            record = []
            for stores in stores_list:
                record.append([
                    stores.store_id,
                    stores.store_name,
                    stores.store_level,
                    stores.store_place,
                    stores.store_aptitude,
                    stores.store_describe,
                    stores.store_supply,
                    stores.store_service,
                    stores.create_time
                ])
            while True:
                try:
                    with self.t2.open_writer(partition=f‘partition_date={part_col}‘, create_partition=True) as writer:
                        writer.write(record)
                        print(f‘{self.table_name_2} - 成功上传{len(record)}条数据‘, str(datetime.now()))
                except Exception as e:
                    print(e.args)
                    print(f‘{self.table_name_2} - 上传ODPS失败,正在重试...‘)
                    continue
                else:
                    break

if __name__ == "__main__":
    #start_time = datetime.now()
    process_nodes_list(domain)
    parse_91_url_thread = parse_91_url_Thread()
    parse_91_productdata_thread = parse_91_productdata_Thread()
    
    parse_91_url_thread.start()
    parse_91_productdata_thread.start()
    
    
    for i in range(10):
        parse_product_thread = ParseproductThread()
        parse_product_thread.start()

    
    for i in range(8):
        parse_storedata_thread = Parse_storedata_Thread()
        parse_storedata_thread.start()
    
    #end_time = datetime.now()
    #print("一共使用时间:",end_time - start_time)
    

以上是关于91家纺网本地版本定稿,thread版本的主要内容,如果未能解决你的问题,请参考以下文章

91家纺网,加上redis数据联通测试基础测试第一次

91家纺网,十二次更新,配套的数据连接,数据库产生两个表格

91家纺网,十三次更新,用于测试数据库连接网页问题,由于会出现错误网站,导致接收到的值为空。

pyenv 安装本地版本

WAMP本地环境升级php版本--第二次尝试

vagrant 本地添加box 支持带版本号