python 我喜欢这个博客。如果我没有互联网连接,我会读这个,而不是玩扫雷。使用 bs4、peewee 和 requests。

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 我喜欢这个博客。如果我没有互联网连接,我会读这个,而不是玩扫雷。使用 bs4、peewee 和 requests。相关的知识,希望对你有一定的参考价值。

import datetime

from peewee import *

# SQLite database file that stores the crawled blog archive.
db = SqliteDatabase('matrix67.sqlite')

class ModelBase(Model):
    """Common base model binding every table to the shared SQLite database."""
    class Meta:
        database = db

# Forward reference so Post.image can point at the Image class,
# which is declared further down in this module.
ImageProxy = Proxy()

class Post(ModelBase):
    """A single blog post scraped from matrix67.com."""
    id = IntegerField(primary_key=True)  # numeric id parsed from the site's markup
    title = TextField()
    content = TextField()  # raw HTML of the entry body
    created = DateTimeField()
    # Preview image; declared through ImageProxy because Image is defined later.
    image = ForeignKeyField(ImageProxy, null=True)
    preview = TextField(null=True)
    # When the crawler last saw this post.
    last_seek = DateTimeField(default=datetime.datetime.now)

class Comment(ModelBase):
    """A reader comment attached to a Post."""
    id = IntegerField(primary_key=True)  # numeric id parsed from the comment element
    post = ForeignKeyField(Post)
    name = TextField()  # commenter display name
    avatar = TextField(null=True)  # Gravatar hash, when it could be extracted
    content = TextField(null=True)  # raw HTML of the comment body
    author_response = TextField(null=True)  # blog author's inline reply, if any
    web = TextField(null=True)  # commenter's website, if they linked one
    created = DateTimeField(null=True)  # None when the timestamp markup was broken

class Image(ModelBase):
    """An image embedded in a post, mirrored locally."""
    id = PrimaryKeyField()
    original_url = TextField()  # URL the image was fetched from
    post = ForeignKeyField(Post)
    content = BlobField(null=True)  # downloaded bytes; None when the download failed

# Resolve the forward reference declared above: Post.image -> Image.
ImageProxy.initialize(Image)

class Tag(ModelBase):
    """A tag attached to a post."""
    id = PrimaryKeyField()
    post = ForeignKeyField(Post)
    tag = CharField()  # human-readable tag text
    original_id = CharField(null=True)  # tag slug parsed from the article's CSS class

def init():
    """Create all tables; existing tables are left untouched (safe=True)."""
    tables = [Post, Comment, Image, Tag]
    with db.atomic():
        db.create_tables(tables, safe=True)
from bs4 import BeautifulSoup
import requests
import re

import models as m

root_url = 'http://www.matrix67.com/'
blog_url = root_url + 'blog/'
image_url = root_url + 'blogimage/'  # prefix of images hosted by the blog itself
page_template = blog_url + 'page/{}'  # paginated archive listing
post_template = blog_url + 'archives/{}'  # single post, by numeric id

# One shared session so HTTP keep-alive is reused across requests.
s = requests.Session()

# Patterns for ids embedded in the blog's element ids / CSS classes.
rexPostId = re.compile(r'post-(\d+)')
rexCommentId = re.compile(r'comment-(\d+)')
rexTagId = re.compile(r'tag-(\S+)')
# Captures the 32-char Gravatar hash out of an avatar URL.
# (The "Gavatar" spelling is a historical typo kept for compatibility.)
rexGavatarHash = re.compile(r'\/avatar\/(.{32})')

def bs(text):
    """Parse *text* into a BeautifulSoup tree using the stdlib html.parser."""
    soup = BeautifulSoup(text, 'html.parser')
    return soup

def processTags(postId, tagIdList, tagNameList):
    """Bulk-insert the tags of one post.

    postId: id of the owning Post row.
    tagIdList: tag slugs parsed from the article's CSS classes.
    tagNameList: human-readable tag names, in the same order.
    """
    # zip() pairs names with slugs safely even when the two lists differ in
    # length; the previous index-based pairing raised IndexError in that case.
    rows = [
        {'post': postId, 'tag': tagName, 'original_id': tagId}
        for tagId, tagName in zip(tagIdList, tagNameList)
    ]
    # An empty insert_many makes peewee emit invalid SQL for sqlite
    # (same reason main() guards its Post bulk insert), so skip it.
    if rows:
        m.Tag.insert_many(rows).execute()

def processImage(postId, imageTag):
    """Download the image referenced by *imageTag* and build an Image row dict.

    Returns a dict suitable for m.Image.create_or_get(); 'content' is None
    when the download fails.
    """
    src = imageTag['src']
    req = s.get(src)
    # req.content is the fully-read, content-decoded body. The previous
    # req.raw.data access required stream=True and bypassed requests'
    # transparent Content-Encoding handling.
    if req.status_code == 200:
        content = req.content
    else:
        content = None

    return {
        'original_url': src,
        'post': postId,
        'content': content
    }

def processComment(postId, commentTag):
    """Parse one comment <li> element into a Comment row dict.

    TODO: handle nested (n-level) comment threads.
    """
    # Avoid shadowing the builtin `id`.
    comment_id = rexCommentId.search(commentTag['id']).group(1)
    cite = commentTag.select('cite.fn')[0]
    content = commentTag.find('p')
    contentText = content.renderContents()

    # The blog author's inline reply, when present, lives in a <span> in a <p>.
    responseText = None
    contentNext = commentTag.select('p > span')
    if contentNext:
        responseText = contentNext[0].renderContents()

    # Commenter name; an <a> wrapper means they also left a website URL.
    web = None
    name_link = cite.find('a')
    if name_link:
        web = name_link['href']
        name = name_link.string
    else:
        name = cite.string

    try:
        # Due to incorrect markup this can fail mysteriously; fall back to
        # no timestamp rather than aborting the whole comment.
        meta = commentTag.select('div.comment-meta')[0]
        # Zero-pad lone digits, then rewrite "YYYY年MM月DD日 HH:MM" as ISO 8601.
        created_tmp = re.sub(r'(\D)(\d)(?!\d)', r'\g<1>0\2', meta.find('a').string)
        created = re.sub(r'(....)年(..)月(..)日 (..:..)', r'\1-\2-\3T\4:00Z', created_tmp)
    except Exception:
        created = None

    # search(), not match(): avatar srcs start with "http://...", so an
    # anchored match() could never find the "/avatar/<hash>" segment and
    # every avatar ended up as None.
    avatarMatch = rexGavatarHash.search(commentTag.select('img')[0]['src'])
    if avatarMatch:
        avatarHash = avatarMatch.group(1)
    else:
        avatarHash = None

    return {
        'id': comment_id,
        'post': postId,
        'name': name,
        'avatar': avatarHash,
        'content': contentText,
        'author_response': responseText,
        'web': web,
        'created': created
    }

def convertContent(content):
    """Render a bs4 element back to its raw HTML bytes.

    TODO: walk the subtree and emit markdown instead of raw HTML.
    """
    rendered = content.renderContents()
    return rendered

def crawlEntry(postId):
    """Fetch one post page and persist its images, tags and comments.

    Returns a dict for the Post row itself; the caller batch-inserts posts,
    so this function only writes Image/Tag/Comment rows.
    """
    resp = s.get(post_template.format(postId))
    doc = bs(resp.text)

    post = doc.select('#main article')[0]
    # Tag slugs are encoded in the article element's CSS classes ("tag-<slug>").
    classes = post['class']
    tag_id_list = []
    for cls in classes:
        tag_id_match = rexTagId.search(cls)
        if tag_id_match:
            tag_id_list.append(tag_id_match.group(1))


    content = post.select('div.entry-content')[0]
    titleText = post.select('header.entry-header h1.entry-title')[0].string

    meta = doc.select('div.entry-meta')[0]
    # Normalize the "+00:00" UTC offset to a trailing "Z".
    created = meta.select('time.entry-date')[0]['datetime'].replace('+00:00', 'Z')
    images = post.select('img')
    imagePreview = None

    # Everything belonging to this post is written in one transaction.
    with m.db.atomic():
        for img in images:
            # Only mirror images hosted on the blog itself.
            if img['src'].find(image_url) == 0:
                imgData = processImage(postId, img)
                imgModel, _ = m.Image.create_or_get(**imgData)
                # todo: replace the src of image wisely
                # First blog-hosted image becomes the post's preview image.
                if imagePreview is None:
                    imagePreview = imgModel
        print('  IMAGE {:>2}'.format(len(images)), end='')

        tags = meta.select('a[rel="tag"]')
        processTags(postId, tag_id_list, [ tag.string for tag in tags ])
        print('  TAG {:>2}'.format(len(tags)), end='')

        comments = doc.select('#comments ul.comment-list > li')
        for comment in comments:
            commentData = processComment(postId, comment)
            m.Comment.create_or_get(**commentData)
        print('  COMMENT {:>3}'.format(len(comments)), end='')
        print()

    # Render the entry body to raw HTML for storage.
    contentText = convertContent(content)

    return {
        'id': postId,
        'title': titleText,
        'content': contentText,
        'created': created,
        'image': imagePreview,
        'preview': None
    }

def main(start_page=173):
    """Crawl the archive listing from *start_page* down to page 1.

    start_page: highest archive page number to start from; defaults to 173,
    the number of pages the blog had when this crawler was written.
    """
    print('Initiating database...')
    m.init()
    page = start_page

    while page > 0:
        print('PAGE #{:>3}...'.format(page))
        resp = s.get(page_template.format(page))

        # todo: handle 404 responses
        page_doc = bs(resp.text)
        # The markup is occasionally mis-nested (e.g. </p></blockquote> where
        # </blockquote></p> was meant); html.parser tolerates this.
        entries = page_doc.select('article.post.status-publish')
        # Oldest post first, so rows are inserted in ascending id order.
        entries.reverse()

        post_list = []

        for entry in entries:
            post_id = rexPostId.search(entry['id']).group(1)
            print('  POST #{:>5}... '.format(post_id), end='')
            # Skip posts already crawled in a previous run.
            if m.Post.select().where(m.Post.id == post_id).first():
                print('  EXISTS')
                continue
            entryDict = crawlEntry(post_id)
            post_list.append(entryDict)

        # An empty insert_many makes peewee emit invalid sqlite SQL, so guard.
        if post_list:
            m.Post.insert_many(post_list).execute()

        # next iteration!
        page -= 1

if __name__ == '__main__':
    main()

以上是关于python 我喜欢这个博客。如果我没有互联网连接,我会读这个,而不是玩扫雷。使用 bs4、peewee 和 requests。的主要内容,如果未能解决你的问题,请参考以下文章

python 等到连接激活

如果没有互联网连接,如何在一段时间后重试连接不和谐机器人?

Ionic 2 - 如果没有互联网 + 有效表单,我如何禁用按钮

Python爬虫编程思想:实战案例:抓取博客文章列表

如果没有互联网连接,NSJSONSerialization 会终止应用程序

如果 webview 中没有互联网连接,iOS 会显示警报