Module: MessageQueue

Extended by:
MessageQueue
Included in:
MessageQueue
Defined in:
lib/message_queue.rb,
lib/message_queue/rails.rb,
lib/message_queue/adapter.rb,
lib/message_queue/logging.rb,
lib/message_queue/message.rb,
lib/message_queue/version.rb,
lib/message_queue/consumer.rb,
lib/message_queue/producer.rb,
lib/message_queue/consumable.rb,
lib/message_queue/producible.rb,
lib/message_queue/serializer.rb,
lib/message_queue/adapters/bunny.rb,
lib/message_queue/adapters/memory.rb,
lib/message_queue/serializers/json.rb,
lib/message_queue/consumable_runner.rb,
lib/message_queue/serializers/plain.rb,
lib/message_queue/error_handlers/logger.rb,
lib/message_queue/error_handlers/airbrake.rb,
lib/message_queue/serializers/message_pack.rb

Defined Under Namespace

Modules: Adapters, Consumable, ErrorHandlers, Logging, Producible, Serializers
Classes: Connection, ConsumableRunner, Consumer, Message, Producer, Rails

Constant Summary collapse

ADAPTERS =
[:memory, :bunny]
SERIALIZERS =
[:plain, :message_pack, :json]
VERSION =
"0.1.1"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



11
12
13
# File 'lib/message_queue.rb', line 11

# Public: Read the stored connection.
#
# Returns the value held in @connection, or nil if it has not been set.
def connection
  @connection
end

#settings ⇒ Object (readonly)

Returns the value of attribute settings.



11
12
13
# File 'lib/message_queue.rb', line 11

# Public: Read the stored connection settings.
#
# Returns the value held in @settings, or nil if it has not been set.
def settings
  @settings
end

Class Method Details

.hook_rails! ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
# File 'lib/message_queue/rails.rb', line 2

# Public: Wire MessageQueue into a Rails application.
#
# Points MessageQueue::Logging at the Rails logger, then connects using the
# current Rails environment's section of config/message_queue.yml. When that
# file does not exist, it connects with the :memory adapter and the :json
# serializer instead.
#
# Returns the result of MessageQueue.connect.
def self.hook_rails!
  MessageQueue::Logging.logger = ::Rails.logger

  yaml_path = ::Rails.root.join("config", "message_queue.yml")
  settings =
    if yaml_path.exist?
      HashWithIndifferentAccess.new(YAML.load_file(yaml_path)[::Rails.env])
    else
      { :adapter => :memory, :serializer => :json }
    end

  MessageQueue.connect(settings)
end

Instance Method Details

#connect(file_or_options = {}) ⇒ Object

Public: Connect to the message queue.

It either reads options from a Hash or the path to the Yaml settings file. After connecting, it stores the connection instance locally.

file_or_options - The Hash options or the String Yaml settings file

For details on the Hash options, see the new_connection method.

Returns the connection for the specified message queue.

Raises a RuntimeError if the adapter or the serializer can’t be found.



26
27
28
29
30
31
32
33
34
35
# File 'lib/message_queue.rb', line 26

# Public: Connect to the message queue.
#
# file_or_options - The Hash options, or the String path to a YAML settings
#                   file whose top-level keys are converted to Symbols
#                   (default: {}). An empty settings file is treated as an
#                   empty Hash.
#
# Stores the resolved options in @settings and the new connection in
# @connection.
#
# Returns the result of calling connect on the new connection.
def connect(file_or_options = {})
  if file_or_options.is_a?(String)
    require "yaml" # loaded lazily; only needed for file-based settings
    # YAML.load_file gives a falsy value for an empty document, so fall back
    # to an empty Hash rather than failing during the key conversion.
    loaded = YAML.load_file(file_or_options) || {}
    file_or_options = loaded.each_with_object({}) { |(key, value), memo| memo[key.to_sym] = value }
  end

  @settings = file_or_options
  @connection = new_connection(@settings)
  @connection.connect
end

#connected? ⇒ Boolean

Public: Check if it’s connected to the message queue

Returns true if it’s connected

Returns:

  • (Boolean)


63
64
65
# File 'lib/message_queue.rb', line 63

# Public: Check whether there is a live connection to the message queue.
#
# Returns the stored connection's own connected? answer, or nil when no
# connection is stored.
def connected?
  current = connection
  return nil unless current

  current.connected?
end

#consumables ⇒ Object

Internal: Get the list of consumables.

Returns the list of consumables.



151
152
153
# File 'lib/message_queue.rb', line 151

# Internal: Get the list of consumables.
#
# The list is created empty on first access and the same Array is handed
# back on every later call.
#
# Returns the Array of consumables.
def consumables
  return @consumables if @consumables

  @consumables = []
end

#disconnect ⇒ Object

Public: Disconnect from the message queue if it’s connected

It clears out the stored connection.

Returns true if it disconnects successfully, or false if there was no stored connection.



42
43
44
45
46
47
48
49
50
# File 'lib/message_queue.rb', line 42

# Public: Disconnect from the message queue if a connection is stored.
#
# Calls disconnect on the stored connection and then clears @connection.
#
# Returns true after disconnecting, or false when there was no stored
# connection.
def disconnect
  return false unless @connection

  @connection.disconnect
  @connection = nil
  true
end

#error_handlers ⇒ Object

Internal: Get the list of error handlers.

Returns the list of error handlers.



144
145
146
# File 'lib/message_queue.rb', line 144

# Internal: Get the list of error handlers.
#
# The list is created empty on first access and the same Array is handed
# back on every later call.
#
# Returns the Array of error handlers.
def error_handlers
  return @error_handlers if @error_handlers

  @error_handlers = []
end

#klass_name_for(name) ⇒ Object

Internal: Return the class name for name

Returns the class name for specified string



188
189
190
# File 'lib/message_queue.rb', line 188

# Internal: Build a class name from an underscored name.
#
# name - The String or Symbol name, e.g. :message_pack.
#
# Each underscore-separated part is capitalized (first letter upper case,
# the rest lower case) and the parts are concatenated.
#
# Returns the String class name, e.g. "MessagePack".
def klass_name_for(name)
  parts = name.to_s.split("_")
  parts.map { |part| part.capitalize }.join("")
end

#load_adapter(name) ⇒ Object

Internal: Load an adapter by name

Returns the adapter or nil if it can’t find it



158
159
160
161
162
163
164
165
166
167
168
# File 'lib/message_queue.rb', line 158

# Internal: Load an adapter by name.
#
# name - The String or Symbol adapter name. It is only loaded when it
#        matches an entry in ADAPTERS.
#
# Requires the adapter's file and looks the class up under
# MessageQueue::Adapters using klass_name_for.
#
# Returns the adapter, or nil if the name is not listed in ADAPTERS.
def load_adapter(name)
  requested = name.to_s
  return nil unless ADAPTERS.any? { |candidate| candidate.to_s == requested }

  require_relative "message_queue/adapters/#{name}"
  MessageQueue::Adapters.const_get(klass_name_for(name))
end

#load_serializer(name) ⇒ Object

Internal: Load a serializer by name

Returns the serializer or nil if it can’t find it



173
174
175
176
177
178
179
180
181
182
183
# File 'lib/message_queue.rb', line 173

# Internal: Load a serializer by name.
#
# name - The String or Symbol serializer name. It is only loaded when it
#        matches an entry in SERIALIZERS.
#
# Requires the serializer's file and looks the class up under
# MessageQueue::Serializers using klass_name_for.
#
# Returns the serializer, or nil if the name is not listed in SERIALIZERS.
def load_serializer(name)
  known_names = SERIALIZERS.map { |entry| entry.to_s }
  return nil unless known_names.include?(name.to_s)

  require_relative "message_queue/serializers/#{name}"
  MessageQueue::Serializers.const_get(klass_name_for(name))
end

#logger ⇒ Object



119
120
121
# File 'lib/message_queue.rb', line 119

# Public: Get the logger used by MessageQueue.
#
# Returns whatever Logging.logger returns.
def logger
  Logging.logger
end

#new_connection(options = {}) ⇒ Object

Public: Initialize a connection to a message queue.

options - The Hash options used to initialize a connection

:adapter - The Symbol adapter; must be one of ADAPTERS (:memory or :bunny).
           For detailed options, see the individual adapter implementation.
:serializer - The Symbol serializer for serialization.

Returns the connection for the specified message queue.

Raises a RuntimeError if the adapter or the serializer can’t be found.



76
77
78
79
80
81
82
83
84
# File 'lib/message_queue.rb', line 76

# Public: Initialize a connection to a message queue.
#
# options - The Hash options used to initialize a connection (default: {}):
#           :adapter    - The adapter name, resolved through load_adapter.
#           :serializer - The serializer name, resolved through
#                         load_serializer.
#
# Returns the result of the adapter's new_connection, called with the
# serializer and the full options.
# Raises a RuntimeError if the adapter or the serializer can't be found.
def new_connection(options = {})
  adapter_klass = load_adapter(options[:adapter]) ||
                  raise("Missing adapter #{options[:adapter]}")
  serializer_klass = load_serializer(options[:serializer]) ||
                     raise("Missing serializer #{options[:serializer]}")

  adapter_klass.new_connection(serializer_klass, options)
end

#new_consumer(options = {}) ⇒ Object

Public: Initialize a consumer using current connection to a message queue.

For details on the options, see the particular adapter.

Returns a new consumer



115
116
117
# File 'lib/message_queue.rb', line 115

# Public: Initialize a consumer using the current connection.
#
# options - The Hash options forwarded unchanged to the connection's
#           new_consumer (default: {}).
#
# Returns whatever the connection's new_consumer returns.
def new_consumer(options = {})
  connection.new_consumer(options)
end

#new_producer(options = {}) ⇒ Object

Public: Initialize a producer using current connection to a message queue.

For details on the options, see the particular adapter.

Returns a new producer



106
107
108
# File 'lib/message_queue.rb', line 106

# Public: Initialize a producer using the current connection.
#
# options - The Hash options forwarded unchanged to the connection's
#           new_producer (default: {}).
#
# Returns whatever the connection's new_producer returns.
def new_producer(options = {})
  connection.new_producer(options)
end

#reconnect ⇒ Object

Public: Reconnect to the message queue if it’s disconnected by using the previous connection settings

Returns the new connection if it reconnects successfully



55
56
57
58
# File 'lib/message_queue.rb', line 55

# Public: Reconnect to the message queue using the stored settings.
#
# Disconnects first when connected? is truthy, then calls connect with
# the value returned by settings.
#
# Returns the result of connect.
def reconnect
  disconnect if connected?
  # NOTE(review): settings appears to be nil until connect has run once, so
  # this presumably expects a prior connect call; confirm against callers.
  connect(settings)
end

#register_consumable(consumable) ⇒ Object

Internal: Register a consumable.

Returns the registered consumables.



130
131
132
# File 'lib/message_queue.rb', line 130

# Internal: Register a consumable.
#
# consumable - The object to append to the list of consumables.
#
# Returns the list of registered consumables, including the new entry.
def register_consumable(consumable)
  registry = consumables
  registry.push(consumable)
end

#register_error_handler(error_handler) ⇒ Object

Internal: Register an error handler.

Returns the registered error handlers.



137
138
139
# File 'lib/message_queue.rb', line 137

# Internal: Register an error handler.
#
# error_handler - The object to append to the list of error handlers.
#
# Returns the list of registered error handlers, including the new entry.
def register_error_handler(error_handler)
  registry = error_handlers
  registry.push(error_handler)
end

#run_consumables(options = {}) ⇒ Object



123
124
125
# File 'lib/message_queue.rb', line 123

# Public: Run the registered consumables.
#
# options - The Hash options passed unchanged to the runner's run method
#           (default: {}).
#
# Builds a MessageQueue::ConsumableRunner over the current consumables list.
#
# Returns whatever the runner's run method returns.
def run_consumables(options = {})
  runner = MessageQueue::ConsumableRunner.new(consumables)
  runner.run(options)
end

#with_connection(options = {}, &block) ⇒ Object

Public: Initialize a connection to a message queue, connect and execute code in a block.

options - The Hash options used to initialize a connection

:adapter - The Symbol adapter; must be one of ADAPTERS (:memory or :bunny).
           For detailed options, see the individual adapter implementation.
:serializer - The Symbol serializer for serialization.

block - The code to execute. The connection object will be passed in.

Returns nothing.

Raises a RuntimeError if the adapter or the serializer can’t be found.



96
97
98
99
# File 'lib/message_queue.rb', line 96

# Public: Initialize a connection to a message queue and hand the block to it.
#
# options - The Hash options passed to new_connection (default: {}).
# block   - The code to execute; it is forwarded to the new connection's
#           with_connection method.
#
# The connection built here is separate from the one stored by connect.
#
# Returns whatever the connection's with_connection returns.
def with_connection(options = {}, &block)
  new_connection(options).with_connection(&block)
end