Class: Sbmt::Outbox::ProcessItem
- Inherits:
-
DryInteractor
- Object
- DryInteractor
- Sbmt::Outbox::ProcessItem
- Defined in:
- app/interactors/sbmt/outbox/process_item.rb
Constant Summary collapse
- METRICS_COUNTERS =
%i[error_counter retry_counter sent_counter fetch_error_counter discarded_counter].freeze
Instance Attribute Summary collapse
-
#process_latency ⇒ Object
Returns the value of attribute process_latency.
-
#retry_latency ⇒ Object
Returns the value of attribute retry_latency.
Instance Method Summary collapse
Methods inherited from DryInteractor
Instance Attribute Details
#process_latency ⇒ Object
Returns the value of attribute process_latency.
21 22 23 |
# File 'app/interactors/sbmt/outbox/process_item.rb', line 21 def process_latency @process_latency end |
#retry_latency ⇒ Object
Returns the value of attribute retry_latency.
21 22 23 |
# File 'app/interactors/sbmt/outbox/process_item.rb', line 21 def retry_latency @retry_latency end |
Instance Method Details
#call ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'app/interactors/sbmt/outbox/process_item.rb', line 23 def call log_success( "Start processing #{box_type} item.\n" \ "Record: #{item_class.name}##{item_id}" ) item = nil item_class.transaction do item = yield fetch_item_and_lock_for_update cached_item = (redis_item_key(item_id)) if cached_retries_exceeded?(cached_item) msg = "max retries exceeded: marking item as failed based on cached data: #{cached_item}" item.set_errors_count(cached_item.errors_count) track_failed(msg, item) next Failure(msg) end if cached_greater_errors_count?(item, cached_item) log_failure("inconsistent item: cached_errors_count:#{cached_item.errors_count} > db_errors_count:#{item.errors_count}: setting errors_count based on cached data:#{cached_item}") item.set_errors_count(cached_item.errors_count) end if item.processed_at? self.retry_latency = Time.current - item.created_at item.config.retry_strategies.each do |retry_strategy| yield check_retry_strategy(item, retry_strategy) end else self.process_latency = Time.current - item.created_at end middlewares = Middleware::Builder.new(item_process_middlewares) payload = yield build_payload(item) transports = yield fetch_transports(item) middlewares.call(item) do transports.each do |transport| yield process_item(transport, item, payload) end track_successed(item) Success(item) end rescue Dry::Monads::Do::Halt => e e.result rescue => e track_failed(e, item) Failure(e.message) end ensure report_metrics(item) end |