Ruby:带重试逻辑并集成 Manticore 客户端的 HTTP 请求包装器,以及 DynamoDB 数据类

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Ruby 中带重试逻辑并集成 Manticore 客户端的 HTTP 请求包装器以及 DynamoDB 数据类的相关知识,希望对你有一定的参考价值。

# HTTPRequest is an HTTP client wrapper used to call API services.
# It owns a Manticore::Client, sets JSON (and, when configured, OAuth) headers
# on every request and retries requests that fail with one of the configured
# exception classes. See the usages below.
#
# Constructor options: :api_context (required), :fails_on_code, :timeout,
# :retries, :headers, :exceptions. Whatever remains in the options hash is
# handed to Manticore::Client.new and to every request.
#
# The api_context object is expected to respond to:
#   endpoint   - URL used when a request does not pass :url
#   api_config - read with [] for :codebig_key, :codebig_secret and :timeout
#   options    - read with [] for :logger (a callable invoked on each retry)
#
# NOTE(review): per-request options are merged into the shared instance
# options with merge!, so they persist for later requests on the same
# instance. This pre-existing behaviour is kept as is.
class HTTPRequest
  extend Forwardable

  # Adds a response-code error to the Manticore namespace so that HTTP error
  # statuses can be handled by the same rescue list as transport failures.
  module ::Manticore
    class ResponseCodeException < ManticoreException; end
  end

  DEFAULT_RETRIES = 2
  DEFAULT_TIMEOUT = 5
  DEFAULT_EXCEPTIONS = [Manticore::Timeout, Manticore::SocketException, Manticore::ResponseCodeException,
                        Manticore::ClientProtocolException, Manticore::ResolutionFailure].freeze

  attr_accessor :client, :options, :exceptions, :api_context, :async

  # Issues requests through the client's `background` interface instead of
  # calling the client directly. No retry handling is applied here.
  class AsyncProxy
    extend Forwardable

    # NOTE(review): set_default_headers is private on HTTPRequest; the
    # delegation relies on Forwardable being able to reach it - confirm on
    # the Ruby version in use.
    def_delegators :subject, :options, :set_default_headers, :background

    attr_accessor :subject

    # subject - the HTTPRequest instance whose options and client are reused.
    def initialize(subject)
      self.subject = subject
    end

    def get(options = {})
      request(:get, options)
    end

    def post(options = {})
      request(:post, options)
    end

    # method  - HTTP verb as a symbol (:get / :post).
    # options - request options; :url is the target, :oauth_url (optional)
    #           is the URL used for the OAuth header and defaults to :url.
    # Returns whatever `background.<method>` returns.
    def request(method, options)
      overrides = options.dup # work on a copy so the caller's hash is not modified
      url = overrides.delete(:url)
      oauth_url = overrides.delete(:oauth_url) || url
      self.options.merge!(overrides)
      set_default_headers(method, oauth_url)
      background.send(method, url, self.options)
    end
  end

  # API Options :api_context => Instance of API class
  # Method mappings:
  # [ endpoint ] => api_context
  # [ consumer_key, consumer_secret, timeout ] => api_context.api_config
  def_delegator :client, :background
  def_delegator :api_context, :options, :api_options
  def_delegator :api_options, :[], :api_option
  def_delegator :api_config, :[], :api_config_get
  def_delegators :api_context, :endpoint, :api_config

  # options [ :fails_on_code, :timeout, :retries, :headers, :exceptions ]
  # api_context options: [ :consumer_key, :consumer_secret, :timeout ]
  #
  # Raises RuntimeError ('API not found') when :api_context is blank.
  def initialize(options = {})
    options = options.dup # keep the caller's hash untouched
    self.api_context = options.delete(:api_context)
    raise 'API not found' if self.api_context.blank?

    self.options = options
    self.options.merge!(timeout_options)
    self.options[:headers] = options.fetch(:headers, {})
    # Caller-supplied exception classes are retried in addition to the defaults.
    self.exceptions = Array(options.delete(:exceptions)) + DEFAULT_EXCEPTIONS
    self.client = Manticore::Client.new(options)
    self.async = AsyncProxy.new(self)
  end

  # Performs a GET. options may contain :url and :retries plus any option
  # that should be forwarded to the client. Returns the parsed JSON body.
  def get(options = {})
    request(:get, options)
  end

  # Performs a POST. Accepts the same options as #get.
  def post(options = {})
    request(:post, options)
  end

  private

  # Sets the JSON Accept/Content-Type headers and, when both codebig
  # credentials are configured, an OAuth Authorization header for method/url.
  def set_default_headers(method, url)
    options[:headers]['Accept'] = 'application/json'
    options[:headers]['Content-Type'] = 'application/json'

    if oauth_options?
      authorization = SimpleOAuth::Header.new(method, url, {}, oauth_options).to_s
      options[:headers]['Authorization'] = authorization
    end
  end

  def oauth_options?
    api_config_get(:codebig_key).present? &&
    api_config_get(:codebig_secret).present?
  end

  def oauth_options
    { consumer_key: api_config_get(:codebig_key),
      consumer_secret: api_config_get(:codebig_secret) }
  end

  # Raises Manticore::ResponseCodeException for any status >= 400 except 404,
  # unless the :fails_on_code option is set to a falsy value.
  def fails_on_code?(code)
    return unless code != 404 && code >= 400 && options.fetch(:fails_on_code, true)

    raise Manticore::ResponseCodeException, code
  end

  # Sends the request and returns the parsed JSON body.
  # Retries up to `retries` times when one of `exceptions` is raised, sleeping
  # a random, growing interval between attempts; re-raises the last error once
  # the retries are used up.
  def request(method, options)
    overrides = options.dup # work on a copy so the caller's hash is not modified
    url = overrides.delete(:url) || endpoint
    # Per-request :retries wins, then the instance-level option, then the default.
    retries = overrides.delete(:retries) || self.options.fetch(:retries, DEFAULT_RETRIES)
    self.options.merge!(overrides)
    set_default_headers(method, url)
    retried = 0
    begin
      response = client.send(method, url, self.options)
      fails_on_code?(response.code)
      parse_body(response)
    rescue *exceptions => e
      raise if retried >= retries

      retried += 1
      log_retry(retried, e, method)
      sleep(rand(retried * 100) * 0.01)
      retry
    end
  end

  # Returns the response body parsed as JSON, or {} when the body is missing
  # or is not valid JSON.
  def parse_body(response)
    JSON.parse(response.body)
  rescue JSON::ParserError, TypeError
    {}
  end

  # Reports a retry to the api_context :logger callable, when one is present.
  def log_retry(attempt, error, method)
    return unless api_option(:logger).present?

    details = { retries: attempt, trace: error.inspect, action: method }
    api_option(:logger).call(details)
  end

  # Builds the three timeout options from the first value found in:
  # the :timeout option, api_config[:timeout], DEFAULT_TIMEOUT.
  def timeout_options
    t = self.options.delete(:timeout) || api_config_get(:timeout) || DEFAULT_TIMEOUT
    { request_timeout: t, connect_timeout: t, socket_timeout: t }
  end
end

# Example usage of HTTPRequest: a small XAPI client.
# NOTE(review): xapi_base_url and send_alert_params are called below but are
# not defined in this snippet - they must be provided elsewhere.
class XapiRequest
  # Type of responses
  #  {"code":101, "message": "partner token expired"}
  #  {"code"=>0, "message"=>"OK"} alert, launch
  #  {"code"=>400, "message"=>"request parameter 'psid' is missing"}
  #  {"psid"=>"yo5MKBnUK7m8vWrNa%2BgTGo7JEpDRxGYkYdYh04mNcQ4%3D", "expires"=>"1452733646131"}

  # Changes:
  # Game launch => process_deeplink GET  https://xapi-prod.codebig2.net/processdeeplink/@device_id.js
  # Send alert  => send_alert       GET  https://xapi-prod.codebig2.net/sendalert/@device_id.js
  # XAPI login  => login            POST https://xapi-prod.codebig2.net/partnerlogin.js BODY { partnername: password: }
  # New Method # Move screen => advance_screen same as #Game launch => process_deeplink

  attr_accessor :client, :options, :params

  # params - request parameters; :logger is removed from the hash and kept in
  #          options, :device_id is read later by #alert.
  def initialize(params={})
    self.params = params
    retry_logger = self.params.delete(:logger)
    codebig = Rails.configuration.xapi.codebig
    self.options = {
      logger: retry_logger,
      codebig_key: codebig.prod_key,
      codebig_secret: codebig.prod_secret
    }
    self.client = HTTPRequest.new(api_context: self)
  end

  # POSTs the partner credentials to the login endpoint with a single retry.
  def login
    login_url = xapi_base_url + Rails.configuration.xapi.codebig.prod_login_endpoint
    xapi = Rails.configuration.xapi
    credentials = { partnername: xapi.partner_name, password: xapi.partner_password }
    client.post(url: login_url, params: credentials, retries: 1)
  end

  # Logs the alert URL for params[:device_id] and requests it with a GET
  # through the client's async proxy.
  def alert(title, message)
    segments = [
      xapi_base_url,
      Rails.configuration.xapi.codebig.prod_send_alert_endpoint,
      "#{params[:device_id]}.js"
    ]
    request_url = segments.join('') + '?' + send_alert_params(title, message)
    Rails.logger.info("[event=xapi_alert] [event_status=request] [url=#{request_url}]")
    client.async.get(url: request_url, retries: 2)
  end
end

# Base class for Pulsar-backed models: holds one DynamoDB client that is
# shared through the class-level `client` accessor.
class Pulsar
  cattr_accessor :client

  pulsar_settings = Rails.configuration.api.pulsar

  # NOTE(review): ssl_verify_peer: false presumably turns off TLS peer
  # verification for this client - confirm that this is intended.
  self.client = Aws::DynamoDB::Client.new(
    region: 'No',
    credentials: pulsar_settings.creds,
    endpoint: pulsar_settings.base,
    ssl_verify_peer: false
  )
end

# Example: reading Pair records from the Pulsar DB.
# The :logger condition is a callable; Pair calls it before each retry with a
# hash containing :retried and :trace.
logger = lambda do |params|
  log_state(:error,
            event: 'ip_pairing_request',
            event_status: 'error_trace',
            error_message: "Pulsar retry #{params[:retried].ordinalize} #{params[:trace]}",
            error_code: 'XG-245')
end
@pair = Pair.where(ip_address: ip_address, logger: logger)
@pair = Pair.find_by_pairing_code(params[:pairing_code], logger: logger)

# Pair is a pairing record read from the Pulsar table TABLE_NAME.
# Each stored item has an 'id' and a JSON 'data' attribute; every value inside
# 'data' is turned into one Pair carrying that item's id.
#
# NOTE(review): the query conditions (including :logger) are kept in a
# class-level accessor that every query overwrites, and #expired? reads them
# back - this state is shared by all callers.
class Pair < Pulsar
  include ActiveModel::Validations

  # Wrapper around the Pairs returned by Pair.where. Attribute readers are
  # forwarded to the first element.
  class Collection
    # Raised when an attribute is read from an empty collection.
    class NotValidItem < StandardError; end
    extend Forwardable

    delegate %w(map each size detect blank? present?) => :collection
    delegate %w(id app_id device_id comcast_session_id account_number) => :item
    delegate %w(xbo_account_id zip3 ip_lease_expires auth_guid)        => :item
    delegate %w(stb_friendly_name game_id whitelisted hsi timestamp)   => :item

    # items - Array of Pair objects.
    def initialize(items)
      self.collection = items
    end

    # True when more than one element is held.
    def collection?
      size > 1
    end

    # True when at least one element is held.
    def valid?
      size > 0
    end

    # True when exactly one element is held.
    def can_pair_device?
      size == 1
    end

    # Returns the first element; raises NotValidItem when the collection is empty.
    def item
      raise NotValidItem unless valid?
      collection.first
    end

    # Returns the first element whose device_id equals id, or nil.
    def detect_device(id)
      detect { |device| device.device_id == id }
    end

    # Serializes only the wrapped elements, dropping the 'collection' wrapper key.
    def as_json(options={})
      super(options)['collection']
    end

    private

    attr_accessor :collection
  end

  TABLE_NAME = 'GamesAutoPair'
  TIMEOUT_TIME = 2
  DEFAULT_RETRIES = 1
  EXCEPTIONS_LIST = [Errno::ECONNREFUSED, Timeout::Error].freeze

  attr_accessor :id, :app_id, :device_id, :comcast_session_id, :account_number
  attr_accessor :xbo_account_id, :zip3, :ip_lease_expires, :auth_guid
  attr_accessor :stb_friendly_name, :game_id, :whitelisted, :hsi, :timestamp
  cattr_accessor :conditions, instance_writer: false

  # Gives a single Pair the same can_pair_device? query that Collection has.
  alias_method :can_pair_device?, :valid?

  self.conditions = {}

  validates :id, :app_id, :device_id, :comcast_session_id, :account_number, presence: true
  validates :xbo_account_id, :zip3, :ip_lease_expires, :auth_guid, :stb_friendly_name, :timestamp, presence: true
  validate :expiration_time

  # attributes - Hash with the camelCase string keys read below.
  def initialize(attributes = {})
    self.id                 = attributes['id']
    self.device_id          = attributes['deviceId']
    self.app_id             = attributes['appId']
    self.comcast_session_id = attributes['comcastSessionId']
    self.account_number     = attributes['billingAccountId']
    self.auth_guid          = attributes['cstAuthGuid']
    self.xbo_account_id     = attributes['xboAccountId']
    self.zip3               = attributes['zip3']
    self.stb_friendly_name  = attributes['stbFriendlyName'] || 'X1 Device'
    self.ip_lease_expires   = attributes['ipLeaseExpires']
    self.game_id            = attributes['gameId']
    self.whitelisted        = attributes['whitelisted']
    self.hsi                = attributes['hsi']
    self.timestamp          = attributes['timestamp']
  end

  # Deletes the table item with this record's id.
  def delete
    client.delete_item(table_name: TABLE_NAME, key: { 'id' => id })
  end

  # True when the timestamp is more than five minutes old, or when the current
  # conditions include :ip_address and the IP lease has already expired.
  # timestamp and ip_lease_expires are divided by 1000 before Time.at, i.e.
  # they are treated as milliseconds.
  def expired?
    return true if Time.at(timestamp.to_i / 1000) < 5.minutes.ago

    conditions.key?(:ip_address) && Time.at(ip_lease_expires.to_i / 1000) < Time.now
  end

  # A single Pair is never a collection (counterpart of Collection#collection?).
  def collection?
    false
  end

  # Serializes only device_id and stb_friendly_name.
  def as_json(options={})
    super(options.merge(only: %w(device_id stb_friendly_name), include_root_in_json: false))
  end

  private

  # Validation hook: adds a base error for expired records.
  def expiration_time
    errors.add(:base, 'record is expired') if expired?
  end

  # Class methods
  class << self
    # Looks up the item stored under conditions[:ip_address] and returns a
    # Collection with the valid Pairs built from it (empty when nothing is found).
    def where(conditions={})
      self.conditions = conditions
      items = find(conditions[:ip_address])
      if items.present?
        id = items.delete('id')
        items = items.values
                     .map { |attributes| new(attributes.merge('id' => id)) }
                     .select(&:valid?)
      end

      Collection.new(Array(items))
    end

    # Looks up the item stored under code and returns a Pair built from its
    # first data entry, or nil when the item is missing or has no data entries.
    def find_by_pairing_code(code, conditions={})
      self.conditions = conditions.merge(pairing_code: code)
      item = find(code)
      return if item.blank?

      # Skip the merged-in 'id' entry so an item without data yields nil
      # instead of calling merge on the id value.
      attributes = item.reject { |key, _| key == 'id' }.values.first
      return if attributes.nil?

      new(attributes.merge(item.slice('id')))
    end

    # Scans the table and returns one Pair per data entry of every item.
    # eager - when true, keeps scanning while the result reports a
    #         last_evaluated_key (original author's note: by default Pulsar
    #         returns 16 items).
    def all(eager=false)
      result = client.scan(table_name: TABLE_NAME)
      items = result.items

      if eager
        while result.last_evaluated_key.present?
          result = client.scan(table_name: TABLE_NAME, exclusive_start_key: { id: result.last_evaluated_key['id'] })
          items += result.items
        end
      end

      return [] if items.blank?

      items.flat_map do |item|
        id = item['id']
        JSON.parse(item['data']).values.map { |attrs| new(attrs.merge('id' => id)) }
      end
    end

    private

    # Returns the parsed data of the item stored under id merged with its
    # 'id', {} when the data is not valid JSON, or nil when no item exists.
    def find(id)
      item = fetch_item(id)
      parse_item(item) if item.present?
    end

    # Reads one item, giving up after TIMEOUT_TIME seconds per attempt.
    # Errors listed in EXCEPTIONS_LIST are retried up to DEFAULT_RETRIES times
    # after a random pause; the conditions[:logger] callable, if any, is
    # called before each retry. The last error is re-raised.
    def fetch_item(id)
      retried = 0

      begin
        Timeout.timeout(TIMEOUT_TIME) do
          client.get_item(table_name: TABLE_NAME, key: { 'id' => id }).item
        end
      rescue *EXCEPTIONS_LIST => e
        raise if retried >= DEFAULT_RETRIES

        retried += 1
        if logger = self.conditions[:logger]
          logger.call(retried: retried, trace: e.inspect)
        end
        sleep(rand(retried * 100) * 0.01)
        retry
      end
    end

    # Parses item['data'] as JSON and adds the item's 'id'. Invalid JSON is
    # logged and replaced by an empty hash.
    def parse_item(item)
      JSON.parse(item['data']).merge(item.slice('id'))
    rescue JSON::ParserError => e
      Rails.logger.error("[event=pulsar_data_parse] [event_status=error] [error=#{e.inspect}] [attributes=#{item}]")
      {}
    end
  end
end

以上是关于“Ruby:带重试逻辑并集成 Manticore 客户端的 HTTP 请求包装器,以及 DynamoDB 数据类”的主要内容。如果未能解决你的问题,请参考以下文章:

Manticore search加一个中文分词

链接取消令牌

ruby 在Resque 1.x中有选择地删除/重试失败的作业

ruby 在Resque 1.x中有选择地删除/重试失败的作业

Polly断路器能否具有指数durationOfBreak?

AWS 中的错误重试和指数退避