Ruby: an EventMachine (EM) based crawler

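The full worker source is below. RedEx is a SimpleWorker::Base job that crawls a single domain with EventMachine and em-http-request, parses responses through a RedExPage/Nokogiri pipeline, stages the resulting page hashes on S3 via Happening, hands each stored page off to RedWorkQueuer for further processing, and keeps its crawl state in Redis. When a run approaches SimpleWorker's time limit, it snapshots its queues and delegates the remaining work to a freshly queued copy of itself.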

require 'simple_worker'
require 'eventmachine'
require 'em-http-request'
require 'nokogiri'
require 'aws'
require 'redis'

# stdlib pieces used further down (Digest::MD5, CGI.escape, URI parsing, Queue)
require 'digest/md5'
require 'cgi'
require 'uri'
require 'thread'

class RedEx < SimpleWorker::Base

  merge_gem 'em-redis'
  merge_gem 'happening'
  merge_gem 'utf8_utils'
  merge_gem 'aws'
  merge_gem 'crewait'


  unmerge '../models/domain.rb'
  unmerge '../models/project.rb'
  unmerge '../models/company.rb'
  unmerge '../models/plan.rb'
  unmerge '../models/link.rb'
  unmerge '../models/page.rb'
  unmerge '../models/page_link.rb'

  merge "../../lib/query_builder.rb"
  merge "../../lib/google_base_em.rb"
  merge "../../lib/google_em.rb"
  merge "../../lib/indexation_stats_mixin.rb"
  merge "../../lib/yahoo_stats_mixin.rb"
  merge "../../lib/bing_stats_mixin.rb"
  merge "../../lib/semrush_stats_mixin.rb"
  merge "../../lib/social_stats_mixin.rb"
  merge "../../lib/nokogiri_doc_parser_mixin.rb"
  merge "../../lib/url_validator.rb"
  merge "../../lib/url_cleaner.rb"
  merge "../../lib/crawler_helper/db.rb"
  merge "../../lib/crawler_helper/link.rb"
  merge "../../lib/cloud_crawler_helper/page_processor.rb"
  merge "../../lib/cloud_crawler_helper/found_link_adder.rb"
  merge "../../lib/cloud_crawler_helper/page_stat2.rb"
  merge "../../lib/cloud_crawler_helper/crawler_red_ex_storage_mixin.rb"
  merge "../../lib/cloud_crawler_helper/red_work_queuer.rb"
  merge "../../lib/market_fu/calculator.rb"
  merge "../../lib/cloud_link_helper/redis_mixin.rb"
  merge "../../lib/cloud_crawler_helper/red_ex_page.rb"
  merge "../models/cloud_crawler_found_link.rb"
  merge "../models/cloud_crawler.rb"
  merge "../models/cloud_domain.rb"
  merge "../models/cloud_crawler_url.rb"
  merge "../models/cloud_page.rb"
  merge "redex_page_processor.rb"
  merge '../models/clean/domain.rb'
  merge '../models/clean/project.rb'
  merge '../models/clean/company.rb'
  merge '../models/clean/plan.rb'

  S3_ACCESS_KEY = 'TUSA'
  S3_SECRET_KEY = 'y/XhpORF1vqrxOecHj'
  DO_NOT_CRAWL_TYPES = %w(.pdf .doc .xls .ppt .mp3 .mp4 .m4v .avi .mpg .rss .xml .json .txt .git .zip .md5 .asc .jpg .jpeg .gif .png)
  CONCURRENT_CONNECTIONS = 50
  SIMULTANEOUS_DB_CONNECTIONS = 20
  REDIS_OPTIONS_HASH = {:host => "ikefish.redistogo.com", :port => 9065, :password => "360c4b698d", :thread_safe => true}
  VERBOSE = true

  attr_accessor :domain_id, :a, :r, :visit_key, :queued_key, :starting_url, :base_uri,
    :retrieved, :error_urls, :link_queue, :db_push_queue, :s3_urls, :retries,
    :page_jobs, :link_graph, :found_link_list, :outstanding_jobs, :completed_jobs, :link_storage,
    :path_based_crawl, :crawl_limit, :is_delegated

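  # setup_job establishes the SimpleRecord connection with the AWS keys, loads the
  # CloudDomain record for the supplied domain_id, resets its crawl flags, and
  # initializes the per-run counters, heartbeat and connection trackers. A fresh
  # (non-delegated) run also flushes the crawler's leftover Redis queues.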
  def setup_job
    @job_starting_time = Time.now
    SimpleRecord.establish_connection(S3_ACCESS_KEY, S3_SECRET_KEY, :s3_bucket=> :new)

    raise "There is no domain_id supplied.. Aborted !!" if domain_id.blank?
    @tmp_db_push_queue = []
    @status_checking = false
    @is_delegated = is_delegated || false
    @crawl_limit = crawl_limit || 1000
    @domain_id = domain_id
    @domain = CloudDomain.find(@domain_id)
    @domain.crawl_finished = 'false'
    @domain.already_imported = 'false'
    @domain.save

    @crawler = @domain.crawler
    @starting_uri = URI.parse(@domain.crawl_url)
    @base_uri = URI.parse(clean_url(@starting_uri.to_s))
    @starting_url = @base_uri.to_s

    @retrieve_beat = Time.now
    @@heartbeat = Time.now
    @@crawled_page_count = 0
    @@connections = 0
    @@db_connections = 0
    @checkstatus_connections = 0
    @retries = 0
    @s3_urls = []
    @outstanding_jobs = []
    @job_ids = []
    @aggregate_job_processing_duration = 0
    @baseurl_uri = URI.parse(@domain.crawl_url)
    @transfer_status = false
    @delegating_status = false
    @outstanding_jobs_transfer = false
    @bucket = 'domain_storage'

    if @is_delegated.eql?(false)
      log "Resetting queues..."
      reset_redis_queues

      @crawler.set_starting_time
      @crawler.set_processing_status('Initializing')
    end

    true
  end

  def reset_redis_queues
    @crawler.flush_all_redis_information

    return true
  end

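  # connecting_database configures SimpleWorker with the Postgres credentials
  # used by the ActiveRecord models merged at the top of the class.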
  def connecting_database
    log "Connecting to Database" if VERBOSE
    SimpleWorker.configure do |config|
      config.access_key = '6d9aefcf04552c570b239857a56a8cc3'
      config.secret_key = 'b87ef0d1d047fe457c2c6381fd1d174c'

      username = "uf7wrd8yebt4sj"
      password = "p61kv5wfk1trt0vd3w4qfper06"
      host_name = "ec2-174-129-213-125.compute-1.amazonaws.com"
      database = "dm3tjkjv0whfa7j"

      config.database = {
        :adapter => "postgresql",
        :host => host_name, # Not localhost*
        :database => database,
        :username => username,
        :password => password
      }

    end
  end

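  # run is the worker entry point: configure the database, set up the job state,
  # seed the URL queues, crawl via do_process, and finally either mark the crawl
  # finished (looking_up_transfer_status) or leave it to the delegated follow-up job.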
  def run
    connecting_database

    log "Setting up job..."
    setup_job

    if @starting_url.blank?
      log "Need a starting URL to crawl."
      return false
    end

    if setup_database_queues
      log "Start crawling for real domain ID : #{@domain.domain_id}"
      log "Starting at #{Time.now}"

      setup_delegated_data
      do_process
      looking_up_transfer_status

      log "Ending at #{Time.now}"
      log "Ok."
    else
      log "Error setting up database queues.  Starting URL bad?"
      false
    end
  end

  def do_process
    @starting_time = Time.now
    log "Starting Crawl at #{@starting_time}..."

    do_the_loop

    looking_up_for_delegating

    @ending_time = Time.now
    @total_seconds = @ending_time - @starting_time
    @pph = ((@@crawled_page_count / @total_seconds) * 3600.0).to_i
    log "Ending loop: Total time #{@total_seconds} seconds, total urls #{@@crawled_page_count} (#{@pph} pages/hr)"
    cost_estimate = ((@aggregate_job_processing_duration / 1000.0) / 3600.0) * 0.05
    log "Job Time: #{@aggregate_job_processing_duration}, estimated cost $#{cost_estimate} "
  end

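  # Once a run hits its time budget, looking_up_for_delegating snapshots the
  # in-memory queues back into Redis (and the pending page hashes into S3),
  # queues a new RedEx job with is_delegated = true, and records the new
  # SimpleWorker task id on the crawler so the crawl continues where it left off.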
  def looking_up_for_delegating
    log "looking_up_for_delegating"
    if @delegating_status.eql?(true)
      log "\n * Setup delegated data for next job.. "
      @crawler.flush_visit_key
      @crawler.flush_skipped_url
      @crawler.flush_error_url
      @crawler.flush_retries
      @crawler.flush_queued_url
      @crawler.flush_todo
      @crawler.flush_write_out_key
      @crawler.flush_db_push_queue_from_s3
      @crawler.flush_db_push_queue_key_list

      @crawler.set_crawled_count(@@crawled_page_count)
      @todo.size.times.each {|f| @crawler.add_todo(@todo.pop) }
      @visit_key.size.times.each {|f| @crawler.add_visit_key(@visit_key.pop) }
      @skipped_urls.size.times.each {|f| @crawler.add_skipped_url(@skipped_urls.pop) }
      @queued_key.each {|url| @crawler.add_queued_url(url) }
      @error_urls.size.times.each {|f| @crawler.add_error_url(@error_urls.pop) }

      EM.run {
        @db_push_queue.size.times.each do
          #          @db_push_queue.pop {|x| @crawler.add_db_push_queue(x.to_json); }
          @db_push_queue.pop {|x| @crawler.add_db_push_queue_to_s3(x); }
        end

        EM.stop
      }

      redex = @crawler.redex_crawler
      redex.crawl_limit = @crawl_limit
      redex.is_delegated = true
      job_id = redex.queue(:recursive => true)

      @crawler.red_ex_job_id = job_id["task_id"]
      @crawler.save

      log "\n * New Job : #{job_id['task_id']}"
      log "\n * Delegating the process to Job ID : #{@crawler.red_ex_job_id}"
    end
  end

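  # The inverse of looking_up_for_delegating: a delegated run reloads the
  # visited/skipped/queued/todo/error URL sets, the outstanding job ids, the
  # crawled/retry counters, and any page hashes the previous job parked on S3.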
  def setup_delegated_data
    if @is_delegated.eql?(true)
      log "setup_delegated_data"
      log "\n\t * Get delegated data for last job..."

      @crawler.get_visit_key.each {|url| @visit_key << url }
      @crawler.get_skipped_urls.each {|url| @skipped_urls << url }
      @crawler.get_queued_urls.each {|url| @queued_key << url }
      @crawler.get_todo.each {|url| @todo << url }
      @crawler.get_error_urls.each {|url| @error_urls << url }
      @crawler.get_oustanding_jobs.each {|job_id| @outstanding_jobs << {:job_id => job_id, :redis_status => true} }
      @@crawled_page_count = @crawler.get_crawled_count.to_i
      @retries = @crawler.get_retries_count.to_i

      @crawler.get_db_push_key_list.each do |key|
        db_push_queue = @crawler.get_db_push_queue_from_s3(key)
        @tmp_db_push_queue.push(db_push_queue) if db_push_queue.is_a? Hash
        puts @tmp_db_push_queue.size.to_s + "-" + db_push_queue[:url]
      end
    end
  end

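  # Syncs locally collected page-processing task ids into Redis and refreshes the
  # crawler's status and counters; jobs that hit a Redis timeout keep
  # redis_status == false so they are retried on the next pass.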
  def update_outstanding_jobs
    jobs = @outstanding_jobs.select {|job| job[:redis_status].eql?(false) }

    jobs.each do |job|
      #      @redis.sadd(@crawler.outstanding_jobs_redis_key, job[:job_id])
      begin
        @crawler.add_oustanding_jobs(job[:job_id])
        @crawler.increment_total_jobs
        job[:redis_status] = true
      rescue Timeout::Error => e
        job[:redis_status] = false
        next
      end
    end

    @crawler.set_processing_status('Processing')
    @crawler.set_crawled_count(@@crawled_page_count)
    @crawler.set_retries_count(@retries)
  end

  def looking_up_transfer_status
    if @transfer_status.eql?(true)
      @domain.crawl_finished = "true"
      @domain.save

      @real_domain = Clean::Domain.find @domain.domain_id.to_i
      @real_domain.last_crawl_date = Time.now
      @real_domain.next_crawl_date = @real_domain.project.company.crawl_frequency_range
      @real_domain.next_page_severity_update = Time.now + 1.day rescue nil
      @real_domain.save(:validate => false)

      @crawler.set_finished_time
      @crawler.set_processing_status('Finished')

      log "Domain ID :#{@real_domain.id}"
      log "Last Crawl Date : " + @real_domain.last_crawl_date.to_s
      log "Next Crawl Date : " + @real_domain.next_crawl_date.to_s
    end
  end

  def pushing_db_push_queue_into_em
    log "pushing_db_push_queue_into_em"

    @tmp_db_push_queue.each do |queue|
      @db_push_queue.push(queue)
    end
  end

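  # do_the_loop runs the EventMachine reactor. Its periodic timers handle:
  #   60s - watchdog: stop if the heartbeat stalls for more than 60 seconds, or
  #         delegate once the job has run for more than 45 minutes (2700s)
  #   60s - push outstanding job ids to Redis via EM.defer
  #   5s  - log status, pop a page hash off @db_push_queue into write_to_db,
  #         and stop the reactor once completed_retrieval? is true
  #   1s  - kick off a new retrieve if nothing has been fetched for 5 seconds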
  def do_the_loop
    @crawler.set_processing_status('Processing')

    EM.run do
      @redis = EM::Protocols::Redis.connect REDIS_OPTIONS_HASH
      @db_push_queue ||= EM::Queue.new
      pushing_db_push_queue_into_em if @is_delegated.eql?(true)
      @@heartbeat = Time.now

      log "\nBeginning RedEx Crawler Processing Loop..."

      EM.add_periodic_timer(60) {
        if (Time.now - @@heartbeat) > 60
          log "Exiting: Heartbeat Not Detected for more than 60 seconds."

          update_outstanding_jobs
          marking_crawler_as_done

          EM.stop
        end

        if (Time.now - @job_starting_time) > 2700
          log "\t Hit 45 minutes.. Delegating data.. "
          @delegating_status = true

          @@connections = 51
          @@db_connections = 21

          update_outstanding_jobs

          EM.stop
        end
      }

      EM.add_periodic_timer(60) {
        if @outstanding_jobs_transfer.eql?(false)
          @outstanding_jobs_transfer = true

          EM.defer(proc {
              update_outstanding_jobs
            }, proc {
              @outstanding_jobs_transfer = false
              log "Outstanding jobs on Redis updated.."
            })
        end
      }

      EM.add_periodic_timer(5) {
        update_logs_with_current_status

        if @@db_connections.to_i < SIMULTANEOUS_DB_CONNECTIONS.to_i and @delegating_status.eql?(false)
          @db_push_queue.pop {|x| write_to_db(x) rescue nil } unless @db_push_queue.blank?
        else
          log "\n\n\n** Redex Thinks that there are either too many simultaneous DB connections or the Delegating Status == false"
          log "DB Connections: #{@@db_connections}, Delegating Status: #{@delegating_status}\n\n\n"
        end

        if completed_retrieval?
          #          log "\n* Completed Retrieval and Page Processing.."
          log "\n* Completed Retrieval.."
          log "\n* Stoping EM.."

          update_outstanding_jobs
          marking_crawler_as_done

          EM.stop
        end
      }

      EM.add_periodic_timer(1) {
        if (Time.now - @retrieve_beat) > 5
          unless @todo.empty? or @@connections > CONCURRENT_CONNECTIONS or @@crawled_page_count > @crawl_limit or @delegating_status.eql?(true)
            retrieve(@todo.pop) if @db_push_queue.size <= 500
          end
        end
      }

      retrieve(@todo.pop) unless @todo.blank?
    end
  end

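  # Builds the in-memory work queues: @todo feeds retrieve, @queued_key de-dupes
  # URLs that were already scheduled, and the remaining queues collect visited,
  # skipped and errored URLs for later hand-off to Redis.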
  def setup_database_queues
    begin
      @queued_key = []
      @todo = Queue.new
      @visit_key = Queue.new
      @skipped_urls = Queue.new
      @error_urls = Queue.new

      if @is_delegated.eql?(false)
        @todo << @starting_url
        @queued_key << @starting_url
      end

      @retrieved = []
      true
    rescue
      false
    end
  end

  def clean_url(found_url)
    begin
      a = URI.parse(found_url)
      a.fragment = nil
      a.path = "/" if a.path.blank?
      return a.to_s
    rescue => e
      log "Error with #{found_url} : #{e.inspect}"
      return false
    end
  end

  def valid_scheme?(uri)
    ["http", "https"].include?(uri.scheme)
  end

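  # retrieve issues an asynchronous GET via em-http-request. Successful 200
  # responses are parsed by RedExPage and their same-host, in-path links are
  # filtered against DO_NOT_CRAWL_TYPES and pushed onto @todo; 503s are retried,
  # and 200/301/302/404/500 pages are queued for storage via @db_push_queue.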
  def retrieve(url)
    begin
      @@heartbeat = Time.now

      # Accept-Charset (not Accept-Encoding, which negotiates compression) is the header for a preferred character set
      req = EventMachine::HttpRequest.new(url).get :head => {"Accept" => "text/html", "Accept-Charset" => "UTF-8"}

      @visit_key << url
      @@crawled_page_count += 1
      @@connections += 1

      req.callback do
        @@connections -= 1
        @@heartbeat = Time.now
        page = RedExPage.new({:url => url, :base_uri => @base_uri, :headers => req.response_header, :code => req.response_header.status, :content => req.response})
        page.callback do |page_hash|
          if [200].include?(page_hash[:code])
            page_hash[:links].each do |link|
              @@heartbeat = Time.now
              setup_new_retrieval
              uri = strip_off_fragment(link) rescue next
              next unless valid_scheme?(uri)
              uri = to_absolute(uri)

              if same_host?(uri) and in_path?(uri)
                unless @queued_key.include?(uri.to_s)
                  link = UrlValidator.new(uri.to_s)
                  filetype = link.filetype.blank? ? '' : link.filetype.downcase

                  if DO_NOT_CRAWL_TYPES.include?(".#{filetype}")
                    @skipped_urls << uri.to_s
                    next
                  end

                  unless @queued_key.length > @crawl_limit
                    @todo.push(uri.to_s)
                    @queued_key << uri.to_s
                  end
                end
              end
            end # page_hash_each
          elsif [301,302,404].include?(page_hash[:code])
          elsif [503].include?(page_hash[:code])
            @retries += 1
            @todo.push(url)
          else
            log "[RedEx] Code type #{page_hash[:code]} not supported."
          end

          if [200,301,302,404,500].include?(page_hash[:code])
            @db_push_queue.push(page_hash)
          end
        end
      end

      req.errback do
        @@heartbeat = Time.now
        @@connections -= 1
        setup_new_retrieval

        if [301,302,404,500].include?(req.response_header.status)
          page = RedExPage.new({:url => url, :base_uri => @base_uri, :headers => req.response_header, :code => req.response_header.status, :content => req.response})

          page.callback do |page_hash|
            @db_push_queue.push(page_hash)
          end
        elsif [503].include?(req.response_header.status)
          @retries += 1
          @todo.push(url)
        else
          @error_urls << url
        end
      end
    rescue => e
      if @@connections.eql?(0)
        log "Parsing error, stopping. URL: #{url}"
        EM.stop
      else
        log "[Error On Retrieve] => #{e.inspect}"
      end
    end
  end

  def check_done
    if @todo.empty? and @@connections == 0
      EM.stop
    end
  end

  def to_absolute(uri)
    uri.relative? ? @base_uri.merge(uri) : uri
  end

  def same_host?(uri)
    @base_uri.host.eql?(uri.host)
  end

  def in_path?(uri)
    uri.path.index(@base_uri.path).eql?(0)
  end

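  # do_queuer_loop is a standalone EM loop (not called from run in this listing)
  # that drains @s3_urls into page-processing jobs via queue_into_sw!, with its
  # own 50-minute delegation cutoff.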
  def do_queuer_loop
    log "do_queuer_loop"
    log "\n\t * Starting SW queuer.."

    @@db_connections = 0

    EM.run do

      EM.add_periodic_timer(60) do
        if (Time.now - @job_starting_time) > 3000
          log "\t Hit 50 minutes.. Delegating data.. "
          @delegating_status = true
          EM.stop
        end
      end

      EM.add_periodic_timer(1) do
        if !@s3_urls.empty?
          available_db_connections = SIMULTANEOUS_DB_CONNECTIONS - @@db_connections

          # queue at most as many jobs as there are URLs waiting, without
          # exceeding the available DB connection budget
          new_connections = [@s3_urls.size, available_db_connections].min

          EM::Iterator.new(0...new_connections).each do |num, iter|
            s3_url = @s3_urls.pop
            queue_into_sw!(s3_url) unless s3_url.blank?
            iter.next
          end

        else
          EM.stop
        end
      end

      EM.add_periodic_timer(15) do
        log "S3 URLS : #{@s3_urls.size}, DB Connections : #{@@db_connections}"

        if @s3_urls.empty? and @@db_connections.eql?(0)
          log '* Completed SW queuer..'

          EM.stop
        end
      end
    end
  end

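  # Hands a stored page (by its S3 key) to RedWorkQueuer, which queues a separate
  # SimpleWorker job for it; the returned task id is tracked in @outstanding_jobs
  # so update_outstanding_jobs can sync it to Redis later.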
  def queue_into_sw!(s3_url)
    @@heartbeat = Time.now
    EM.defer(proc {
        wq = RedWorkQueuer.new(@crawler.id, s3_url)
        
        wq.callback do |obinfo|
          @@heartbeat = Time.now
        
          if obinfo["task_id"].blank?
            log "Error::Queueing into SW::Task ID is blank: #{obinfo["task_id"]}"
          else
            @outstanding_jobs << {:job_id => obinfo["task_id"], :redis_status => false}
            @job_ids << obinfo["task_id"]
          end
          ret = {:task_id => obinfo["task_id"], :s3_url => s3_url, :ob_info => obinfo}
          ret
        end
        
        wq.errback do
          log "Error::Queuing into SW failed S3URL: #{s3_url}"
        end

      }, proc { |hash_values|
        # log "Queued: #{hash_values.inspect}"
      })

  end

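  # Despite its name, write_to_db does not touch Postgres directly: it
  # Marshal-dumps the page hash, PUTs it to S3 with Happening under a
  # date-stamped key (see base_key/storage_url), queues a page-processing job
  # for that object, and then pulls the next entry off @db_push_queue.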
  def write_to_db(page_hash)
    log "Write_To_DB starting" if VERBOSE
    @@heartbeat = Time.now
    @@db_connections += 1

    begin
      pagedigest = Digest::MD5.hexdigest(page_hash[:url])
      url = page_hash[:url] + "_#{pagedigest}"

      begin
        marshal_dump = Marshal.dump(page_hash)
      rescue => e
        @@db_connections -= 1
        log "Error to dump object for URL : #{page_hash[:url]}.. Skip.."
        return true
      end

      on_error = Proc.new do |http|
        @@heartbeat = Time.now
        log "WriteToDb::HappeningWrite::Error::#{http.response_header.status}"
        @error_urls << page_hash[:url]
        @@db_connections -= 1
      end

      s3_url = storage_url(url)
      item = Happening::S3::Item.new(@bucket, s3_url, :aws_access_key_id => S3_ACCESS_KEY, :aws_secret_access_key => S3_SECRET_KEY)

      item.put(marshal_dump, :on_error => on_error) do |resp|
        log "Put #{s3_url} with Happening"
        @@db_connections -= 1
        queue_into_sw!(s3_url) unless s3_url.blank?
      end

    rescue => e

      if e.inspect.include?('Happening::Error')
        @@db_connections -= 1
        log "Error to store with Happening S3 for URL : #{page_hash[:url]}.. Skip.."
        return true
      else
        puts e.inspect
        puts e.backtrace.join("\n") if e.backtrace
      end
    end

    if @@db_connections.to_i < SIMULTANEOUS_DB_CONNECTIONS.to_i and @delegating_status.eql?(false)
      @db_push_queue.pop {|x| write_to_db(x) rescue nil }
    end
  end

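  # The crawl counts as complete once either the crawl limit was reached or the
  # todo queue is empty, and no HTTP connections, DB pushes or S3 writes remain
  # in flight.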
  def completed_retrieval?
    if (@@crawled_page_count > @crawl_limit) and @@connections.eql?(0) and @db_push_queue.size.eql?(0) and @@db_connections.eql?(0)
      true
    elsif @todo.empty? and @@connections.eql?(0) and @db_push_queue.size.eql?(0) and @@db_connections.eql?(0)
      true
    else
      false
    end
  end

  def completed_page_processing?
    if @outstanding_jobs.size > 0
      false
    elsif @outstanding_jobs.size == 0 && @completed_jobs.size > 0
      true
    else
      log "Falling through condition on RedEx on completed page processing. error check"
      false
    end
  end

  def marking_crawler_as_done
    #    log "Setting Domain and initiating transfer of the new page data."
    log "Setting Domain."
    @transfer_status = true
  end

  def strip_off_fragment(url)
    uri = URI.parse(url)

    unless uri.fragment.blank?
      non_fragment = uri.to_s.gsub("##{uri.fragment}", '')
      uri = URI.parse(non_fragment)
    end

    uri.path = "/" if uri.path.blank?

    return uri
  end

  def setup_new_retrieval
    unless @todo.empty? or (@@connections > CONCURRENT_CONNECTIONS) or (@@crawled_page_count > @crawl_limit) or @delegating_status.eql?(true)
      if @db_push_queue.size <= 500
        @retrieve_beat = Time.now
        retrieve(@todo.pop)
      end
    end
  end

  def base_key
    current_time = @starting_time
    day = current_time.day
    month = current_time.month
    year = current_time.year
    base_url = @baseurl_uri.to_s.gsub("http://", "")
    base_key = "#{base_url}@-@#{year}-#{month}-#{day}/"
    return base_key
  end

  def storage_url(url)
    "#{base_key}#{CGI.escape(url)}"
  end

  def update_logs_with_current_status
    log "\n\n\n#{Time.now} - Running Time: #{Time.now - @starting_time} seconds\n"
    log "-- # to write to DB: #{@db_push_queue.size}, DB Connections : #{@@db_connections}, Outstanding jobs: #{@outstanding_jobs.size}"
    log "-- Crawled Count: #{@@crawled_page_count}, Visited: #{@visit_key.size}, Touched: #{@queued_key.length}, Todo: #{@todo.length}, Connections: #{@@connections}, Retries: #{@retries}, Error: #{@error_urls.size}\n\n\n"
  end

end
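
For reference, kicking off a crawl from the application side would look roughly like the sketch below. This is a minimal sketch based on the accessors and the self-queueing code above (setup_job requires domain_id, and queue returns a hash containing "task_id", as used in looking_up_for_delegating); the cloud_domain variable is hypothetical.

worker = RedEx.new
worker.domain_id = cloud_domain.id   # id of an existing CloudDomain record (assumed)
worker.crawl_limit = 1000            # optional; setup_job defaults this to 1000
worker.is_delegated = false          # a fresh crawl, not a delegated continuation

job_info = worker.queue              # enqueue on SimpleWorker
puts "Queued RedEx crawl as task #{job_info['task_id']}"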
