ruby 具有重试逻辑和Manticore客户端集成+ DynamoDB数据类的HTTP请求包装器
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ruby 具有重试逻辑和Manticore客户端集成+ DynamoDB数据类的HTTP请求包装器相关的知识,希望对你有一定的参考价值。
# HTTPRequest is an HTTP client wrapper for API service calls. It builds a
# Manticore client, applies JSON (and optionally OAuth) headers to every
# request, and retries requests that fail with one of a configurable list of
# exceptions. See the usages below.
class HTTPRequest
  attr_accessor :client, :options, :exceptions, :api_context, :async

  # Adds an exception type to the Manticore namespace so that unwanted HTTP
  # status codes can be raised (and retried) like transport failures.
  module ::Manticore
    class ResponseCodeException < ManticoreException; end
  end

  # Proxy exposed as HTTPRequest#async. It prepares a request the same way as
  # the synchronous path (merged options + default headers) but hands it to the
  # subject's `background` client and returns whatever that call returns.
  # No retry handling is applied here.
  class AsyncProxy
    extend Forwardable

    def_delegators :subject, :options, :set_default_headers, :background
    attr_accessor :subject

    # @param subject [HTTPRequest] the request wrapper to delegate to
    def initialize(subject)
      self.subject = subject
    end

    # @param options [Hash] per-request options; see #request
    def get(options = {})
      request(:get, options)
    end

    # @param options [Hash] per-request options; see #request
    def post(options = {})
      request(:post, options)
    end

    # @param method [Symbol] HTTP verb understood by the background client
    # @param options [Hash] per-request options. :url is the target,
    #   :oauth_url (optional) is the URL used for the OAuth signature and
    #   defaults to :url. Remaining keys are passed to the client.
    def request(method, options)
      # Work on a copy so the caller's hash is left untouched.
      overrides = options.dup
      url = overrides.delete(:url)
      oauth_url = overrides.delete(:oauth_url) || url
      # Non-destructive merge: per-request options (e.g. :params) must not
      # leak into later requests issued through the same subject.
      request_options = self.options.merge(overrides)
      request_options[:headers] =
        set_default_headers(method, oauth_url, (request_options[:headers] || {}).dup)
      background.send(method, url, request_options)
    end
  end

  # API options: :api_context => instance of the API class.
  # Method mappings:
  #   [ endpoint, options ]                        => api_context
  #   [ codebig_key, codebig_secret, timeout ]     => api_context.api_config
  extend Forwardable

  def_delegator :client, :background
  def_delegator :api_context, :options, :api_options
  def_delegator :api_options, :[], :api_option
  def_delegator :api_config, :[], :api_config_get
  def_delegators :api_context, :endpoint, :api_config

  DEFAULT_RETRIES = 2
  DEFAULT_TIMEOUT = 5
  DEFAULT_EXCEPTIONS = [
    Manticore::Timeout,
    Manticore::SocketException,
    Manticore::ResponseCodeException,
    Manticore::ClientProtocolException,
    Manticore::ResolutionFailure
  ].freeze

  # @param options [Hash] accepts :api_context (required) plus
  #   [ :fails_on_code, :timeout, :retries, :headers, :exceptions ].
  #   api_context.api_config is read for [ :codebig_key, :codebig_secret, :timeout ].
  #   The hash is modified in place (keys are removed/added) and reused as the
  #   instance options and as the Manticore client options.
  # @raise [RuntimeError] when :api_context is missing or blank
  def initialize(options = {})
    self.api_context = options.delete(:api_context)
    raise 'API not found' if api_context.blank?

    self.options = options
    self.options.merge!(timeout_options)
    self.options[:headers] = options.fetch(:headers, {})
    # Caller-supplied exception classes are retried in addition to the defaults.
    self.exceptions = Array(options.delete(:exceptions)) + DEFAULT_EXCEPTIONS
    self.client = Manticore::Client.new(options)
    self.async = AsyncProxy.new(self)
  end

  # Performs a GET request. See #request for options and return value.
  def get(options = {})
    request(:get, options)
  end

  # Performs a POST request. See #request for options and return value.
  def post(options = {})
    request(:post, options)
  end

  private

  # Sets the JSON Accept/Content-Type headers and, when OAuth credentials are
  # configured, a signed Authorization header.
  #
  # @param method [Symbol] HTTP verb, used for the OAuth signature
  # @param url [String] URL used for the OAuth signature
  # @param headers [Hash] header hash to fill in; defaults to the instance-level
  #   headers, which keeps the original two-argument behaviour
  # @return [Hash] the header hash that was filled in
  def set_default_headers(method, url, headers = options[:headers])
    headers['Accept'] = 'application/json'
    headers['Content-Type'] = 'application/json'
    if oauth_options?
      headers['Authorization'] = SimpleOAuth::Header.new(method, url, {}, oauth_options).to_s
    end
    headers
  end

  # @return [Boolean] true when both OAuth credentials are present in api_config
  def oauth_options?
    api_config_get(:codebig_key).present? &&
      api_config_get(:codebig_secret).present?
  end

  # @return [Hash] credentials in the shape expected by SimpleOAuth::Header
  def oauth_options
    { consumer_key: api_config_get(:codebig_key),
      consumer_secret: api_config_get(:codebig_secret) }
  end

  # Raises for any status >= 400 other than 404, unless :fails_on_code is
  # set to a falsy value.
  #
  # @param code [Integer] HTTP status code
  # @param request_options [Hash] options in effect for the current request
  # @raise [Manticore::ResponseCodeException]
  def fails_on_code?(code, request_options = options)
    return unless code != 404 && code >= 400 && request_options.fetch(:fails_on_code, true)

    raise Manticore::ResponseCodeException, code
  end

  # Executes the request, retrying on any of the configured exceptions.
  #
  # @param method [Symbol] HTTP verb understood by the client
  # @param options [Hash] per-request options. :url defaults to the
  #   api_context endpoint; :retries overrides the instance-level :retries,
  #   which in turn overrides DEFAULT_RETRIES. Remaining keys are passed to
  #   the client.
  # @return [Hash, Array] the parsed JSON body, or {} when the body cannot be
  #   read or parsed
  # @raise the last retryable exception once the retries are exhausted
  def request(method, options)
    # Work on a copy so the caller's hash is left untouched.
    overrides = options.dup
    url = overrides.delete(:url) || endpoint
    # Fall back to the instance-level setting; the per-request hash no longer
    # contains :retries at this point, so it cannot be the fallback source.
    retries = overrides.delete(:retries) || self.options[:retries] || DEFAULT_RETRIES
    # Non-destructive merge: per-request options (e.g. :params carrying
    # credentials) must not leak into later requests on the same instance.
    request_options = self.options.merge(overrides)
    request_options[:headers] =
      set_default_headers(method, url, (request_options[:headers] || {}).dup)

    retried = 0
    begin
      response = client.send(method, url, request_options)
      fails_on_code?(response.code, request_options)
      parse_body(response)
    rescue exception_matcher => e
      raise if retried >= retries

      retried += 1
      log_retry(retried, e, method)
      # Random back-off whose upper bound grows with the attempt number.
      sleep(rand(retried * 100) * 0.01)
      retry
    end
  end

  # Best-effort JSON decoding: any error while reading or parsing the body
  # yields an empty hash instead of propagating.
  def parse_body(response)
    JSON.parse(response.body)
  rescue StandardError
    {}
  end

  # Reports a retry through the api_context's optional :logger callable.
  def log_retry(retried, error, method)
    logger = api_option(:logger)
    return unless logger.present?

    logger.call(retries: retried, trace: error.inspect, action: method)
  end

  # Resolves the timeout (explicit option, then api_config, then the default)
  # and applies it to all three client timeouts. Removes :timeout from the
  # instance options.
  def timeout_options
    t = options.delete(:timeout) || api_config_get(:timeout) || DEFAULT_TIMEOUT
    { request_timeout: t, connect_timeout: t, socket_timeout: t }
  end

  # Builds an object usable in a `rescue` clause that matches any error which
  # is an instance of one of the configured exception classes.
  def exception_matcher
    retryable = exceptions
    Module.new.tap do |matcher|
      matcher.define_singleton_method(:===) do |error|
        retryable.any? { |klass| error.is_a?(klass) }
      end
    end
  end
end
# Example consumer of HTTPRequest.
class XapiRequest
  # Response shapes seen from the service:
  #   {"code":101, "message": "partner token expired"}
  #   {"code"=>0, "message"=>"OK"}                                  (alert, launch)
  #   {"code"=>400, "message"=>"request parameter 'psid' is missing"}
  #   {"psid"=>"yo5MKBnUK7m8vWrNa%2BgTGo7JEpDRxGYkYdYh04mNcQ4%3D", "expires"=>"1452733646131"}
  #
  # Endpoint mapping:
  #   Game launch => process_deeplink  GET  https://xapi-prod.codebig2.net/processdeeplink/@device_id.js
  #   Send alert  => send_alert        GET  https://xapi-prod.codebig2.net/sendalert/@device_id.js
  #   XAPI login  => login             POST https://xapi-prod.codebig2.net/partnerlogin.js BODY { partnername: password: }
  #   New method: Move screen => advance_screen, same as Game launch => process_deeplink
  attr_accessor :client, :options, :params

  # @param params [Hash] request parameters; the :logger entry, if any, is
  #   removed from this hash and kept in the options instead
  def initialize(params = {})
    self.params = params
    retry_logger = self.params.delete(:logger)
    self.options = {
      logger: retry_logger,
      codebig_key: Rails.configuration.xapi.codebig.prod_key,
      codebig_secret: Rails.configuration.xapi.codebig.prod_secret
    }
    self.client = HTTPRequest.new(api_context: self)
  end

  # POSTs the partner credentials to the login endpoint, with a single retry.
  def login
    login_url = xapi_base_url + Rails.configuration.xapi.codebig.prod_login_endpoint
    credentials = {
      partnername: Rails.configuration.xapi.partner_name,
      password: Rails.configuration.xapi.partner_password
    }
    client.post(url: login_url, params: credentials, retries: 1)
  end

  # Sends an alert to the device in params[:device_id] through the
  # asynchronous client, logging the request URL first.
  #
  # @param title [Object] forwarded to send_alert_params
  # @param message [Object] forwarded to send_alert_params
  def alert(title, message)
    base = xapi_base_url
    alert_path = Rails.configuration.xapi.codebig.prod_send_alert_endpoint
    xapi_url = "#{base}#{alert_path}#{params[:device_id]}.js"
    url = xapi_url + '?' + send_alert_params(title, message)
    Rails.logger.info("[event=xapi_alert] [event_status=request] [url=#{url}]")
    client.async.get(url: url, retries: 2)
  end
end
# Base class for Pulsar data models. It owns the DynamoDB client shared by
# the class and its subclasses.
class Pulsar
  cattr_accessor :client

  # The client is created once, when the class body is evaluated.
  # NOTE(review): ssl_verify_peer is switched off here; confirm that is
  # intended for the target environment.
  pulsar_client_options = {
    region: 'No',
    credentials: Rails.configuration.api.pulsar.creds,
    endpoint: Rails.configuration.api.pulsar.base,
    ssl_verify_peer: false
  }
  self.client = Aws::DynamoDB::Client.new(pulsar_client_options)
end
# Example of using the Pair class to retrieve data from the Pulsar DB.
# The logger lambda receives the retry details Pair passes on each retry
# (:retried and :trace).
# NOTE(review): log_state, ip_address and params are not defined in this
# file; presumably this snippet lives in a controller that provides them.
logger = lambda do |params|
  log_state(:error,
            event: 'ip_pairing_request',
            event_status: 'error_trace',
            error_message: "Pulsar retry #{params[:retried].ordinalize} #{params[:trace]}",
            error_code: 'XG-245')
end
@pair = Pair.where(ip_address: ip_address, logger: logger)
@pair = Pair.find_by_pairing_code(params[:pairing_code], logger: logger)
# Pair is a pairing record read from the Pulsar (DynamoDB) table. Records are
# looked up by IP address (Pair.where) or by pairing code
# (Pair.find_by_pairing_code) and validated for presence and expiration.
class Pair < Pulsar
  include ActiveModel::Validations

  # Wrapper around the list of Pair records returned by Pair.where. It
  # forwards enumeration to the list and attribute readers to the first record.
  class Collection
    # Raised when an attribute is read from an empty collection.
    # Inherits from StandardError so that ordinary `rescue` clauses see it.
    class NotValidItem < StandardError; end

    extend Forwardable

    delegate %w(map each size detect blank? present?) => :collection
    delegate %w(id app_id device_id comcast_session_id account_number) => :item
    delegate %w(xbo_account_id zip3 ip_lease_expires auth_guid) => :item
    delegate %w(stb_friendly_name game_id whitelisted hsi timestamp) => :item

    # @param items [Array<Pair>] records to wrap
    def initialize(items)
      self.collection = items
    end

    # @return [Boolean] true when more than one record is held
    def collection?
      size > 1
    end

    # @return [Boolean] true when at least one record is held
    def valid?
      size > 0
    end

    # @return [Boolean] true when exactly one record is held
    def can_pair_device?
      size == 1
    end

    # @return [Pair] the first record
    # @raise [NotValidItem] when the collection is empty
    def item
      raise NotValidItem unless valid?

      collection.first
    end

    # @param id [Object] device id to look for
    # @return [Pair, nil] the first record whose device_id equals id
    def detect_device(id)
      detect { |device| device.device_id == id }
    end

    # Serializes as the bare list of records rather than a wrapping hash.
    def as_json(options = {})
      super(options)['collection']
    end

    private

    attr_accessor :collection
  end

  TABLE_NAME = 'GamesAutoPair'.freeze
  TIMEOUT_TIME = 2
  DEFAULT_RETRIES = 1
  EXCEPTIONS_LIST = [Errno::ECONNREFUSED, Timeout::Error].freeze

  attr_accessor :id, :app_id, :device_id, :comcast_session_id, :account_number
  attr_accessor :xbo_account_id, :zip3, :ip_lease_expires, :auth_guid
  attr_accessor :stb_friendly_name, :game_id, :whitelisted, :hsi, :timestamp

  # Conditions of the most recent class-level lookup. This is class-level
  # state: it is overwritten by every Pair.where / Pair.find_by_pairing_code
  # call and read by #expired? and the retry logger.
  cattr_accessor :conditions, instance_writer: false

  alias_method :can_pair_device?, :valid?

  self.conditions = {}

  validates :id, :app_id, :device_id, :comcast_session_id, :account_number, presence: true
  validates :xbo_account_id, :zip3, :ip_lease_expires, :auth_guid, :stb_friendly_name, :timestamp, presence: true
  validate :expiration_time

  # @param attributes [Hash] record attributes keyed by the camelCase names
  #   used in the stored data, plus 'id'
  def initialize(attributes = {})
    self.id = attributes['id']
    self.device_id = attributes['deviceId']
    self.app_id = attributes['appId']
    self.comcast_session_id = attributes['comcastSessionId']
    self.account_number = attributes['billingAccountId']
    self.auth_guid = attributes['cstAuthGuid']
    self.xbo_account_id = attributes['xboAccountId']
    self.zip3 = attributes['zip3']
    self.stb_friendly_name = attributes['stbFriendlyName'] || 'X1 Device'
    self.ip_lease_expires = attributes['ipLeaseExpires']
    self.game_id = attributes['gameId']
    self.whitelisted = attributes['whitelisted']
    self.hsi = attributes['hsi']
    self.timestamp = attributes['timestamp']
  end

  # Removes the table item stored under this record's id.
  def delete
    client.delete_item(table_name: TABLE_NAME, key: { 'id' => id })
  end

  # A record is expired when its timestamp is older than five minutes or,
  # for lookups by IP address, when its IP lease has run out.
  # NOTE(review): both values are divided by 1000 before Time.at, so they are
  # presumably epoch milliseconds; confirm against the data source.
  #
  # @return [Boolean]
  def expired?
    return true if Time.at(timestamp.to_i / 1000) < 5.minutes.ago

    conditions.key?(:ip_address) && Time.at(ip_lease_expires.to_i / 1000) < Time.now
  end

  # @return [false] a single record is never a collection
  def collection?
    false
  end

  # Serializes only the device id and friendly name, without a root key.
  def as_json(options = {})
    super(options.merge(only: %w(device_id stb_friendly_name), include_root_in_json: false))
  end

  private

  # Validation hook: marks the record invalid once it has expired.
  def expiration_time
    errors.add(:base, 'record is expired') if expired?
  end

  # Class methods
  class << self
    # Looks up the records stored under conditions[:ip_address].
    #
    # @param conditions [Hash] :ip_address is the lookup key; :logger is an
    #   optional callable invoked on each retry
    # @return [Collection] the valid records found (possibly empty)
    def where(conditions = {})
      self.conditions = conditions
      items = find(conditions[:ip_address])
      if items.present?
        id = items.delete('id')
        items = items.values
                     .map { |attributes| new(attributes.merge('id' => id)) }
                     .select(&:valid?)
      end
      Collection.new(Array(items))
    end

    # Looks up the record stored under a pairing code.
    #
    # @param code [Object] pairing code used as the item id
    # @param conditions [Hash] :logger is an optional callable invoked on
    #   each retry
    # @return [Pair, nil] the first record stored under the code, or nil when
    #   nothing (or nothing besides the id) is stored
    def find_by_pairing_code(code, conditions = {})
      self.conditions = conditions.merge(pairing_code: code)
      item = find(code)
      return if item.blank?

      # Set the id aside before picking the payload, exactly as .where does;
      # otherwise the id string itself can be taken for the attribute hash.
      id_attributes = item.slice('id')
      attributes = item.reject { |key, _| key == 'id' }.values.first
      return if attributes.nil?

      new(attributes.merge(id_attributes))
    end

    # Scans the table and builds a record for every entry of every item.
    #
    # @param eager [Boolean] when true, keeps scanning until the table
    #   reports no further pages (by default Pulsar returns 16 items)
    # @return [Array<Pair>]
    def all(eager = false)
      result = client.scan(table_name: TABLE_NAME)
      items = result.items
      if eager
        while result.last_evaluated_key.present?
          result = client.scan(table_name: TABLE_NAME,
                               exclusive_start_key: { id: result.last_evaluated_key['id'] })
          items += result.items
        end
      end

      Array(items).flat_map do |item|
        id = item['id']
        JSON.parse(item['data']).values.map { |attrs| new(attrs.merge('id' => id)) }
      end
    end

    private

    # Fetches one table item with a timeout and a bounded number of retries.
    #
    # @param id [Object] item key
    # @return [Hash, nil] the parsed 'data' payload merged with the item id,
    #   {} when the payload is not valid JSON, nil when no item is found
    # @raise one of EXCEPTIONS_LIST once the retries are exhausted
    def find(id)
      retried = 0
      item = begin
        Timeout.timeout(TIMEOUT_TIME) do
          client.get_item(table_name: TABLE_NAME, key: { 'id' => id }).item
        end
      rescue exception_matcher => e
        raise if retried >= DEFAULT_RETRIES

        retried += 1
        logger = conditions[:logger]
        logger.call(retried: retried, trace: e.inspect) if logger
        # Random back-off whose upper bound grows with the attempt number.
        sleep(rand(retried * 100) * 0.01)
        retry
      end
      return if item.blank?

      begin
        JSON.parse(item['data']).merge(item.slice('id'))
      rescue JSON::ParserError => e
        Rails.logger.error("[event=pulsar_data_parse] [event_status=error] [error=#{e.inspect}] [attributes=#{item}]")
        {}
      end
    end

    # Builds an object usable in a `rescue` clause that matches any error
    # which is an instance of one of EXCEPTIONS_LIST.
    def exception_matcher
      Module.new.tap do |matcher|
        matcher.define_singleton_method(:===) do |error|
          EXCEPTIONS_LIST.any? { |klass| error.is_a?(klass) }
        end
      end
    end
  end
end
以上是关于ruby 具有重试逻辑和Manticore客户端集成+ DynamoDB数据类的HTTP请求包装器的主要内容,如果未能解决你的问题,请参考以下文章
ruby 在Resque 1.x中有选择地删除/重试失败的作业
ruby 在Resque 1.x中有选择地删除/重试失败的作业