Module: PhobosDBCheckpoint::Handler

Includes:
Phobos::Handler, Phobos::Instrumentation
Included in:
RetryFailure
Defined in:
lib/phobos_db_checkpoint/handler.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



8
9
10
# File 'lib/phobos_db_checkpoint/handler.rb', line 8

# Hook invoked when this module is mixed into +base+; extends +base+ with
# the ClassMethods module so its methods become available on +base+ itself.
#
# @param base [Module] the class or module including this handler
# @return [Object] the result of +base.extend+
def self.included(base)
  class_level_api = ClassMethods
  base.extend(class_level_api)
end

Instance Method Details

#ack(entity_id, event_time, event_type = nil, event_version = nil) ⇒ Object



12
13
14
# File 'lib/phobos_db_checkpoint/handler.rb', line 12

# Builds a PhobosDBCheckpoint::Ack from the given values. Per #around_consume,
# returning an Ack from the consume block causes the event to be acknowledged.
#
# @param entity_id [Object] forwarded as the first Ack argument
# @param event_time [Object] forwarded as the second Ack argument
# @param event_type [Object, nil] optional, forwarded as the third Ack argument
# @param event_version [Object, nil] optional, forwarded as the fourth Ack argument
# @return [PhobosDBCheckpoint::Ack]
def ack(entity_id, event_time, event_type = nil, event_version = nil)
  ack_arguments = [entity_id, event_time, event_type, event_version]
  PhobosDBCheckpoint::Ack.new(*ack_arguments)
end

#around_consume(payload, metadata) ⇒ Object

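Wraps the consume block in a database checkpoint: an event that already exists is skipped, an error is either re-raised for retry or recorded as a failure, and a returned Ack acknowledges the event. (The source marks this method with rubocop:disable Style/RedundantBegin.)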



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/phobos_db_checkpoint/handler.rb', line 22

# Wraps the consumption of a single message with a database checkpoint.
#
# An Event is built from the payload and the metadata's :topic and :group_id.
# If the event already exists, the consume block is not called. Otherwise the
# block is yielded to; a StandardError raised by it is re-raised when
# #retry_consume? returns true and recorded through Failure.record otherwise.
# When the block's result is a PhobosDBCheckpoint::Ack the event is
# acknowledged with it; any other result is instrumented as skipped.
#
# @param payload [Object] message payload, passed to the Event
# @param metadata [Hash] message metadata; :topic and :group_id are read here
#   and the whole hash is merged into the instrumentation metadata
# @yield the actual consume logic for the message
# @return [Object] the result of the outermost instrument call, or nil when
#   the event was already consumed
def around_consume(payload, metadata)
  event = PhobosDBCheckpoint::Event.new(
    topic: metadata[:topic],
    group_id: metadata[:group_id],
    payload: payload
  )

  # Keys supplied by the caller's metadata win over :checksum on collision.
  event_metadata = { checksum: event.checksum }.merge(metadata)

  instrument('db_checkpoint.around_consume', event_metadata) do
    event_exists = instrument('db_checkpoint.event_already_exists_check', event_metadata) do
      event.exists?
    end

    if event_exists
      instrument('db_checkpoint.event_already_consumed', event_metadata)
      # Leaves around_consume entirely; the ensure clause below still runs.
      return
    end

    event_action = instrument('db_checkpoint.event_action', event_metadata) do
      # Explicit begin kept on purpose (see the RedundantBegin rubocop directive).
      begin
        yield
      rescue StandardError => e
        raise e if retry_consume?(event, event_metadata, e)

        # Not retrying: persist the failure; its return value becomes event_action.
        Failure.record(event: event, event_metadata: event_metadata, exception: e)
      end
    end

    case event_action
    when PhobosDBCheckpoint::Ack
      instrument('db_checkpoint.event_acknowledged', event_metadata) do
        event.acknowledge!(event_action)
      end
    else
      instrument('db_checkpoint.event_skipped', event_metadata)
    end
  end
ensure
  # Returns any connections in use by the current thread back to the pool, and also returns
  # connections to the pool cached by threads that are no longer alive.
  ActiveRecord::Base.clear_active_connections!
end

#retry_consume?(_event, event_metadata, _exception) ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
19
# File 'lib/phobos_db_checkpoint/handler.rb', line 16

# Decides whether a failed consume should be retried (the error re-raised)
# instead of being recorded as a failure.
#
# Always retries when no db_checkpoint max_retries is configured in
# Phobos.config; otherwise retries only while the metadata's :retry_count is
# below that limit.
#
# @param _event [Object] unused here
# @param event_metadata [Hash] must provide :retry_count when max_retries is set
# @param _exception [Exception] unused here
# @return [Boolean]
def retry_consume?(_event, event_metadata, _exception)
  max_retries = Phobos.config&.db_checkpoint&.max_retries
  return true unless max_retries

  event_metadata[:retry_count] < max_retries
end