Class: Lims::Core::Persistence::MessageBus
- Includes:
- Aequitas, Virtus
- Defined in:
- lib/lims-core/persistence/message_bus.rb
Overview
Basic methods to publish messages on the bus. Uses the bunny gem as the RabbitMQ client.
Defined Under Namespace
Classes: ConnectionError, InvalidSettingsError
Instance Method Summary
-
#backend_application_id=(backend_application_id) ⇒ Object
Sets backend_application_id, but only once; a second assignment raises an InvalidSettingsError.
-
#close ⇒ Object
Close the connection.
-
#connect ⇒ Object
Create a new connection to the broker using the connection settings.
-
#connection_failure_handler ⇒ Object
Executed after a connection loss. The exception it raises should be caught by the caller, which should then roll back its actions.
-
#initialize(settings = {}) ⇒ MessageBus
constructor
Initialize the message bus and check the required options are passed as parameters.
-
#publish(message, options = {}) ⇒ Object
Publish a message on the bus with the given options. The routing key is passed in the options.
-
#set_exchange(exchange_name, options = {}) ⇒ Object
private
Create (or get if it already exists) a new topic exchange with the given options.
-
#set_message_persistence(persistent) ⇒ Object
Set the message persistence behaviour.
-
#set_prefetch_number(number) ⇒ Object
private
Specifies the number of messages to prefetch.
Constructor Details
#initialize(settings = {}) ⇒ MessageBus
Initialize the message bus and check the required options are passed as parameters.
# File 'lib/lims-core/persistence/message_bus.rb', line 35
def initialize(settings = {})
  raise InvalidSettingsError, "MessageBug settings are empty" if settings == nil
  @heart_beat = settings["heart_beat"]
  @connection_uri = settings["url"]
  @exchange_name = settings["exchange_name"]
  @exchange_type = settings["exchange_type"] || "topic"
  @durable = settings["durable"]
  @prefetch_number = settings["prefetch_number"]
  @backend_application_id = settings["backend_application_id"]
end
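The constructor reads its settings with string keys and rejects only nil, so an empty hash is accepted at this point. The sketch below mirrors that parsing in a stand-in class to show the resulting defaults. SettingsSketch is a hypothetical name that is not part of lims-core, it raises ArgumentError rather than InvalidSettingsError to stay self-contained, and the URL and exchange name are made-up example values.

```ruby
# Hypothetical stand-in that mirrors the settings parsing shown above.
# It is not the real MessageBus and has no RabbitMQ dependency.
class SettingsSketch
  attr_reader :connection_uri, :exchange_name, :exchange_type, :durable

  def initialize(settings = {})
    raise ArgumentError, "settings are empty" if settings.nil?
    @connection_uri = settings["url"]
    @exchange_name  = settings["exchange_name"]
    @exchange_type  = settings["exchange_type"] || "topic"
    @durable        = settings["durable"]
  end
end

# String keys are picked up; the exchange type falls back to "topic".
with_strings = SettingsSketch.new(
  "url"           => "amqp://localhost:5672",  # assumed example URL
  "exchange_name" => "example.exchange",       # assumed example name
  "durable"       => true
)
puts with_strings.exchange_type

# Symbol keys are not looked up, so the URL stays nil.
with_symbols = SettingsSketch.new(:url => "amqp://localhost:5672")
puts with_symbols.connection_uri.inspect
```

With symbol keys the URL is silently left as nil. In the real class such a problem would presumably only surface later, when connect checks valid?.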
Instance Method Details
#backend_application_id=(backend_application_id) ⇒ Object
Sets backend_application_id, but only once; a second assignment raises an InvalidSettingsError.
# File 'lib/lims-core/persistence/message_bus.rb', line 48
def backend_application_id=(backend_application_id)
  raise InvalidSettingsError, "Backend Application ID has been set already." if @backend_application_id
  @backend_application_id = backend_application_id
end
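The write-once behaviour can be tried in isolation. The following is a minimal sketch with a stand-in class: WriteOnceSketch is a hypothetical name, the error class is redefined locally so the snippet runs without lims-core, and the application ids are made-up values.

```ruby
# Local stand-in for the error class, so the sketch is self-contained.
class InvalidSettingsError < StandardError; end

# Hypothetical stand-in reproducing the write-once setter shown above.
class WriteOnceSketch
  attr_reader :backend_application_id

  def initialize(backend_application_id = nil)
    @backend_application_id = backend_application_id
  end

  def backend_application_id=(backend_application_id)
    raise InvalidSettingsError, "Backend Application ID has been set already." if @backend_application_id
    @backend_application_id = backend_application_id
  end
end

bus = WriteOnceSketch.new
bus.backend_application_id = "example-app"    # first assignment succeeds

begin
  bus.backend_application_id = "another-app"  # second assignment raises
rescue InvalidSettingsError => e
  puts e.message
end
```

Because the guard only tests whether the instance variable is already set, an id supplied through the constructor settings also counts as the one allowed assignment.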
#close ⇒ Object
Close the connection.
# File 'lib/lims-core/persistence/message_bus.rb', line 82
def close
  @connection.close
end
#connect ⇒ Object
Create a new connection to the broker using the connection settings. Create a channel and set up a new exchange.
# File 'lib/lims-core/persistence/message_bus.rb', line 64
def connect
  begin
    if valid?
      options = @heart_beat ? { :heartbeat => heart_beat } : {}
      @connection = Bunny.new(connection_uri, options)
      @connection.start
      @channel = @connection.create_channel
      set_prefetch_number(prefetch_number)
      set_exchange(exchange_name, :durable => durable)
    else
      raise InvalidSettingsError, "settings are invalid"
    end
  rescue Bunny::TCPConnectionFailed, Bunny::PossibleAuthenticationFailureError => e
    connection_failure_handler.call
  end
end
#connection_failure_handler ⇒ Object
Executed after a connection loss. The exception it raises should be caught by the caller, which should then roll back its actions.
# File 'lib/lims-core/persistence/message_bus.rb', line 55
def connection_failure_handler
  Proc.new do
    raise ConnectionError, "can't connect to RabbitMQ server"
  end
end
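Since connect obtains the handler by calling this method and then invokes the returned Proc, a subclass could in principle wrap the default handler. The sketch below illustrates that idea with stand-ins only: BusSketch and LoggingBusSketch are hypothetical classes, the broker failure is simulated with an IOError instead of a Bunny exception, and the failure bookkeeping is an assumption added for illustration.

```ruby
# Local stand-in for the error class, so the sketch is self-contained.
class ConnectionError < StandardError; end

# Hypothetical stand-in: connect always simulates a broker failure
# and delegates to the handler, as the real connect does on rescue.
class BusSketch
  def connection_failure_handler
    Proc.new do
      raise ConnectionError, "can't connect to RabbitMQ server"
    end
  end

  def connect
    raise IOError, "simulated TCP failure"
  rescue IOError
    connection_failure_handler.call
  end
end

# Hypothetical subclass that records the failure before re-raising.
class LoggingBusSketch < BusSketch
  attr_reader :failures

  def connection_failure_handler
    default = super
    Proc.new do
      (@failures ||= []) << Time.now
      default.call
    end
  end
end

bus = LoggingBusSketch.new
begin
  bus.connect
rescue ConnectionError => e
  # The caller is the place to roll back any pending work.
  puts "rolling back: #{e.message}"
end
```

The caller still sees a ConnectionError, so the rollback responsibility described above stays with the code that called connect.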
#publish(message, options = {}) ⇒ Object
Publish a message on the bus with the given options. The routing key is passed in the options.
# File 'lib/lims-core/persistence/message_bus.rb', line 121
def publish(message, options = {})
  raise ConnectionError, "exchange is not reachable" unless @exchange.instance_of?(Bunny::Exchange)
  options.merge!(:persistent => @message_persistence) unless @message_persistence.nil?
  options.merge!(:app_id => @backend_application_id) if @backend_application_id
  @exchange.publish(message, options)
end
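The way publish assembles its final options can be followed without a broker. This is a minimal sketch of the merging step only: merged_publish_options is a hypothetical helper, not a method of the class, and the routing key and application id are made-up values.

```ruby
# Hypothetical helper mirroring the two merge! calls in publish.
# persistent is added whenever a persistence value has been set
# (including false); app_id is added only when an id is present.
def merged_publish_options(options, message_persistence, backend_application_id)
  options.merge!(:persistent => message_persistence) unless message_persistence.nil?
  options.merge!(:app_id => backend_application_id) if backend_application_id
  options
end

options = { :routing_key => "example.routing.key" }
merged_publish_options(options, true, "example-app")

# merge! changes the hash in place, so the caller's hash now also
# carries the :persistent and :app_id entries.
p options
```

Because merge! mutates its receiver, a hash reused across several publish calls keeps the entries added by earlier calls; passing a fresh hash each time avoids that.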
#set_exchange(exchange_name, options = {}) ⇒ Object (private)
Create (or get if it already exists) a new topic exchange with the given options. In particular, the durable option can be set here so that the exchange survives a server restart.
# File 'lib/lims-core/persistence/message_bus.rb', line 92
def set_exchange(exchange_name, options = {})
  @exchange = Bunny::Exchange.new(@channel, exchange_type.to_sym, exchange_name, options)
end
#set_message_persistence(persistent) ⇒ Object
Set the message persistence behaviour. If persistent, the message is written to disk and remains in the queue until it is consumed, so it survives a server restart. BUNNY ISSUE: bunny0.9pre4 hardcodes the persistent option. It is always set, meaning that messages survive a server restart if the queue and the exchange are durable.
# File 'lib/lims-core/persistence/message_bus.rb', line 113
def set_message_persistence(persistent)
  @message_persistence = persistent
end
#set_prefetch_number(number) ⇒ Object (private)
Specifies the number of messages to prefetch.
# File 'lib/lims-core/persistence/message_bus.rb', line 99
def set_prefetch_number(number)
  @channel.prefetch(number)
end