Class: Lims::Core::Persistence::MessageBus

Inherits:
Object
Includes:
Aequitas, Virtus
Defined in:
lib/lims-core/persistence/message_bus.rb

Overview

Basic methods to publish messages on the bus. Uses the bunny gem as the RabbitMQ client.

Defined Under Namespace

Classes: ConnectionError, InvalidSettingsError

Constructor Details

#initialize(settings = {}) ⇒ MessageBus

Initialize the message bus and check the required options are passed as parameters.

Parameters:

  • settings (Hash) (defaults to: {})

Raises:

  • (InvalidSettingsError) if settings is nil

# File 'lib/lims-core/persistence/message_bus.rb', line 35

def initialize(settings = {})
  raise InvalidSettingsError, "MessageBus settings are empty" if settings.nil?
  @heart_beat = settings["heart_beat"]
  @connection_uri = settings["url"]
  @exchange_name = settings["exchange_name"]
  @exchange_type = settings["exchange_type"] || "topic"
  @durable = settings["durable"]
  @prefetch_number = settings["prefetch_number"]
  @backend_application_id = settings["backend_application_id"]
end
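
The constructor reads its configuration from string keys. The following is a minimal sketch of a settings hash, assuming a local broker. The URL, exchange name, application id and numeric values are hypothetical placeholders; only the key names and the "topic" default come from the constructor above.

```ruby
# Hypothetical settings; only the key names are taken from the constructor.
EXAMPLE_SETTINGS = {
  "url"                    => "amqp://guest:guest@localhost:5672", # assumed local broker
  "exchange_name"          => "example.exchange",                  # hypothetical name
  "durable"                => true,
  "prefetch_number"        => 10,
  "heart_beat"             => 30,
  "backend_application_id" => "example-app"                        # hypothetical id
}.freeze

# Mirrors the constructor's fallback: a topic exchange when no type is given.
def exchange_type_for(settings)
  settings["exchange_type"] || "topic"
end

puts exchange_type_for(EXAMPLE_SETTINGS)

# Keys must be strings: a symbol key is not found by a string lookup.
symbol_keyed = { :url => "amqp://localhost" }
puts symbol_keyed["url"].inspect
```

A hash shaped like this would be passed as the settings argument of MessageBus.new. Note that the constructor itself only rejects nil; an empty hash is accepted at this point.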

Instance Method Details

#backend_application_id=(backend_application_id) ⇒ Object

Sets backend_application_id, but only once; any later attempt raises an InvalidSettingsError.



# File 'lib/lims-core/persistence/message_bus.rb', line 48

def backend_application_id=(backend_application_id)
  raise InvalidSettingsError, "Backend Application ID has been set already." if @backend_application_id
  @backend_application_id = backend_application_id
end
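
The write-once behaviour can be tried in isolation. This is a minimal sketch: WriteOnceId is a hypothetical class that copies only the setter above, and InvalidSettingsError is redefined here as a stand-in so the example runs without the library.

```ruby
# Stand-in for the library's InvalidSettingsError, so the sketch runs alone.
class InvalidSettingsError < StandardError; end

# Hypothetical minimal class reproducing only the write-once setter.
class WriteOnceId
  attr_reader :backend_application_id

  def backend_application_id=(value)
    raise InvalidSettingsError, "Backend Application ID has been set already." if @backend_application_id
    @backend_application_id = value
  end
end

bus = WriteOnceId.new
bus.backend_application_id = "app-1"   # first assignment succeeds

begin
  bus.backend_application_id = "app-2" # second assignment is rejected
rescue InvalidSettingsError => e
  puts e.message
end
```

Because the guard tests truthiness, assigning nil leaves the value unset, and a later assignment still succeeds.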

#close ⇒ Object

Close the connection.



# File 'lib/lims-core/persistence/message_bus.rb', line 82

def close
  @connection.close
end

#connect ⇒ Object

Create a new connection to the broker using the connection settings. Create a channel and setup a new exchange.



# File 'lib/lims-core/persistence/message_bus.rb', line 64

def connect
  begin
    if valid?
      options = @heart_beat ? { :heartbeat => heart_beat } : {}
      @connection = Bunny.new(connection_uri, options)
      @connection.start
      @channel = @connection.create_channel
      set_prefetch_number(prefetch_number)
      set_exchange(exchange_name, :durable => durable)
    else
      raise InvalidSettingsError, "settings are invalid"
    end
  rescue Bunny::TCPConnectionFailed, Bunny::PossibleAuthenticationFailureError => e
    connection_failure_handler.call
  end
end
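
The order of steps in #connect can be followed without a broker. In this rough sketch, FakeConnection and FakeChannel are hypothetical recording stand-ins for the Bunny objects, and connect_sketch is an illustrative helper rather than part of the library. It also declares the exchange through the channel's exchange method with a :type option instead of Bunny::Exchange.new, which is an assumption made to keep the fake small.

```ruby
# Hypothetical stand-ins that record calls; a real setup would use Bunny.
class FakeChannel
  def initialize(log)
    @log = log
  end

  def prefetch(number)
    @log << [:prefetch, number]
  end

  def exchange(name, options = {})
    @log << [:exchange, name, options]
    :fake_exchange
  end
end

class FakeConnection
  attr_reader :log

  def initialize(uri, options = {})
    @log = [[:new, uri, options]]
  end

  def start
    @log << [:start]
    self
  end

  def create_channel
    @log << [:create_channel]
    FakeChannel.new(@log)
  end
end

# Same order as #connect: connection, start, channel, prefetch, exchange.
def connect_sketch(client_class, uri, settings)
  # The heartbeat option is only passed when it is configured.
  options = settings["heart_beat"] ? { :heartbeat => settings["heart_beat"] } : {}
  connection = client_class.new(uri, options)
  connection.start
  channel = connection.create_channel
  channel.prefetch(settings["prefetch_number"])
  exchange = channel.exchange(settings["exchange_name"],
                              :type => (settings["exchange_type"] || "topic").to_sym,
                              :durable => settings["durable"])
  [connection, exchange]
end

connection, _exchange = connect_sketch(FakeConnection, "amqp://localhost",
                                       "exchange_name" => "example.exchange",
                                       "prefetch_number" => 5, "durable" => true)
puts connection.log.map(&:first).inspect
```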

#connection_failure_handler ⇒ Object

Executed after a connection loss. The caller should catch the resulting exception and roll back its actions.



# File 'lib/lims-core/persistence/message_bus.rb', line 55

def connection_failure_handler
  Proc.new do
    raise ConnectionError, "can't connect to RabbitMQ server"
  end
end
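
A caller might react to the raised exception as sketched below. This is only an illustration: ConnectionError is redefined as a stand-in, the handler is copied as a top-level method, and save_and_notify with its array-based "store" is a hypothetical caller invented for the example.

```ruby
# Stand-in for the library's ConnectionError, so the sketch runs alone.
class ConnectionError < StandardError; end

# Same shape as #connection_failure_handler: a proc that raises when called.
def connection_failure_handler
  Proc.new do
    raise ConnectionError, "can't connect to RabbitMQ server"
  end
end

# Hypothetical caller: records a change, then undoes it if the bus fails.
def save_and_notify(store)
  store << :pending_change
  connection_failure_handler.call # simulates a failed connection attempt
  :published
rescue ConnectionError
  store.pop                       # hypothetical rollback of the pending change
  :rolled_back
end

store = []
puts save_and_notify(store)
```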

#publish(message, options = {}) ⇒ Object

Publish a message on the bus with the given options. The routing key is passed in the options.

Parameters:

  • message (String)

    JSON message

  • options (Hash) (defaults to: {})

    publishing options

Raises:

  • (ConnectionError) if the exchange is not reachable

# File 'lib/lims-core/persistence/message_bus.rb', line 121

def publish(message, options = {})
  raise ConnectionError, "exchange is not reachable" unless @exchange.instance_of?(Bunny::Exchange)
  
  options.merge!(:persistent => @message_persistence) unless @message_persistence.nil?
  options.merge!(:app_id => @backend_application_id) if @backend_application_id
  @exchange.publish(message, options)
end
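
The option handling in #publish can be examined on its own. In this minimal sketch, build_publish_options is a hypothetical helper that copies the two merge! lines above, and the routing key and application id are made-up values.

```ruby
# Mirrors the option handling in #publish, without touching a broker.
def build_publish_options(options, message_persistence, backend_application_id)
  # A false persistence flag is still merged; only nil is skipped.
  options.merge!(:persistent => message_persistence) unless message_persistence.nil?
  options.merge!(:app_id => backend_application_id) if backend_application_id
  options
end

caller_options = { :routing_key => "example.routing.key" } # hypothetical key
sent = build_publish_options(caller_options, false, "example-app")
puts sent.inspect

# merge! modifies the caller's hash in place.
puts sent.equal?(caller_options)
```

Since merge! mutates its receiver, a caller that reuses one options hash across several publish calls may prefer to pass a copy (for example options.dup).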

#set_exchange(exchange_name, options = {}) ⇒ Object (private)

Create (or get, if it already exists) an exchange of the configured type (topic by default) with the given options. In particular, the durable option can be set here to mark the exchange as durable, so that it survives a server restart.

Parameters:

  • exchange_name (String)
  • options (Hash) (defaults to: {})

    exchange options



# File 'lib/lims-core/persistence/message_bus.rb', line 92

def set_exchange(exchange_name, options = {})
  @exchange = Bunny::Exchange.new(@channel, exchange_type.to_sym, exchange_name, options)
end

#set_message_persistence(persistent) ⇒ Object

Set the message persistence behaviour. If persistent, the message is written to disk and remains in the queue until it is consumed, so it survives a server restart. BUNNY ISSUE: bunny0.9pre4 hardcodes the persistent option. It is always set, meaning messages survive a server restart provided the queue and the exchange are durable.

Parameters:

  • persistent (Bool)

See Also:

  • :delivery_mode => 2


# File 'lib/lims-core/persistence/message_bus.rb', line 113

def set_message_persistence(persistent)
  @message_persistence = persistent 
end

#set_prefetch_number(number) ⇒ Object (private)

Specifies the number of messages to prefetch.

Parameters:

  • number (int)

    number of messages to prefetch



# File 'lib/lims-core/persistence/message_bus.rb', line 99

def set_prefetch_number(number)
  @channel.prefetch(number)
end