A Concurrent Crawler in Go

Posted by chenwenyin

The concurrent crawler

Implementation

/crawler/main.go
package main

import (
    "learn/crawler/engine"
    "learn/crawler/scheduler"
    "learn/crawler/zhenai/parser"
)

func main() {
    e := engine.ConcurrentEngine{
        Scheduler: &scheduler.QueuedScheduler{},
        WorkerCount: 20,
    }
    e.Run(engine.Request{
        Url:       "http://www.zhenai.com/zhenghun",
        ParseFunc: parser.ParseCityList,
    })
    // Test a single city (Shanghai):
    //e.Run(engine.Request{
    //  Url:       "http://www.zhenai.com/zhenghun/shanghai",
    //  ParseFunc: parser.ParseCity,
    //})
}
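
Because ConcurrentEngine only depends on the Scheduler interface, the queued scheduler can be swapped for the simple one defined in /crawler/scheduler/simple.go without touching anything else. A minimal sketch of that variation:

e := engine.ConcurrentEngine{
    Scheduler:   &scheduler.SimpleScheduler{},
    WorkerCount: 20,
}
e.Run(engine.Request{
    Url:       "http://www.zhenai.com/zhenghun",
    ParseFunc: parser.ParseCityList,
})
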
/crawler/engine/simple.go
package engine

import (
    "learn/crawler/fetcher"
    "log"
)

type SimpleEngine struct{}

func (e SimpleEngine) Run(seeds ...Request) {
    var requests []Request
    requests = append(requests, seeds...)
    for len(requests) > 0 {
        r := requests[0]
        requests = requests[1:]

        parseResult, err := worker(r)
        if err != nil {
            continue
        }
        requests = append(requests, parseResult.Requests...)
        for _, item := range parseResult.Items {
            log.Printf("Got item %v", item)
        }
    }
}
func worker(r Request) (ParseResult, error) {
    log.Printf("Fetching %s", r.Url)
    body, err := fetcher.Fetch(r.Url)
    if err != nil {
        log.Printf("Fetcher: error" + "fetching url %s: %v", r.Url, err)
        return ParseResult{}, err
    }
    return r.ParseFunc(body), nil
}
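
For comparison, the single-goroutine engine drives the same parsers directly; a sketch of a main function using it (same seed as the concurrent version):

package main

import (
    "learn/crawler/engine"
    "learn/crawler/zhenai/parser"
)

func main() {
    engine.SimpleEngine{}.Run(engine.Request{
        Url:       "http://www.zhenai.com/zhenghun",
        ParseFunc: parser.ParseCityList,
    })
}
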
/crawler/engine/concurrent.go
package engine

import (
    "log"
)

type ConcurrentEngine struct {
    Scheduler Scheduler
    WorkerCount int
}
type Scheduler interface {
    ReadyNotifier
    Submit(Request)
    WorkerChan() chan Request
    Run()
}
type ReadyNotifier interface {
    WorkerReady(chan Request)
}
func (e *ConcurrentEngine) Run(seeds ...Request) {
    out := make(chan ParseResult)
    e.Scheduler.Run()

    for i := 0; i < e.WorkerCount; i++ {
        createWork(e.Scheduler.WorkerChan(), out, e.Scheduler)
    }
    for _, r := range seeds {
        e.Scheduler.Submit(r)
    }
    itemCount := 0
    for {
        result := <-out
        for _, item := range result.Items {
            log.Printf("Got item #%d: %v", itemCount, item)
            itemCount++
        }
        for _, request := range result.Requests {
            e.Scheduler.Submit(request)
        }
    }
}
func createWork(in chan Request, out chan ParseResult, ready ReadyNotifier) {
    go func() {
        for {
            // Announce this worker's channel to the scheduler, then block
            // until the scheduler hands back a request on it.
            ready.WorkerReady(in)
            request := <-in
            result, err := worker(request)
            if err != nil {
                continue
            }
            out <- result
        }
    }()
}
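
The ready-worker handshake in createWork is the heart of this design: each worker announces its own channel, and work is sent only to workers that have announced themselves. A self-contained toy sketch of the pattern (the names here are illustrative, not part of the project):

package main

import "fmt"

func main() {
    ready := make(chan chan string)
    go func() { // worker
        in := make(chan string)
        for {
            ready <- in // "I am ready": hand over my private channel
            fmt.Println("did", <-in)
        }
    }()
    w := <-ready // dispatcher: wait for a ready worker
    w <- "job-1" // hand that worker exactly one job
    w2 := <-ready
    w2 <- "job-2"
    <-ready // the worker re-announces only after finishing job-2
}
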
/crawler/engine/typers.go
package engine

type Request struct {
    Url string
    ParseFunc func([]byte) ParseResult
}
type ParseResult struct {
    Requests []Request
    Items []interface{}
}
func NilParser([]byte) ParseResult {
    return ParseResult{}
}
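
NilParser is meant for leaf pages that should be fetched but not mined for further links or items; a sketch (the profile id is made up):

req := engine.Request{
    Url:       "http://album.zhenai.com/u/123456",
    ParseFunc: engine.NilParser,
}
// req can then be submitted to the scheduler like any other request.
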
/crawler/fetcher/fetcher.go
package fetcher

import (
    "bufio"
    "fmt"
    "golang.org/x/net/html/charset"
    "golang.org/x/text/encoding"
    "golang.org/x/text/encoding/unicode"
    "golang.org/x/text/transform"
    "io/ioutil"
    "log"
    "net/http"
    "time"
)

// rateLimiter throttles Fetch to at most one request every 100ms.
var rateLimiter = time.Tick(100 * time.Millisecond)

func Fetch(url string) ([]byte, error) {
    <-rateLimiter
    client := &http.Client{}
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return nil, err
    }
    req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36")
    resp, err := client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("wrong status code: %d", resp.StatusCode)
    }
    bodyReader := bufio.NewReader(resp.Body)
    e := determineEncoding(bodyReader)
    utf8Reader := transform.NewReader(bodyReader, e.NewDecoder())
    return ioutil.ReadAll(utf8Reader)
}
func determineEncoding(r *bufio.Reader) encoding.Encoding {
    bytes, err := r.Peek(1024)
    if err != nil {
        log.Printf("Fetcher error: %v", err)
        return unicode.UTF8
    }
    e, _, _ := charset.DetermineEncoding(bytes, "")
    return e
}
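
A quick smoke test of the fetcher, assuming the learn/ module path used throughout this post:

package main

import (
    "fmt"
    "learn/crawler/fetcher"
)

func main() {
    body, err := fetcher.Fetch("http://www.zhenai.com/zhenghun")
    if err != nil {
        fmt.Println("fetch error:", err)
        return
    }
    fmt.Printf("fetched %d bytes, decoded to UTF-8\n", len(body))
}
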
/crawler/zhenai/parser/citylist.go
package parser

import (
    "learn/crawler/engine"
    "regexp"
)

const cityListRe = `<a href="(http://www.zhenai.com/zhenghun/[0-9a-z]+)" [^>]*>([^<]+)</a>`
func ParseCityList(contents []byte) engine.ParseResult {
    re := regexp.MustCompile(cityListRe)
    matches := re.FindAllSubmatch(contents, -1)
    result := engine.ParseResult{}
    for _, m := range matches {
        result.Items = append(result.Items, "City: "+string(m[2]))
        result.Requests = append(result.Requests, engine.Request{
            Url:       string(m[1]),
            ParseFunc: ParseCity,
        })
    }
    return result
}
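
On a city-list page the regex captures each city link and its display name; a sketch with a hypothetical HTML fragment:

contents := []byte(`<a href="http://www.zhenai.com/zhenghun/shanghai" class="city">上海</a>`)
result := ParseCityList(contents)
// result.Items       -> ["City: 上海"]
// result.Requests[0] -> Url ".../zhenghun/shanghai", parsed by ParseCity
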
/crawler/zhenai/parser/city.go
package parser

import (
    "learn/crawler/engine"
    "regexp"
)
var (
    profileRe = regexp.MustCompile(`<a href="(http://album.zhenai.com/u/[0-9]+)" [^>]*>([^<]+)</a>`)
    cityUrlRe = regexp.MustCompile(`href="(http://www.zhenai.com/zhenghun/[^"]+)"`)
)
func ParseCity(contents []byte) engine.ParseResult {
    matches := profileRe.FindAllSubmatch(contents, -1)
    result := engine.ParseResult{}
    for _, m := range matches {
        // Copy the name into a per-iteration variable so each request's
        // closure below captures its own value, not a shared loop variable.
        name := string(m[2])
        result.Items = append(result.Items, "User "+name)
        result.Requests = append(result.Requests, engine.Request{
            Url:       string(m[1]),
            ParseFunc: func(c []byte) engine.ParseResult {
                return ParseProfile(c, "name:"+name)
            },
        })
    }
    matches = cityUrlRe.FindAllSubmatch(contents, -1)
    for _, m := range matches {
        result.Requests = append(result.Requests, engine.Request{
            Url:       string(m[1]),
            ParseFunc: ParseCity,
        })
    }
    return result
}
/crawler/zhenai/parser/profile.go
package parser

import (
    "learn/crawler/engine"
    "learn/crawler/model"
    "regexp"
)

const all = `<div class="m-btn purple" data-v-8b1eac0c>([^<]+)</div>`
func ParseProfile(contents []byte, name string) engine.ParseResult {
    profile := model.Profile{}
    profile.User = append(profile.User, name)
    re := regexp.MustCompile(all)
    match := re.FindAllSubmatch(contents, -1)
    for _, m := range match {
        profile.User = append(profile.User, string(m[1]))
    }

    result := engine.ParseResult{
        Items: []interface{}{profile},
    }
    return result
}
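
ParseProfile collects every purple-button field on the page into one Profile, prepending the name passed in from ParseCity; a sketch with a hypothetical fragment (the data-v attribute must match the regex exactly):

contents := []byte(`<div class="m-btn purple" data-v-8b1eac0c>26岁</div>` +
    `<div class="m-btn purple" data-v-8b1eac0c>广州</div>`)
result := ParseProfile(contents, "name:小明")
// result.Items -> [Profile{User: ["name:小明", "26岁", "广州"]}]
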
/crawler/model/profile.go
package model

type Profile struct {
    User []string
}
/crawler/scheduler/queued.go
package scheduler

import "learn/crawler/engine"

type QueuedScheduler struct {
    requestChan chan engine.Request
    workChan    chan chan engine.Request // carries each worker's private channel
}

// WorkerChan gives every worker its own channel, so the scheduler can
// choose which worker receives the next request.
func (s *QueuedScheduler) WorkerChan() chan engine.Request {
    return make(chan engine.Request)
}

func (s *QueuedScheduler) Submit(r engine.Request) {
    s.requestChan <- r
}
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
    s.workChan <- w
}
func (s *QueuedScheduler) Run() {
    s.workChan = make(chan chan engine.Request)
    s.requestChan = make(chan engine.Request)
    go func() {
        var requestQ []engine.Request
        var workerQ []chan engine.Request
        for {
            var activeRequest engine.Request
            var activeWorker chan engine.Request
            // Arm the dispatch case only when both a pending request and a
            // ready worker exist; otherwise activeWorker stays nil, and a
            // send on a nil channel blocks forever, so select skips it.
            if len(requestQ) > 0 && len(workerQ) > 0 {
                activeRequest = requestQ[0]
                activeWorker = workerQ[0]
            }
            select {
            case r := <-s.requestChan:
                requestQ = append(requestQ, r)
            case w := <-s.workChan:
                workerQ = append(workerQ, w)
            case activeWorker <- activeRequest:
                workerQ = workerQ[1:]
                requestQ = requestQ[1:]
            }
        }
    }()
}
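
The dispatch case above leans on a Go rule worth spelling out: a send or receive on a nil channel blocks forever, so a select case on a nil channel can never fire. A tiny self-contained demonstration of that behavior:

package main

import "fmt"

func main() {
    var ch chan int // nil: never initialized with make
    select {
    case ch <- 1: // send on a nil channel is never ready
        fmt.Println("sent")
    default:
        fmt.Println("the nil-channel case was skipped")
    }
}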

/crawler/scheduler/simple.go
package scheduler

import "learn/crawler/engine"

type SimpleScheduler struct {
    workerChan chan engine.Request
}

// WorkerChan returns the one channel shared by every worker.
func (s *SimpleScheduler) WorkerChan() chan engine.Request {
    return s.workerChan
}

// WorkerReady is a no-op: with a single shared channel there is nothing
// to track.
func (s *SimpleScheduler) WorkerReady(chan engine.Request) {
}

func (s *SimpleScheduler) Run() {
    s.workerChan = make(chan engine.Request)
}

// Submit sends from a fresh goroutine: a direct send could block the
// engine's result loop while every worker is busy, deadlocking the crawl.
func (s *SimpleScheduler) Submit(r engine.Request) {
    go func() { s.workerChan <- r }()
}

Complete project

https://gitee.com/FenYiYuan/golang-cpdcrawler.git
