Class: Phobos::Actions::ProcessMessage

Inherits:
Object
  • Object
show all
Includes:
Processor
Defined in:
lib/phobos/actions/process_message.rb

Constant Summary

Constants included from Processor

Processor::MAX_SLEEP_INTERVAL

Constants included from Instrumentation

Instrumentation::NAMESPACE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Processor

#snooze

Methods included from Instrumentation

#instrument, subscribe, unsubscribe

Constructor Details

#initialize(listener:, message:, listener_metadata:) ⇒ ProcessMessage

Returns a new instance of ProcessMessage.



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/phobos/actions/process_message.rb', line 12

def initialize(listener:, message:, listener_metadata:)
  @listener = listener
  @message = message
  @metadata = .merge(
    key: message.key,
    partition: message.partition,
    offset: message.offset,
    retry_count: 0,
    headers: message.headers
  )
end

Instance Attribute Details

#metadataObject (readonly)

Returns the value of attribute metadata.



10
11
12
# File 'lib/phobos/actions/process_message.rb', line 10

def 
  @metadata
end

Instance Method Details

#executeObject



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/phobos/actions/process_message.rb', line 24

def execute
  payload = force_encoding(@message.value)

  begin
    process_message(payload)
  rescue StandardError => e
    handle_error(e, 'listener.retry_handler_error',
                 "error processing message, waiting #{backoff_interval}s")
    retry
  end
end