Class: GovukMessageQueueConsumer::MessageConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/govuk_message_queue_consumer/message_consumer.rb

Instance Method Summary

Constructor Details

#initialize(processors:, handle_batches:) ⇒ MessageConsumer

Returns a new instance of MessageConsumer.



3
4
5
6
# File 'lib/govuk_message_queue_consumer/message_consumer.rb', line 3

# Stores the processor collection and the batch-handling flag on the instance.
#
# @param processors [Object] kept as-is in @processors
# @param handle_batches [Object] kept as-is in @handle_batches
def initialize(processors:, handle_batches:)
  @processors, @handle_batches = processors, handle_batches
end

Instance Method Details

#handles_batches?(processor) ⇒ Boolean

Returns:

  • (Boolean)


18
19
20
21
22
23
24
25
# File 'lib/govuk_message_queue_consumer/message_consumer.rb', line 18

# Tells whether the given processor should receive records as a batch.
#
# HeartbeatProcessor and JSONProcessor instances always yield false; any
# other processor yields whatever value is held in @handle_batches.
#
# @param processor [Object] the processor to classify
# @return [Boolean] false for the two excluded processor classes,
#   otherwise the value of @handle_batches
def handles_batches?(processor)
  # Checked in the same order as the original `when` list.
  return false if HeartbeatProcessor === processor || JSONProcessor === processor

  @handle_batches
end

#process(records) ⇒ Object



8
9
10
11
12
13
14
15
16
# File 'lib/govuk_message_queue_consumer/message_consumer.rb', line 8

# Runs the records through every processor in @processors, in order.
#
# The input is first coerced with Array(). For a processor that
# handles_batches? reports as batch-capable, the whole current collection is
# passed to its #process and the return value becomes the new collection.
# For any other processor, #process is called once per record and only the
# records for which it returns a truthy value are kept.
#
# @param records [Object] a record or collection of records
# @return [Object] the collection left after the last processor has run
def process(records)
  pending = Array(records)

  @processors.each do |processor|
    pending =
      if handles_batches?(processor)
        processor.process(pending)
      else
        pending.select { |record| processor.process(record) }
      end
  end

  pending
end