Class: Karafka::Connection::Consumer

Inherits:
Object
Defined in:
lib/karafka/connection/consumer.rb

Overview

Class that consumes the messages we listen for

Instance Method Details

#consume(message) ⇒ Object

Note:

Call this in a loop to keep listening continuously

Note:

We catch all errors here so that a failure while consuming one message does not affect the other consumed messages. If we didn't catch them, they would propagate up and kill the Celluloid actor.
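
To illustrate why the note insists on catching everything, here is a minimal sketch of how rescuing `Exception` differs from rescuing `StandardError`. The helper names are hypothetical and not part of Karafka. `NotImplementedError` is used only because it sits outside the `StandardError` branch of Ruby's hierarchy.

```ruby
# Hypothetical helpers for illustration only; they are not part of Karafka.

# Rescues StandardError, which is what a bare `rescue` does.
def run_rescuing_standard_error
  yield
rescue StandardError => e
  "caught #{e.class}"
end

# Rescues Exception, the root of Ruby's error hierarchy.
def run_rescuing_exception
  yield
rescue Exception => e # rubocop:disable Lint/RescueException
  "caught #{e.class}"
end

# NotImplementedError inherits from ScriptError, not from StandardError.
failing_work = -> { raise NotImplementedError, 'handler is missing' }

broad_result = run_rescuing_exception(&failing_work)

narrow_result =
  begin
    run_rescuing_standard_error(&failing_work)
  rescue NotImplementedError => e
    # The narrower rescue let the error through to the caller.
    "escaped #{e.class}"
  end

puts broad_result
puts narrow_result
```

The trade-off is that rescuing `Exception` also traps `Interrupt` and `SystemExit`, which is why RuboCop flags it and why the method disables that cop on purpose.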

Consumes a message (does something with it). It executes a scheduling task from the proper controller, selected by the message topic.

Parameters:

  • message (Kafka::FetchedMessage)

    message that was fetched from Kafka



# File 'lib/karafka/connection/consumer.rb', line 12

def consume(message)
  controller = Karafka::Routing::Router.new(message.topic).build
  # We wrap it around with our internal message format, so we don't pass around
  # a raw Kafka message
  controller.params = Message.new(message.topic, message.value)

  Karafka.monitor.notice(self.class, controller.to_h)

  controller.schedule
  # This is on purpose - see the notes for this method
  # rubocop:disable RescueException
rescue Exception => e
  # rubocop:enable RescueException
  Karafka.monitor.notice_error(self.class, e)
end
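
The flow above can be tried out in isolation with a rough sketch like the one below. Every class in it (`FetchedMessage`, `RecordingMonitor`, `EchoController`, `SketchConsumer`) and the `ROUTES` hash are hypothetical stand-ins, assumed here in place of `Kafka::FetchedMessage`, `Karafka.monitor`, a real controller and `Karafka::Routing::Router`. It only mirrors the shape of `#consume` and shows that, when the method is called in a loop, a failing message is reported and the next ones are still processed.

```ruby
# All names below are hypothetical stand-ins, not Karafka's real classes.
FetchedMessage = Struct.new(:topic, :value)

# Collects notices and errors instead of logging them.
class RecordingMonitor
  attr_reader :notices, :errors

  def initialize
    @notices = []
    @errors = []
  end

  def notice(caller_class, payload)
    @notices << [caller_class, payload]
  end

  def notice_error(caller_class, error)
    @errors << [caller_class, error]
  end
end

# Stand-in controller: scheduling fails when the payload is empty.
class EchoController
  attr_accessor :params

  def to_h
    { params: params }
  end

  def schedule
    raise ArgumentError, 'empty payload' if params[:value].empty?

    :scheduled
  end
end

class SketchConsumer
  # Assumed routing table: topic name => controller class.
  ROUTES = { 'events' => EchoController }.freeze

  def initialize(monitor)
    @monitor = monitor
  end

  def consume(message)
    controller = ROUTES.fetch(message.topic).new
    controller.params = { topic: message.topic, value: message.value }
    @monitor.notice(self.class, controller.to_h)
    controller.schedule
  rescue Exception => e # rubocop:disable Lint/RescueException
    # Report the failure and return, so the caller's loop keeps going.
    @monitor.notice_error(self.class, e)
  end
end

monitor = RecordingMonitor.new
consumer = SketchConsumer.new(monitor)

messages = [
  FetchedMessage.new('events', 'first'),
  FetchedMessage.new('events', ''),       # fails inside the controller
  FetchedMessage.new('unknown', 'x'),     # fails during routing
  FetchedMessage.new('events', 'last')
]

messages.each { |message| consumer.consume(message) }

puts "notices: #{monitor.notices.size}, errors: #{monitor.errors.size}"
```

In this sketch the second and third messages end up in `monitor.errors`, while the last message is still consumed.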