Class: AmplitudeAnalytics::ResponseProcessor
- Inherits: Object
  - Object
  - AmplitudeAnalytics::ResponseProcessor
- Defined in:
- lib/amplitude/processor.rb
Overview
ResponseProcessor
Instance Method Summary
- #callback(events, code, message) ⇒ Object
- #initialize ⇒ ResponseProcessor (constructor): A new instance of ResponseProcessor.
- #log(events, code, message) ⇒ Object
- #process_response(res, events) ⇒ Object
- #push_to_storage(events, delay, res) ⇒ Object
- #setup(configuration, storage) ⇒ Object
Constructor Details
#initialize ⇒ ResponseProcessor
Returns a new instance of ResponseProcessor.
4 5 6 7 |
# File 'lib/amplitude/processor.rb', line 4

# Builds an unconfigured processor: both collaborators start out as nil and
# are only assigned later through #setup.
def initialize
  @configuration = nil
  @storage = nil
end
Instance Method Details
#callback(events, code, message) ⇒ Object
85 86 87 88 89 90 91 92 |
# File 'lib/amplitude/processor.rb', line 85

# Notifies every event in +events+ of the outcome of its delivery attempt.
#
# For each event the configuration-level callback is invoked first (only when
# it responds to +call+), followed by the event's own +callback+ method.
# A StandardError raised by either one is reported through the configured
# logger and does not stop the remaining events from being notified.
#
# @param events [Array] events to notify
# @param code the response code handed to each callback
# @param message the text handed to each callback
# @return [Array] the +events+ collection that was iterated
def callback(events, code, message)
  events.each do |event|
    @configuration.callback.call(event, code, message) if @configuration.callback.respond_to?(:call)
    event.callback(code, message)
  rescue StandardError => e
    # Rescue per event so one failing callback cannot abort the whole batch.
    @configuration.logger.error("Error callback for event #{event}: #{e.message}")
  end
end
#log(events, code, message) ⇒ Object
94 95 96 97 98 |
# File 'lib/amplitude/processor.rb', line 94

# Writes one info-level log line per event, combining the outcome message,
# the response code and the event itself.
#
# @param events [Array] events to report
# @param code the response code included in each line
# @param message the text that opens each line
# @return [Array] the +events+ collection that was iterated
def log(events, code, message)
  events.each do |event|
    @configuration.logger.info("#{message}, response code: #{code}, event: #{event}")
  end
end
#process_response(res, events) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/amplitude/processor.rb', line 14

# Decides what happens to a batch of events based on the status of the
# response received for it.
#
# * SUCCESS: every event is reported through #callback and #log.
# * TIMEOUT / FAILED: every event is pushed back to storage with no delay.
# * PAYLOAD_TOO_LARGE: a single-event batch is reported with the response
#   error; a larger batch raises the configuration's flush divider and is
#   pushed back to storage.
# * INVALID_REQUEST: raises when the error text starts with
#   'Invalid API key:'; reports the whole batch when a required field is
#   missing; otherwise reports the events whose index is listed in
#   +res.invalid_or_silenced_index+ and pushes the rest back to storage.
# * TOO_MANY_REQUESTS: throttled events over their daily quota are reported,
#   other throttled events are pushed back with a 30_000 delay, and
#   non-throttled events are pushed back with no delay.
# * anything else: every event is reported with the response error, or
#   'Unknown error' when there is none.
#
# @param res response object exposing +status+, +code+, +error+ and the
#   status-specific readers used above
# @param events [Array] the events that were part of the request
# @raise [InvalidAPIKeyError] when the server rejected the API key
def process_response(res, events)
  case res.status
  when HttpStatus::SUCCESS
    callback(events, res.code, 'Event sent successfully.')
    log(events, res.code, 'Event sent successfully.')
  when HttpStatus::TIMEOUT, HttpStatus::FAILED
    push_to_storage(events, 0, res)
  when HttpStatus::PAYLOAD_TOO_LARGE
    if events.length == 1
      # A lone event that is still too large can never be split further.
      callback(events, res.code, res.error)
      log(events, res.code, res.error)
    else
      @configuration.increase_flush_divider
      push_to_storage(events, 0, res)
    end
  when HttpStatus::INVALID_REQUEST
    # to_s keeps a response without error text from raising NoMethodError.
    raise InvalidAPIKeyError, res.error if res.error.to_s.start_with?('Invalid API key:')

    missing_field = res.missing_field
    if missing_field
      message = "Request missing required field #{missing_field}"
      callback(events, res.code, message)
      log(events, res.code, message)
    else
      invalid_index_set = res.invalid_or_silenced_index
      rejected, retryable = events.each_with_index.partition do |_event, index|
        invalid_index_set.include?(index)
      end
      events_for_callback = rejected.map(&:first)
      events_for_retry = retryable.map(&:first)

      callback(events_for_callback, res.code, res.error)
      log(events_for_callback, res.code, res.error)
      push_to_storage(events_for_retry, 0, res)
    end
  when HttpStatus::TOO_MANY_REQUESTS
    throttled = res.throttled_events
    events_for_callback = []
    events_for_retry_delay = []
    events_for_retry = []

    events.each_with_index do |event, index|
      bucket =
        if !throttled&.include?(index)
          events_for_retry
        elsif res.exceed_daily_quota(event)
          events_for_callback
        else
          events_for_retry_delay
        end
      bucket << event
    end

    callback(events_for_callback, res.code, 'Exceeded daily quota')
    # Every other terminal outcome is both reported and logged; do the same here.
    log(events_for_callback, res.code, 'Exceeded daily quota')
    push_to_storage(events_for_retry_delay, 30_000, res)
    push_to_storage(events_for_retry, 0, res)
  else
    callback(events, res.code, res.error || 'Unknown error')
    log(events, res.code, res.error || 'Unknown error')
  end
end
#push_to_storage(events, delay, res) ⇒ Object
74 75 76 77 78 79 80 81 82 83 |
# File 'lib/amplitude/processor.rb', line 74

# Hands each event back to storage for another delivery attempt.
#
# The event's +retry+ counter is incremented before the push. +@storage.push+
# is expected to return a pair whose first element is truthy on success and
# whose second element is a message; when the push is refused, the event is
# reported through #callback and #log with that message and +res.code+.
#
# @param events [Array] events to re-queue
# @param delay the delay value forwarded unchanged to +@storage.push+
# @param res response whose +code+ is reported when storage refuses an event
# @return [Array] the +events+ collection that was iterated
def push_to_storage(events, delay, res)
  events.each do |event|
    event.retry += 1
    success, message = @storage.push(event, delay)
    next if success

    callback([event], res.code, message)
    log([event], res.code, message)
  end
end
#setup(configuration, storage) ⇒ Object
9 10 11 12 |
# File 'lib/amplitude/processor.rb', line 9

# Supplies the collaborators the processor works with; the other methods read
# +@configuration+ and +@storage+, so this is expected to run before them.
#
# @param configuration object stored as +@configuration+
# @param storage object stored as +@storage+
# @return the +storage+ argument (value of the final assignment)
def setup(configuration, storage)
  @configuration = configuration
  @storage = storage
end