Class: Phobos::Actions::ProcessBatchInline

Inherits:
Object
  • Object
show all
Includes:
Processor
Defined in:
lib/phobos/actions/process_batch_inline.rb

Constant Summary

Constants included from Processor

Processor::MAX_SLEEP_INTERVAL

Constants included from Instrumentation

Instrumentation::NAMESPACE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Processor

#snooze

Methods included from Instrumentation

#instrument, subscribe, unsubscribe

Constructor Details

#initialize(listener:, batch:, metadata:) ⇒ ProcessBatchInline

Returns a new instance of ProcessBatchInline.



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/phobos/actions/process_batch_inline.rb', line 13

def initialize(listener:, batch:, metadata:)
  @listener = listener
  @batch = batch
  @listener = listener
  @batch = batch
  @metadata = .merge(
    batch_size: batch.messages.count,
    partition: batch.partition,
    offset_lag: batch.offset_lag,
    retry_count: 0
  )
end

Instance Attribute Details

#metadataObject (readonly)

Returns the value of attribute metadata.



11
12
13
# File 'lib/phobos/actions/process_batch_inline.rb', line 11

def 
  @metadata
end

Instance Method Details

#executeObject



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/phobos/actions/process_batch_inline.rb', line 26

def execute
  batch = @batch.messages.map { |message| instantiate_batch_message(message) }

  begin
    process_batch(batch)
  rescue StandardError => e
    handle_error(e, 'listener.retry_handler_error_batch',
                 "error processing inline batch, waiting #{backoff_interval}s")
    retry
  end
end