Class: Mantle::MessageBus

Inherits:
Object
  • Object
show all
Defined in:
lib/mantle/message_bus.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MessageBus

Returns a new instance of MessageBus.



5
6
7
# File 'lib/mantle/message_bus.rb', line 5

# Builds a message bus bound to the redis connection that
# Mantle.configuration.message_bus_redis returns at construction time.
def initialize
  configuration = Mantle.configuration
  @redis = configuration.message_bus_redis
end

Instance Attribute Details

#redis=(value) ⇒ Object

Sets the attribute redis

Parameters:

  • value

    the value to set the attribute redis to.



3
4
5
# File 'lib/mantle/message_bus.rb', line 3

# Replaces the object held in @redis.
#
# @param value the object to store as the redis connection
def redis=(value)
  @redis = value
end

Instance Method Details

#catch_up ⇒ Object



22
23
24
# File 'lib/mantle/message_bus.rb', line 22

# Runs catch-up by delegating to a freshly built Mantle::CatchUp.
#
# @return whatever Mantle::CatchUp#catch_up returns
def catch_up
  catch_up_job = Mantle::CatchUp.new
  catch_up_job.catch_up
end

#listen ⇒ Object



15
16
17
18
19
20
# File 'lib/mantle/message_bus.rb', line 15

# Starts listening on the message bus: logs the redis connection in use,
# runs catch_up, then subscribes to the configured channels.
#
# @return whatever subscribe_to_channels returns
def listen
  logger = Mantle.logger
  logger.info("Connecting to message bus redis: #{redis.inspect} ")

  catch_up
  subscribe_to_channels
end

#publish(channel, message) ⇒ Object



9
10
11
12
13
# File 'lib/mantle/message_bus.rb', line 9

# Serializes +message+ to JSON, publishes it on +channel+ through redis,
# and writes a debug log line naming the channel.
#
# @param channel the channel name passed to redis.publish
# @param message the object to serialize with JSON.generate
# @return whatever the logger's debug call returns
def publish(channel, message)
  payload = JSON.generate(message)
  redis.publish(channel, payload)

  logger = Mantle.logger
  logger.debug("Sent message to message bus channel: #{channel}")
end

#subscribe_to_channels ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/mantle/message_bus.rb', line 26

# Subscribes to the channels listed in Mantle.channels and routes every
# incoming message.
#
# Each received payload is parsed as JSON and handed, together with the
# channel it arrived on, to a new Mantle::MessageRouter.
#
# @raise [Mantle::Error::MissingRedisConnection] when redis is nil/false
# @return [nil] when no channels are configured; otherwise whatever
#   redis.subscribe returns
def subscribe_to_channels
  raise Mantle::Error::MissingRedisConnection unless redis

  channels = Mantle.channels

  # Return explicitly: the early exit must not depend on the logger's return
  # value (a logger whose #info returns nil would otherwise fall through and
  # subscribe with no channels).
  unless channels.any?
    Mantle.logger.info("No channels configured for subscription. Configure 'message_handlers' if this was unintentional.")
    return
  end

  Mantle.logger.info("Subscribing to message bus for #{channels} ")

  redis.subscribe(channels) do |on|
    on.message do |channel, json_message|
      message = JSON.parse(json_message)
      Mantle::MessageRouter.new(channel, message).route
    end
  end
end