Class: Sbmt::Outbox::ProcessItem

Inherits:
DryInteractor show all
Defined in:
app/interactors/sbmt/outbox/process_item.rb

Constant Summary collapse

# Frozen list of metric counter names, as symbols.
# NOTE(review): how these names are consumed is not visible in this excerpt.
METRICS_COUNTERS =
  %i[error_counter retry_counter sent_counter fetch_error_counter discarded_counter].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from DryInteractor

call

Instance Attribute Details

#process_latency ⇒ Object

Returns the value of attribute process_latency.



21
22
23
# File 'app/interactors/sbmt/outbox/process_item.rb', line 21

# Reader for the +@process_latency+ instance variable.
#
# @return [Object, nil] whatever was last assigned to +@process_latency+,
#   or +nil+ when it has never been set
def process_latency
  instance_variable_get(:@process_latency)
end

#retry_latency ⇒ Object

Returns the value of attribute retry_latency.



21
22
23
# File 'app/interactors/sbmt/outbox/process_item.rb', line 21

# Reader for the +@retry_latency+ instance variable.
#
# @return [Object, nil] whatever was last assigned to +@retry_latency+,
#   or +nil+ when it has never been set
def retry_latency
  instance_variable_get(:@retry_latency)
end

Instance Method Details

#call ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'app/interactors/sbmt/outbox/process_item.rb', line 23

# Processes a single item: fetches it inside a transaction, reconciles its
# error counter with cached metadata, checks retry strategies when the item
# was already processed once, then hands it to every transport through the
# middleware chain.
#
# The block passed to this method is invoked via +yield+ on the results of the
# step helpers. NOTE(review): this looks like dry-monads Do notation, where a
# failed step is surfaced as +Dry::Monads::Do::Halt+ (rescued below) — confirm
# against DryInteractor.
#
# @return [Object] presumably the value of the transaction block: +Success(item)+
#   from the middleware block, or a +Failure+ built in one of the failure paths
#   (assumes +item_class.transaction+ returns its block's value — TODO confirm)
def call
  log_success(
    "Start processing #{box_type} item.\n" \
    "Record: #{item_class.name}##{item_id}"
  )

  # Declared outside the transaction block so the block-level `rescue` and the
  # method-level `ensure` can both reference it; it stays nil if fetching fails.
  item = nil

  item_class.transaction do
    item = yield fetch_item_and_lock_for_update

    # Metadata for this item is looked up in the cache under the key derived
    # from item_id.
    cached_item = fetch_redis_item_meta(redis_item_key(item_id))
    if cached_retries_exceeded?(cached_item)
      msg = "max retries exceeded: marking item as failed based on cached data: #{cached_item}"
      # Copy the cached counter onto the item before recording the failure.
      item.set_errors_count(cached_item.errors_count)
      track_failed(msg, item)
      # `next` leaves the transaction block early with the Failure as its value.
      next Failure(msg)
    end

    # The cached counter is ahead of the database one: log the mismatch and
    # adopt the cached value.
    if cached_greater_errors_count?(item, cached_item)
      log_failure("inconsistent item: cached_errors_count:#{cached_item.errors_count} > db_errors_count:#{item.errors_count}: setting errors_count based on cached data:#{cached_item}")
      item.set_errors_count(cached_item.errors_count)
    end

    # An item with processed_at set takes the retry path: record retry latency
    # and run every configured retry strategy. Otherwise record process
    # latency. Both latencies are measured from item.created_at.
    if item.processed_at?
      self.retry_latency = Time.current - item.created_at
      item.config.retry_strategies.each do |retry_strategy|
        yield check_retry_strategy(item, retry_strategy)
      end
    else
      self.process_latency = Time.current - item.created_at
    end

    middlewares = Middleware::Builder.new(item_process_middlewares)
    payload = yield build_payload(item)
    transports = yield fetch_transports(item)

    # Transports are processed in order inside the middleware chain; success is
    # tracked only after every transport step has been yielded without halting.
    middlewares.call(item) do
      transports.each do |transport|
        yield process_item(transport, item, payload)
      end

      track_successed(item)

      Success(item)
    end
  rescue Dry::Monads::Do::Halt => e
    # A halted step: return the result carried by the halt, without tracking
    # a failure here.
    e.result
  rescue => e
    # Any other StandardError is tracked as a failure (item may still be nil)
    # and converted into a Failure carrying the exception message.
    track_failed(e, item)
    Failure(e.message)
  end
ensure
  # Metrics are reported on every exit path; item is nil when the fetch failed.
  report_metrics(item)
end