Class: Agni::Messenger
- Inherits:
- Object
- Includes:
- LogMixin
- Defined in:
- lib/agni/messenger.rb
Instance Attribute Summary
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Instance Method Summary
-
#connected? ⇒ Boolean
Returns true if the messenger is connected to AMQP, false otherwise.
-
#get_queue(queue_name, options = {}) ⇒ Agni::Queue
Gets a queue with the given options.
-
#initialize(amqp_url) ⇒ Messenger
constructor
Creates a Messenger, connecting to the supplied URL.
-
#publish(msg, queue_name, options = {}) ⇒ Object
Convenience method that publishes a message to the given queue name.
-
#queue?(queue_name) ⇒ TrueClass, FalseClass
Whether or not a queue with the given name is known to this Messenger instance.
-
#queue_consumers(queue_name) ⇒ Fixnum
The number of consumers for the queue with provided queue name.
-
#queue_messages(queue_name) ⇒ Fixnum
Get and return the number of messages in a given queue.
-
#subscribe(queue_name, options = {}, &handler) ⇒ Agni::Queue
Convenience method that takes a queue name (creating the queue if necessary) and accepts a block that it will yield to for each incoming message.
-
#unsubscribe(queue_name) ⇒ Object
Unsubscribe this messenger from the queue associated with the given name.
-
#wait ⇒ Object
Blocks the calling thread on the EventMachine thread, so that a client can run in a process dedicated to receiving messages.
Constructor Details
#initialize(amqp_url) ⇒ Messenger
Creates a Messenger, connecting to the supplied URL.
# File 'lib/agni/messenger.rb', line 11
def initialize(amqp_url)
  if amqp_url.nil? || amqp_url.empty?
    raise ArgumentError, "AMQP url is required to create a Messenger"
  end
  self.configure_logs

  # Start EventMachine if needed
  unless EventMachine.reactor_running?
    @em_thread = Thread.new { EventMachine.run }
  end

  # Block until EventMachine has started
  info("Waiting for EventMachine to start")
  spin_until { EventMachine.reactor_running? }
  info("EventMachine start detected")

  EventMachine.threadpool_size = ENV.fetch('EM_THREADPOOL_SIZE', DEFAULT_THREADPOOL_SIZE).to_i
  unless @connection = AMQP.connect(amqp_url, DEFAULT_CONNECTION_OPTS)
    raise AgniError, "Unable to connect to AMQP instance at #{amqp_url}"
  end

  # A hash which maps queue names to Agni::Queue objects.
  # Tracks what queues we have access to.
  @queues = {}
end
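The constructor blocks on `spin_until` until the reactor reports that it is running. The source of that helper is not shown on this page, so the following is only a minimal sketch of how such a polling helper might behave, assuming it re-evaluates its block with a short sleep in between. The `timeout` and `interval` parameters are hypothetical and are not taken from Agni.

```ruby
# Hypothetical stand-in for the spin_until helper used in the constructor.
# Polls the block until it returns a truthy value or the timeout elapses.
def spin_until(timeout: 5.0, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
      raise "condition not met within #{timeout}s"
    end
    sleep interval
  end
  true
end

# Usage: wait for a background thread to flip a flag, much as the
# constructor waits for EventMachine.reactor_running? to become true.
started = false
worker = Thread.new { sleep 0.05; started = true }
spin_until { started }
worker.join
```

A bounded wait like this fails loudly if the reactor never starts; whether the real helper has a timeout is not known from this page.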
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
# File 'lib/agni/messenger.rb', line 5
def connection
  @connection
end
Instance Method Details
#connected? ⇒ Boolean
Returns true if the messenger is connected to AMQP, false otherwise.
# File 'lib/agni/messenger.rb', line 177
def connected?
  return @connection.connected?
end
#get_queue(queue_name, options = {}) ⇒ Agni::Queue
Gets a queue with the given options. If no options are provided, a default set of options is used that makes the queue save its messages to disk, so they are not lost if the AMQP service is restarted. Queues are cached by name, so the options only take effect the first time a given queue name is requested.
# File 'lib/agni/messenger.rb', line 46
def get_queue(queue_name, options = {})
  @queues.fetch(queue_name) do |queue_name|
    queue = Queue.new(queue_name, self, options)
    @queues[queue_name] = queue
  end
end
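The fetch-or-create pattern in `get_queue` relies on `Hash#fetch` calling its block only when the key is missing. The sketch below reproduces that pattern in isolation; `FakeQueue` and `Registry` are hypothetical stand-ins and not Agni classes.

```ruby
# FakeQueue and Registry are hypothetical stand-ins, not Agni classes.
FakeQueue = Struct.new(:name, :options)

class Registry
  def initialize
    @queues = {}
  end

  # Same shape as get_queue: return the cached queue, or build and store one.
  def get_queue(queue_name, options = {})
    @queues.fetch(queue_name) do |name|
      @queues[name] = FakeQueue.new(name, options)
    end
  end

  def queue?(queue_name)
    @queues.key?(queue_name)
  end
end

registry = Registry.new
first  = registry.get_queue("jobs", { durable: true })
second = registry.get_queue("jobs", { durable: false })
puts first.equal?(second)    # true: the same object is returned both times
puts second.options.inspect  # the options from the first call are kept
```

One consequence of this design is that a second call with different options silently returns the queue built by the first call.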
#publish(msg, queue_name, options = {}) ⇒ Object
Convenience method that publishes a message to the given queue name.
One of the main uses of the options hash is to specify a message priority between 0 and 9:
messenger.publish("Hello World", "test_queue", priority: 7)
The default priority is 4, so the following message is published with a priority of 4:
messenger.publish("Hello World", "test_queue")
# File 'lib/agni/messenger.rb', line 101
def publish(msg, queue_name, options = {})
  priority = options.delete(:priority) || DEFAULT_PRIORITY
  get_queue(queue_name).publish(msg, priority, options)
end
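`publish` extracts the priority with `Hash#delete`, which also removes the key from the hash the caller passed in. The sketch below isolates that one line to show the effect. `split_priority` and the `:extra` key are hypothetical names used only for illustration; the default of 4 is the value stated in the text above.

```ruby
DEFAULT_PRIORITY = 4 # the default stated in the text above

# Hypothetical helper mirroring the first line of publish.
def split_priority(options = {})
  priority = options.delete(:priority) || DEFAULT_PRIORITY
  [priority, options]
end

opts = { priority: 7, extra: true }
priority, rest = split_priority(opts)
puts priority              # 7
puts rest.inspect          # :priority has been removed
puts opts.key?(:priority)  # false: the caller's hash was modified in place

# Passing a copy leaves the original untouched.
opts = { priority: 7 }
split_priority(opts.dup)
puts opts.key?(:priority)  # true
```

Callers that reuse one options hash across several `publish` calls may therefore want to pass a copy.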
#queue?(queue_name) ⇒ TrueClass, FalseClass
Returns whether or not a queue with the given name is known to this Messenger instance.
# File 'lib/agni/messenger.rb', line 55
def queue?(queue_name)
  @queues.key?(queue_name)
end
#queue_consumers(queue_name) ⇒ Fixnum
Returns the number of consumers for the queue with the provided name. If no queue with that name is known to this Messenger, the method returns nil; it does not create the queue.
# File 'lib/agni/messenger.rb', line 77
def queue_consumers(queue_name)
  get_queue(queue_name).consumer_count if queue?(queue_name)
end
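The trailing `if queue?(queue_name)` means the method evaluates to nil when the queue is unknown. A minimal sketch of that guard follows; `CounterDemo` is a hypothetical class holding plain integers, and only the guard itself mirrors the source.

```ruby
# Hypothetical stand-in; only the `... if queue?(name)` guard mirrors the source.
class CounterDemo
  def initialize(counts)
    @counts = counts # queue name => consumer count
  end

  def queue?(name)
    @counts.key?(name)
  end

  # A trailing `if` whose condition is false makes the method return nil.
  def queue_consumers(name)
    @counts[name] if queue?(name)
  end
end

demo = CounterDemo.new("jobs" => 2)
p demo.queue_consumers("jobs")          # 2
p demo.queue_consumers("missing")       # nil
p demo.queue_consumers("missing").to_i  # 0, if callers prefer a number
```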
#queue_messages(queue_name) ⇒ Fixnum
Returns the number of messages in the given queue, or nil if no queue with that name is known to this Messenger.
# File 'lib/agni/messenger.rb', line 67
def queue_messages(queue_name)
  get_queue(queue_name). if queue?(queue_name)
end
#subscribe(queue_name, options = {}, &handler) ⇒ Agni::Queue
The block passed to this method must not block, since it will be run in a single-threaded context.
Convenience method that takes a queue name (creating the queue if necessary) and accepts a block that it will yield to for each incoming message. The block passed in to this method should accept two arguments: the metadata of the message being received, as well as the payload of the message.
This method is non-blocking, and if at any time the Messenger should no longer yield to the provided block when new messages arrive, the unsubscribe method can be called on the Messenger and given the queue name to unsubscribe from.
If no block is passed to this method, it will simply subscribe to the queue and drain it of messages as they come in.
To prevent message loss, this method sets up the subscription so that the AMQP server requires the client to acknowledge (ack) each message. For the end user, this means that if the messenger dies unexpectedly, any unprocessed messages still in its buffer are requeued on the server. Messenger acks messages on the user's behalf, unless an option is passed to indicate that the user will handle acking in the provided block.
# File 'lib/agni/messenger.rb', line 142
def subscribe(queue_name, options = {}, &handler)
  if queue_name.nil? || queue_name.empty?
    raise ArgumentError, 'Queue name must be present when subscribing'
  end
  queue = get_queue(queue_name)
  if queue.subscribed?
    raise AgniError, "Queue #{queue_name} is already subscribed!"
  end
  queue.subscribe(handler, options)
  # spin_until { queue.subscribed? }
end
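Because the handler must not block, one common approach is to have it only enqueue the message and let a separate worker thread do the slow part. The sketch below shows that hand-off using Ruby's standard `Thread::Queue`, with no Agni or AMQP calls. `Handoff` is a hypothetical class, and `metadata` is a plain Hash here purely for illustration.

```ruby
# Hypothetical helper: a non-blocking handler backed by a worker thread.
class Handoff
  def initialize
    @work = Thread::Queue.new
    @results = []
    @worker = Thread.new do
      # pop returns nil once the queue is closed and drained, ending the loop
      while (item = @work.pop)
        metadata, payload = item
        sleep 0.01 # stands in for slow processing
        @results << "#{metadata[:id]}:#{payload.upcase}"
      end
    end
  end

  # A block with the (metadata, payload) shape described above.
  # It only enqueues, so it returns immediately.
  def handler
    proc { |metadata, payload| @work << [metadata, payload] }
  end

  # Stops accepting work, waits for the worker, and returns the results.
  def finish
    @work.close
    @worker.join
    @results
  end
end

handoff = Handoff.new
on_message = handoff.handler
on_message.call({ id: 1 }, "hello") # simulated delivery
on_message.call({ id: 2 }, "world") # simulated delivery
p handoff.finish
```

Given the signature above, such a handler could presumably be passed as `messenger.subscribe("test_queue", &handoff.handler)`. Note that if Messenger acks automatically, a message handed off this way may be acked before the worker has finished processing it.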
#unsubscribe(queue_name) ⇒ Object
Unsubscribe this messenger from the queue associated with the given name.
# File 'lib/agni/messenger.rb', line 159
def unsubscribe(queue_name)
  if queue_name.nil? || queue_name.empty?
    raise ArgumentError, 'Queue name must be present when unsubscribing'
  end
  if queue = get_queue(queue_name)
    queue.unsubscribe
  end
end
#wait ⇒ Object
Blocks the calling thread on the EventMachine thread, so that a client can run in a process dedicated to receiving messages.
# File 'lib/agni/messenger.rb', line 171
def wait
  @em_thread.join
end
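`wait` is a plain `Thread#join` on the thread created in the constructor, and the constructor assigns `@em_thread` only when it starts EventMachine itself. The sketch below shows the join pattern with an ordinary Ruby thread in place of the reactor, plus one possible guard for the case where no thread was created. `Runner` and its `start_thread` parameter are hypothetical and are not part of Agni.

```ruby
# Hypothetical stand-in: a plain Ruby thread plays the role of the reactor thread.
class Runner
  def initialize(start_thread: true)
    @thread = start_thread ? Thread.new { sleep 0.05 } : nil
  end

  # Blocks until the background thread exits; returns nil if none was started.
  def wait
    @thread&.join
  end
end

Runner.new.wait                         # blocks for about 50 ms
p Runner.new(start_thread: false).wait  # nil rather than a NoMethodError
```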