Module: Deimos::Consume::BatchConsumption

Extended by:
ActiveSupport::Concern
Includes:
Phobos::BatchHandler
Included in:
Deimos::Consumer
Defined in:
lib/deimos/consume/batch_consumption.rb

Overview

Helper methods used by batch consumers, i.e. those with “inline_batch” delivery. Payloads are decoded first, and the consumer is then invoked with an array of messages to handle in a single call.

Instance Method Summary

Instance Method Details

#around_consume_batch(batch, metadata) ⇒ Object

:nodoc:



# File 'lib/deimos/consume/batch_consumption.rb', line 13

def around_consume_batch(batch, metadata)
  payloads = []
  _with_span do
    benchmark = Benchmark.measure do
      if self.class.config[:key_configured]
        metadata[:keys] = batch.map do |message|
          decode_key(message.key)
        end
      end
      metadata[:first_offset] = batch.first&.offset

      payloads = batch.map do |message|
        decode_message(message.payload)
      end
      _received_batch(payloads, metadata)
      yield(payloads, metadata)
    end
    _handle_batch_success(benchmark.real, payloads, metadata)
  end
rescue StandardError => e
  _handle_batch_error(e, payloads, metadata)
end
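
The flow of the method above (decode keys and payloads, record them in the metadata hash, time the block, then yield) can be illustrated with a minimal, self-contained sketch. Everything here is a hypothetical stand-in rather than Deimos or Phobos code: `FakeMessage`, `SketchBatchWrapper`, the trivial decoders, and the `events` log that replaces the private `_handle_batch_success` and `_handle_batch_error` hooks. Tracing spans and the `_received_batch` hook are left out.

```ruby
require 'benchmark'

# Hypothetical stand-in for a Kafka batch message; not a Phobos class.
FakeMessage = Struct.new(:key, :payload, :offset)

# Hypothetical wrapper that mimics the decode-then-yield flow.
class SketchBatchWrapper
  attr_reader :events

  def initialize(key_configured: true)
    @key_configured = key_configured
    @events = []
  end

  def around_consume_batch(batch, metadata)
    payloads = []
    elapsed = Benchmark.measure do
      # Keys are decoded only when the consumer is configured with a key.
      metadata[:keys] = batch.map { |m| decode_key(m.key) } if @key_configured
      metadata[:first_offset] = batch.first&.offset
      payloads = batch.map { |m| decode_message(m.payload) }
      yield(payloads, metadata)
    end
    # Assumption: success is recorded here instead of calling a metrics hook.
    @events << [:success, payloads.size, elapsed.real]
  rescue StandardError => e
    # Assumption: the error is only recorded; a real handler may re-raise.
    @events << [:error, e.class, payloads.size]
  end

  private

  # Assumed decoders; a real consumer would delegate to a schema backend.
  def decode_key(key)
    key.to_s
  end

  def decode_message(payload)
    payload.to_s.upcase
  end
end

batch = [FakeMessage.new(1, 'a', 10), FakeMessage.new(2, 'b', 11)]
metadata = {}
wrapper = SketchBatchWrapper.new
seen = nil
wrapper.around_consume_batch(batch, metadata) { |payloads, _meta| seen = payloads }
```

Because `payloads` is initialised before the timed block, the rescue clause can still report whatever was decoded before the failure, which is presumably why the original method declares it up front.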

#consume_batch(_payloads, _metadata) ⇒ Object

Consume a batch of incoming messages.

Parameters:

  • _payloads (Array<Phobos::BatchMessage>)
  • _metadata (Hash)

Raises:

  • (NotImplementedError)


# File 'lib/deimos/consume/batch_consumption.rb', line 39

def consume_batch(_payloads, _metadata)
  raise NotImplementedError
end
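
Since the default implementation only raises, a concrete consumer is expected to override `consume_batch`. The sketch below shows that pattern in isolation. `SketchBatchConsumption`, `BaseSketchConsumer`, and `OrderSketchConsumer` are hypothetical names, the payloads are assumed to be already-decoded hashes with an `'amount'` field, and the metadata hash is assumed to carry the `:keys` array in the same order as the payloads.

```ruby
# Hypothetical stand-in for the module's default behaviour; not Deimos code.
module SketchBatchConsumption
  def consume_batch(_payloads, _metadata)
    raise NotImplementedError
  end
end

class BaseSketchConsumer
  include SketchBatchConsumption
end

# Hypothetical consumer that sums amounts per decoded key.
class OrderSketchConsumer < BaseSketchConsumer
  attr_reader :totals

  def initialize
    @totals = Hash.new(0)
  end

  def consume_batch(payloads, metadata)
    keys = metadata[:keys] || []
    payloads.each_with_index do |payload, i|
      # Assumes keys[i] belongs to payloads[i].
      @totals[keys[i]] += payload['amount']
    end
  end
end

consumer = OrderSketchConsumer.new
consumer.consume_batch(
  [{ 'amount' => 5 }, { 'amount' => 7 }, { 'amount' => 1 }],
  { keys: %w[a b a], first_offset: 42 }
)
```

One detail worth keeping in mind: in Ruby, `NotImplementedError` descends from `ScriptError` rather than `StandardError`, so a `rescue StandardError` clause does not catch it.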