Module: MessageQueue
- Extended by:
- MessageQueue
- Included in:
- MessageQueue
- Defined in:
- lib/message_queue.rb,
lib/message_queue/rails.rb,
lib/message_queue/adapter.rb,
lib/message_queue/logging.rb,
lib/message_queue/message.rb,
lib/message_queue/version.rb,
lib/message_queue/consumer.rb,
lib/message_queue/producer.rb,
lib/message_queue/consumable.rb,
lib/message_queue/producible.rb,
lib/message_queue/serializer.rb,
lib/message_queue/adapters/bunny.rb,
lib/message_queue/adapters/memory.rb,
lib/message_queue/serializers/json.rb,
lib/message_queue/consumable_runner.rb,
lib/message_queue/serializers/plain.rb,
lib/message_queue/error_handlers/logger.rb,
lib/message_queue/error_handlers/airbrake.rb,
lib/message_queue/serializers/message_pack.rb
Defined Under Namespace
Modules: Adapters, Consumable, ErrorHandlers, Logging, Producible, Serializers
Classes: Connection, ConsumableRunner, Consumer, Message, Producer, Rails
Constant Summary
- ADAPTERS = [:memory, :bunny]
- SERIALIZERS = [:plain, :message_pack, :json]
- VERSION = "0.1.1"
Instance Attribute Summary
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#settings ⇒ Object
readonly
Returns the value of attribute settings.
Class Method Summary
- .hook_rails! ⇒ Object
Instance Method Summary
-
#connect(file_or_options = {}) ⇒ Object
Public: Connect to the message queue.
-
#connected? ⇒ Boolean
Public: Check if it’s connected to the message queue.
-
#consumables ⇒ Object
Internal: Get the list of consumables.
-
#disconnect ⇒ Object
Public: Disconnect from the message queue if it’s connected.
-
#error_handlers ⇒ Object
Internal: Get the list of error handlers.
-
#klass_name_for(name) ⇒ Object
Internal: Return the class name for name.
-
#load_adapter(name) ⇒ Object
Internal: Load an adapter by name.
-
#load_serializer(name) ⇒ Object
Internal: Load a serializer by name.
- #logger ⇒ Object
-
#new_connection(options = {}) ⇒ Object
Public: Initialize a connection to a message queue.
-
#new_consumer(options = {}) ⇒ Object
Public: Initialize a consumer using current connection to a message queue.
-
#new_producer(options = {}) ⇒ Object
Public: Initialize a producer using current connection to a message queue.
-
#reconnect ⇒ Object
Public: Reconnect to the message queue using the previous connection settings, disconnecting first if it is still connected.
-
#register_consumable(consumable) ⇒ Object
Internal: Register a consumable.
-
#register_error_handler(error_handler) ⇒ Object
Internal: Register an error handler.
- #run_consumables(options = {}) ⇒ Object
-
#with_connection(options = {}, &block) ⇒ Object
Public: Initialize a connection to a message queue, connect and execute code in a block.
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
# File 'lib/message_queue.rb', line 11
def connection
  @connection
end
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
# File 'lib/message_queue.rb', line 11
def settings
  @settings
end
Class Method Details
.hook_rails! ⇒ Object
# File 'lib/message_queue/rails.rb', line 2
def self.hook_rails!
  MessageQueue::Logging.logger = ::Rails.logger

  config_file = ::Rails.root.join("config", "message_queue.yml")
  config = if config_file.exist?
    HashWithIndifferentAccess.new YAML.load_file(config_file)[::Rails.env]
  else
    { :adapter => :memory, :serializer => :json }
  end
  MessageQueue.connect(config)
end
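The hook above reads `config/message_queue.yml` and picks the section named after the current Rails environment. A minimal sketch of that lookup in plain Ruby follows, with an inline YAML string standing in for the file. The `sample_config` and `settings_for` helpers, the environment names, and the fallback for a missing section are assumptions made for illustration; only the `adapter` and `serializer` keys come from this page.

```ruby
require "yaml"

# Inline stand-in for config/message_queue.yml; the contents are illustrative.
def sample_config
  <<~CONFIG
    development:
      adapter: memory
      serializer: json
    production:
      adapter: bunny
      serializer: message_pack
  CONFIG
end

# Hypothetical helper: select one environment's section and symbolize its keys.
def settings_for(env, yaml = sample_config)
  section = YAML.safe_load(yaml).fetch(env) do
    # Fallback chosen for this sketch; hook_rails! itself only falls back
    # when the config file does not exist.
    { "adapter" => "memory", "serializer" => "json" }
  end
  section.each_with_object({}) { |(key, value), memo| memo[key.to_sym] = value }
end

p settings_for("production")
p settings_for("staging")
```

Values loaded this way stay Strings. That appears to be fine for adapter and serializer lookup, since `load_adapter` and `load_serializer` compare names with `to_s`.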
Instance Method Details
#connect(file_or_options = {}) ⇒ Object
Public: Connect to the message queue.
It reads options either from a Hash or from the path to a YAML settings file. After connecting, it stores the connection instance locally.
file_or_options - The Hash of options or the String path to the YAML settings file.
                  For the Hash keys, see the new_connection method.
Returns the connection for the specified message queue.
Raises a RuntimeError if the adapter or serializer can’t be found.
# File 'lib/message_queue.rb', line 26
def connect(file_or_options = {})
  if file_or_options.is_a?(String)
    require "yaml"
    file_or_options = YAML.load_file(file_or_options).inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo}
  end

  @settings = file_or_options
  @connection = new_connection(@settings)
  @connection.connect
end
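When `connect` receives a String, it treats it as a path to a YAML file and converts the top-level keys to Symbols. The sketch below isolates that step so it can be run without the gem. The `load_settings` helper name and the temporary file contents are hypothetical; the `inject` expression is the one used in `connect`.

```ruby
require "yaml"
require "tempfile"

# Same key conversion as in connect: top-level String keys become Symbols.
def load_settings(path)
  YAML.load_file(path).inject({}) { |memo, (k, v)| memo[k.to_sym] = v; memo }
end

# Write a throwaway settings file, load it, and let Tempfile clean it up.
settings = Tempfile.create(["message_queue", ".yml"]) do |file|
  file.write("adapter: memory\nserializer: json\n")
  file.flush
  load_settings(file.path)
end

p settings
```

Only the top level is converted: any nested Hash in the file keeps its String keys.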
#connected? ⇒ Boolean
Public: Check if it’s connected to the message queue
Returns true if it’s connected
# File 'lib/message_queue.rb', line 63
def connected?
  connection.connected? if connection
end
#consumables ⇒ Object
Internal: Get the list of consumables.
Returns the list of consumables.
# File 'lib/message_queue.rb', line 151
def consumables
  @consumables ||= []
end
#disconnect ⇒ Object
Public: Disconnect from the message queue if it’s connected
It clears out the stored connection.
Returns true if it disconnects successfully, or false if there is no stored connection.
# File 'lib/message_queue.rb', line 42
def disconnect
  if @connection
    @connection.disconnect
    @connection = nil
    return true
  end

  false
end
#error_handlers ⇒ Object
Internal: Get the list of error handlers.
Returns the list of error handlers.
# File 'lib/message_queue.rb', line 144
def error_handlers
  @error_handlers ||= []
end
#klass_name_for(name) ⇒ Object
Internal: Return the class name for name
Returns the class name for specified string
# File 'lib/message_queue.rb', line 188
def klass_name_for(name)
  name.to_s.split("_").map(&:capitalize) * ""
end
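To see what `klass_name_for` produces for the names listed in ADAPTERS and SERIALIZERS, the method can be copied into a standalone script. Everything outside the method body is illustration only.

```ruby
# Copy of klass_name_for from the listing above, so it runs standalone.
def klass_name_for(name)
  # Split on underscores, capitalize each part, then join with no separator
  # (Array#* with a String argument behaves like Array#join).
  name.to_s.split("_").map(&:capitalize) * ""
end

[:memory, :bunny, :plain, :message_pack, :json].each do |name|
  puts "#{name.inspect} -> #{klass_name_for(name)}"
end
```

Note that `capitalize` lowercases everything after the first letter of each part, so `:json` maps to `Json` rather than `JSON`.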
#load_adapter(name) ⇒ Object
Internal: Load an adapter by name
Returns the adapter or nil if it can’t find it
# File 'lib/message_queue.rb', line 158
def load_adapter(name)
  ADAPTERS.each do |a|
    if a.to_s == name.to_s
      require_relative "message_queue/adapters/#{name}"
      klass_name = klass_name_for(name)
      return MessageQueue::Adapters.const_get(klass_name)
    end
  end

  nil
end
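`load_adapter` only resolves names that appear in ADAPTERS, then turns the name into a constant under `MessageQueue::Adapters`. The sketch below reproduces that lookup without the gem. `DemoAdapters`, `KNOWN_ADAPTERS`, and `lookup_adapter` are hypothetical stand-ins, and the `require_relative` step is omitted because the demo classes are defined inline.

```ruby
# Hypothetical stand-ins for MessageQueue::Adapters; the real adapters are
# loaded from lib/message_queue/adapters/ via require_relative.
module DemoAdapters
  class Memory; end
  class Bunny; end
end

KNOWN_ADAPTERS = [:memory, :bunny]

def klass_name_for(name)
  name.to_s.split("_").map(&:capitalize) * ""
end

# Only names on the list are resolved; anything else returns nil
# without ever reaching const_get.
def lookup_adapter(name)
  return nil unless KNOWN_ADAPTERS.any? { |a| a.to_s == name.to_s }
  DemoAdapters.const_get(klass_name_for(name))
end

p lookup_adapter(:bunny)
p lookup_adapter("memory")
p lookup_adapter(:kafka)
```

Because the comparison uses `to_s` on both sides, Symbol and String names are interchangeable, which is what lets settings loaded from YAML work unchanged.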
#load_serializer(name) ⇒ Object
Internal: Load a serializer by name
Returns the serializer or nil if it can’t find it
# File 'lib/message_queue.rb', line 173
def load_serializer(name)
  SERIALIZERS.each do |s|
    if s.to_s == name.to_s
      require_relative "message_queue/serializers/#{name}"
      klass_name = klass_name_for(name)
      return MessageQueue::Serializers.const_get(klass_name)
    end
  end

  nil
end
#logger ⇒ Object
# File 'lib/message_queue.rb', line 119
def logger
  Logging.logger
end
#new_connection(options = {}) ⇒ Object
Public: Initialize a connection to a message queue.
options - The Hash options used to initialize a connection
          :adapter - The Symbol adapter, one of :memory or :bunny (see ADAPTERS).
                     For adapter-specific options, see the individual adapter implementation.
          :serializer - The Symbol serializer, one of :plain, :message_pack or :json (see SERIALIZERS).
Returns the connection for the specified message queue.
Raises a RuntimeError if the adapter or serializer can’t be found.
# File 'lib/message_queue.rb', line 76
def new_connection(options = {})
  adapter = load_adapter(options[:adapter])
  raise "Missing adapter #{options[:adapter]}" unless adapter

  serializer = load_serializer(options[:serializer])
  raise "Missing serializer #{options[:serializer]}" unless serializer

  adapter.new_connection(serializer, options)
end
#new_consumer(options = {}) ⇒ Object
Public: Initialize a consumer using current connection to a message queue.
For the available options, see the particular adapter.
Returns a new consumer
# File 'lib/message_queue.rb', line 115
def new_consumer(options = {})
  connection.new_consumer(options)
end
#new_producer(options = {}) ⇒ Object
Public: Initialize a producer using current connection to a message queue.
For the available options, see the particular adapter.
Returns a new producer
# File 'lib/message_queue.rb', line 106
def new_producer(options = {})
  connection.new_producer(options)
end
#reconnect ⇒ Object
Public: Reconnect to the message queue using the previous connection settings, disconnecting first if it is still connected.
Returns the new connection if it reconnects successfully
# File 'lib/message_queue.rb', line 55
def reconnect
  disconnect if connected?
  connect(settings)
end
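The connect, disconnect, reconnect, and connected? methods share two pieces of state: the stored connection and the stored settings. The sketch below mirrors their logic against a fake connection so the lifecycle can be traced without a broker. `FakeConnection` and `DemoQueue` are hypothetical names; real connections come from an adapter through `new_connection`.

```ruby
# Hypothetical in-memory connection; real connections come from an adapter.
class FakeConnection
  def initialize
    @connected = false
  end

  def connect
    @connected = true
    self
  end

  def disconnect
    @connected = false
  end

  def connected?
    @connected
  end
end

# Mirrors the lifecycle logic of the methods documented on this page.
class DemoQueue
  attr_reader :connection, :settings

  def connect(options = {})
    @settings = options
    @connection = FakeConnection.new # stands in for new_connection(@settings)
    @connection.connect
  end

  def disconnect
    return false unless @connection
    @connection.disconnect
    @connection = nil
    true
  end

  def connected?
    connection.connected? if connection
  end

  def reconnect
    disconnect if connected?
    connect(settings)
  end
end

queue = DemoQueue.new
queue.connect(:adapter => :memory, :serializer => :json)
first = queue.connection
queue.reconnect
puts queue.connected?                 # connected again after reconnect
puts first.equal?(queue.connection)   # whether the connection object was reused
puts queue.disconnect                 # first disconnect
puts queue.disconnect                 # second disconnect, nothing stored
```

Two details are worth noticing: `disconnect` clears the connection but keeps the settings, which is what allows `reconnect` to work afterwards, and `connected?` returns nil rather than false when no connection is stored.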
#register_consumable(consumable) ⇒ Object
Internal: Register a consumable.
Returns the registered consumables.
# File 'lib/message_queue.rb', line 130
def register_consumable(consumable)
  consumables << consumable
end
#register_error_handler(error_handler) ⇒ Object
Internal: Register an error handler.
Returns the registered error handlers.
# File 'lib/message_queue.rb', line 137
def register_error_handler(error_handler)
  error_handlers << error_handler
end
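Both registries (consumables and error handlers) use the same pattern: a lazily created Array plus a method that appends to it. A minimal sketch of that pattern follows. `DemoRegistry` is a hypothetical stand-in, and the Symbols are placeholders, since the interface an error handler must implement is not described on this page.

```ruby
# Hypothetical stand-in for the module-level registry shown above.
class DemoRegistry
  # Created on first access, then reused.
  def error_handlers
    @error_handlers ||= []
  end

  # Array#<< returns the array itself, so the caller gets the full list back.
  def register_error_handler(error_handler)
    error_handlers << error_handler
  end
end

registry = DemoRegistry.new
registry.register_error_handler(:first_handler)   # placeholder value
result = registry.register_error_handler(:second_handler)

p result
p result.equal?(registry.error_handlers)
```

Because the return value is the live Array rather than a copy, mutating it also mutates the registry.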
#run_consumables(options = {}) ⇒ Object
# File 'lib/message_queue.rb', line 123
def run_consumables(options = {})
  MessageQueue::ConsumableRunner.new(consumables).run(options)
end
#with_connection(options = {}, &block) ⇒ Object
Public: Initialize a connection to a message queue, connect and execute code in a block.
options - The Hash options used to initialize a connection
          :adapter - The Symbol adapter, one of :memory or :bunny (see ADAPTERS).
                     For adapter-specific options, see the individual adapter implementation.
          :serializer - The Symbol serializer, one of :plain, :message_pack or :json (see SERIALIZERS).
block - The code to execute. The connection object will be passed in.
Returns nothing.
Raises a RuntimeError if the adapter or serializer can’t be found.
# File 'lib/message_queue.rb', line 96
def with_connection(options = {}, &block)
  connection = new_connection(options)
  connection.with_connection(&block)
end
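`with_connection` delegates to the connection's own `with_connection`, whose implementation is not shown on this page. One plausible shape for such a method is sketched below: connect, yield the connection to the block, and disconnect in an `ensure` clause. This is an assumption for illustration, not the gem's code; `SketchConnection` and its `log` attribute are hypothetical.

```ruby
# Hypothetical connection; the ensure-based cleanup is an assumption about
# what an adapter's with_connection might do, not taken from the gem.
class SketchConnection
  attr_reader :log

  def initialize
    @log = []
  end

  def connect
    @log << :connect
  end

  def disconnect
    @log << :disconnect
  end

  # Connect, hand the connection to the block, and always disconnect,
  # even if the block raises. Returns nil ("Returns nothing").
  def with_connection
    connect
    yield self
    nil
  ensure
    disconnect
  end
end

conn = SketchConnection.new
result = conn.with_connection { |c| c.log << :work }
p conn.log

failing = SketchConnection.new
begin
  failing.with_connection { raise "boom" }
rescue RuntimeError
  # The error propagates to the caller after cleanup has run.
end
p failing.log
```

The appeal of the block form under this assumption is that callers cannot forget to disconnect, which matters for short-lived scripts and one-off publishes.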