Class: Phobos::Actions::ProcessBatch

Inherits:
Object
  • Object
show all
Includes:
Instrumentation
Defined in:
lib/phobos/actions/process_batch.rb

Constant Summary

Constants included from Instrumentation

Instrumentation::NAMESPACE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Instrumentation

#instrument, #measure, subscribe, unsubscribe

Constructor Details

#initialize(listener:, batch:, listener_metadata:) ⇒ ProcessBatch

Returns a new instance of ProcessBatch.



8
9
10
11
12
13
14
15
16
17
# File 'lib/phobos/actions/process_batch.rb', line 8

# Builds the action that processes one fetched batch for a listener.
#
# NOTE(review): the `@listener_metadata` / `@metadata` identifiers were missing
# from this listing and have been restored from the parameter list and the
# `metadata` reader; confirm against lib/phobos/actions/process_batch.rb.
#
# @param listener the listener this batch belongs to; stored and handed on to
#   each per-message action
# @param batch the batch to process; must respond to `messages`, `partition`
#   and `offset_lag`
# @param listener_metadata base metadata; must respond to `merge`
#   (presumably a Hash - verify against caller)
def initialize(listener:, batch:, listener_metadata:)
  @listener = listener
  @batch = batch
  # Kept unmodified so every per-message action gets the base metadata only.
  @listener_metadata = listener_metadata
  # `merge` builds a separate object holding the batch-level details that are
  # exposed through `metadata`.
  @metadata = listener_metadata.merge(
    batch_size: batch.messages.count,
    partition: batch.partition,
    offset_lag: batch.offset_lag
  )
end

Instance Attribute Details

#metadata ⇒ Object (readonly)

Returns the value of attribute metadata.



6
7
8
# File 'lib/phobos/actions/process_batch.rb', line 6

# Returns the value of attribute metadata.
#
# NOTE(review): the method name and body were missing from this listing and
# have been restored from the attribute heading; confirm against the source.
#
# @return [Object] the value held in @metadata
def metadata
  @metadata
end

Instance Method Details

#execute ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/phobos/actions/process_batch.rb', line 19

# Runs a ProcessMessage action for every message in the batch, inside a
# 'listener.process_batch' instrumentation block, and records how long the
# whole batch took.
#
# `instrument` and `measure` are provided by the included Instrumentation
# module.
#
# NOTE(review): the instrument payload, the block parameter and the
# `listener_metadata:` value were missing from this listing and have been
# restored from the instance variables set in the constructor; confirm against
# lib/phobos/actions/process_batch.rb.
#
# @return [Object] whatever `instrument` returns for the block
def execute
  instrument('listener.process_batch', @metadata) do |metadata|
    time_elapsed = measure do
      @batch.messages.each do |message|
        Phobos::Actions::ProcessMessage.new(
          listener: @listener,
          message: message,
          listener_metadata: @listener_metadata
        ).execute
      end
    end

    # Attach the measured duration to the payload yielded by `instrument`.
    metadata.merge!(time_elapsed: time_elapsed)
  end
end