Ruby 轻量级并行 Web 链接图爬虫

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Ruby 轻量级并行 Web 链接图(web graph)爬虫的相关知识,希望对你有一定的参考价值。

#!/usr/bin/env ruby

#
# crawler.rb --- Lightweight parallel web graph crawler
#
# Usage:
#   ./crawler.rb START_URL TARGET_REGEXP
#
# Output:
#   stdout --- edge list (tab separated URLs)
#   stderr --- log
#
# Example:
#   ./crawler.rb http://is.s.u-tokyo.ac.jp is.s.u-tokyo.ac.jp >graph.tsv 2>log.txt
#
# Author:
#   Takuya Akiba (@iwiwi)
#   http://www-imai.is.s.u-tokyo.ac.jp/~takiba/index_e.html
#

require 'rubygems'
require 'mechanize'
require 'uri'
require 'thread'
require 'set'
require 'logger'
require 'digest/md5'
require 'json'
require 'timeout'

# Number of worker threads started by the main script.
NUM_THREADS = 50

# Seconds a worker sleeps before every page fetch.
WAIT_FETCH = 1
# Seconds an idle worker sleeps when no host is ready but others are still working.
WAIT_THREADS = NUM_THREADS / 10.0
# Seconds between two STAT log reports in the main thread.
WAIT_STAT = 10
# Timeout, in seconds, applied to the handling of one page (fetch and link extraction).
TIMEOUT_FETCH = 5
# Links whose URL is this long or longer are skipped.
MAX_URI_LENGTH = 256
# A host stops accepting new URLs once this many are recorded for it.
MAX_URI_PER_HOST = 30000

# Shared logger; all log lines go to stderr so stdout carries only the edge list.
@log = Logger.new(STDERR)
# @log.level = Logger::WARN

# Per-host crawl state:
#   addr         --- host name (the key in $hosts)
#   mutex        --- guards queue, status and visited_urls of this host
#   queue        --- URLs waiting to be fetched
#   status       --- :halted (no worker assigned) or :working
#   visited_urls --- every URL ever queued for this host
#   visited_md5s --- MD5 digests of page bodies already seen on this host
$host = Struct.new("Host", :addr, :mutex, :queue, :status, :visited_urls, :visited_md5s)
# Host table; looking up an unknown host creates a fresh, halted record for it.
$hosts = Hash.new {|h, k| h[k] = $host.new(k, Mutex::new, [], :halted, Set.new, Set.new) }
$hosts_mutex = Mutex.new

# Global counters reported by the STAT log lines, updated under $stat_mutex.
$stat_num_pages = $stat_num_links = 0
$stat_mutex = Mutex.new

# The order of mutexes to hold:
#   $hosts_mutex -> $hosts[_].mutex -> $stat_mutex

#
# Print one edge of the link graph to stdout and count it.
#
# u1 --- source URL
# u2 --- destination URL
#
# Tabs inside either URL are replaced by spaces so that the output line always
# has exactly one tab, the field separator.
#
def output_edge(u1, u2)
  src, dst = [u1, u2].map { |u| u.to_s.tr("\t", " ") }
  print("#{src}\t#{dst}\n")
  $stat_mutex.synchronize { $stat_num_links += 1 }
end

#
# Queue a URL for crawling on the host it belongs to.
#
# uri --- absolute URL (a String, or anything whose to_s parses as a URI)
#
# The host record is looked up (and created on first sight) in $hosts under the
# lower-cased host name.  Host names are case-insensitive, so "Example.COM" and
# "example.com" must share one queue and one visited set; otherwise two workers
# could hit the same server at once and crawl its pages twice.
#
# The URL is dropped silently when this host has already recorded it or already
# holds MAX_URI_PER_HOST URLs.
#
# Locks: takes $hosts_mutex, releases it, then takes the host's own mutex.
# Raises whatever URI.parse raises for an unparsable URL.
#
def enque(uri)
  host_name = URI.parse(uri.to_s).host
  # URLs without a host component keep the nil key, as before.
  host_name = host_name.downcase if host_name
  h = $hosts_mutex.synchronize { $hosts[host_name] }

  h.mutex.synchronize do
    return if h.visited_urls.include?(uri) || h.visited_urls.size >= MAX_URI_PER_HOST
    h.queue.push(uri)
    h.visited_urls.add(uri)
  end
end

#
# Pick the next host a worker should crawl.
#
# Scans $hosts in order.  The first host that is :halted and has a non-empty
# queue is switched to :working and its addr is returned.  If there is no such
# host the result is :wait when some host is currently :working (it may still
# produce new URLs) and :done otherwise.
#
# Locks: holds $hosts_mutex for the whole scan and each host's mutex while
# that host is inspected.
#
def find_waiting_host
  any_busy = false
  $hosts_mutex.synchronize do
    $hosts.each_value do |host|
      host.mutex.synchronize do
        any_busy = true if host.status == :working
        next unless host.status == :halted && !host.queue.empty?

        host.status = :working
        return host.addr
      end
    end
  end
  any_busy ? :wait : :done
end

#
# Crawl one host until its queue is empty.
#
# host_addr --- key of the host in $hosts (as returned by find_waiting_host)
# thread_id --- label used only in log lines
#
# For every queued URL: sleep WAIT_FETCH seconds, fetch the page with Mechanize
# (robots.txt honoured), skip it if an equivalent body was already seen on this
# host, then enqueue and print every acceptable link.  Handling one page is
# limited to TIMEOUT_FETCH seconds.  When the queue is found empty the host is
# set back to :halted and the method returns.
#
# Any error raised while handling a page is logged and the loop continues with
# the next URL, so a single bad page never kills the worker.
#
def crawl(host_addr, thread_id = 0)
  @log.info("[#{thread_id}] BEGIN #{host_addr}")
  # $hosts creates missing entries on lookup, so even this read is done under
  # $hosts_mutex like every other access to the table.
  h = $hosts_mutex.synchronize { $hosts[host_addr] }

  Mechanize.start do |agent|
    agent.max_history = 1
    agent.robots = true

    loop do
      uri_from = nil
      h.mutex.synchronize do
        if h.queue.empty?
          agent.shutdown
          h.status = :halted
          @log.info("[#{thread_id}] FINISH #{host_addr}")
          return
        else
          uri_from = h.queue.shift
        end
      end

      begin
        sleep(WAIT_FETCH)

        # Kernel#timeout has been removed from current Rubies; Timeout.timeout
        # is the supported form and also works on old ones.
        Timeout.timeout(TIMEOUT_FETCH) do
          @log.debug("[#{thread_id}] ACCESS #{uri_from} (QUEUE: #{h.queue.size})")
          agent.get(uri_from)
          $stat_mutex.synchronize do
            $stat_num_pages += 1
          end

          page_uri = agent.page.uri.to_s
          if uri_from != page_uri
            @log.debug("[#{thread_id}] JUMP #{uri_from} -> #{page_uri}")
          end
          # `next` leaves the timeout block, i.e. moves on to the next queued URL.
          next if !agent.page.kind_of?(Mechanize::Page)

          # Duplicate detection: strip digits, whitespace, slashes, tags and
          # every path segment of the page's own URL(s) from the body, then
          # compare MD5 digests of what is left.
          body = agent.page.body.dup
          body.gsub!(/[0-9\s\/]+/, '')
          body.gsub!(/<[^>]*>/, '')
          [uri_from, page_uri].uniq.each do |u|
            u.split('/').sort.uniq.each do |w|
              next if w.empty? # "http://host" yields an empty segment; removing "" is a no-op
              body.gsub!(w, '')
            end
          end

          md5 = Digest::MD5.digest(body)
          if h.visited_md5s.include?(md5)
            @log.info("[#{thread_id}] DUPLICATE #{uri_from}")
            next
          end
          h.visited_md5s.add(md5)

          base_uri = URI.parse(page_uri)
          agent.page.search('a').each do |a|
            href = a['href']
            next if !href

            # A malformed href must only cost us that one link, not the rest
            # of the page.
            begin
              uri_to = base_uri.merge(href)
            rescue URI::Error => e
              @log.debug("[#{thread_id}] BAD LINK #{href} (#{e})")
              next
            end
            next if uri_to.scheme != "http" && uri_to.scheme != "https"

            uri_to = uri_to.to_s
            next if !(@regexp =~ uri_to) || uri_to.include?('?') || uri_to.include?('#')
            next if uri_to.include?('http://web.archive.org/')

            if uri_to.length >= MAX_URI_LENGTH
              @log.info("[#{thread_id}] TOO LONG #{uri_to}")
              next
            end

            enque(uri_to)
            output_edge(uri_from, uri_to)
          end
        end
      rescue Timeout::Error, StandardError, NoMemoryError => e
        # Deliberately broad: log and carry on with the next URL.
        @log.info("[#{thread_id}] ERROR #{e.to_s} (#{host_addr} #{uri_from})")
      end
    end
  end
end

# Script entry point: seed the crawl, run the worker pool, report statistics
# every WAIT_STAT seconds until all workers have finished, then log the number
# of URLs recorded per host.
if __FILE__ == $0
  unless ARGV.length == 2
    $stderr.puts("usage: crawler START_URL TARGET_REGEXP")
    abort
  end

  start_url, pattern = ARGV
  enque(start_url)
  @regexp = Regexp.new(pattern)

  # Each worker repeatedly asks for a host to crawl until nothing is left.
  workers = (1 .. NUM_THREADS).map do |thread_id|
    Thread.new do
      loop do
        next_host = find_waiting_host
        case next_host
        when :done then break
        when :wait then sleep(WAIT_THREADS)
        else crawl(next_host, sprintf("%3d", thread_id))
        end
      end
    end
  end

  # Periodic statistics; the loop ends once every worker thread has been reaped.
  last_pages = last_links = 0
  until workers.empty?
    sleep WAIT_STAT

    page_rate = ($stat_num_pages - last_pages) / WAIT_STAT.to_f
    link_rate = ($stat_num_links - last_links) / WAIT_STAT.to_f
    busy = $hosts_mutex.synchronize do
      $hosts.values.count { |host| host.status == :working }
    end
    @log.info("[---] STAT TOTAL #{$stat_num_pages} pages  , #{$stat_num_links} links")
    @log.info("[---] STAT CURRENT #{page_rate} pages/s, #{link_rate} links/s, #{busy} threads working")

    # Drop finished threads; join is only called on threads that are no longer alive.
    workers.delete_if { |worker| !worker.alive? && worker.join }
    last_pages = $stat_num_pages
    last_links = $stat_num_links
  end

  # Final report, hosts ordered by how many URLs were recorded for them.
  url_counts = $hosts.map { |addr, host| [addr, host.visited_urls.size] }
  url_counts.sort_by { |_addr, seen| seen }.each do |addr, seen|
    @log.info("[---] DONE #{seen} #{addr}")
  end
end

以上是关于 Ruby 轻量级并行 Web 链接图爬虫的主要内容。如果未能解决你的问题,请参考以下文章:

Ruby 多线程

雷林鹏分享:Ruby 多线程

python轻量级爬虫的编写

大厂在用的Python反爬虫手段,破了它!

大厂在用的反爬虫手段

Python 爬虫框架 — Scrapy