Class: Agni::Messenger

Inherits:
Object
  • Object
show all
Includes:
LogMixin
Defined in:
lib/agni/messenger.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(amqp_url) ⇒ Messenger

Creates a Messenger, connecting to the supplied URL.

Parameters:

  • amqp_url (String)

    the url to the AMQP instance this Messenger should connect to. Should begin with ‘amqp://’.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/agni/messenger.rb', line 11

def initialize(amqp_url)
  if amqp_url.nil? || amqp_url.empty?
    raise ArgumentError, "AMQP url is required to create a Messenger"
  end
  self.configure_logs
  # Start EventMachine if needed
  unless EventMachine.reactor_running?
    @em_thread = Thread.new { EventMachine.run }
  end

  # Block until EventMachine has started
  info("Waiting for EventMachine to start")
  spin_until { EventMachine.reactor_running? }
  info("EventMachine start detected")

  EventMachine.threadpool_size = ENV.fetch('EM_THREADPOOL_SIZE', DEFAULT_THREADPOOL_SIZE).to_i

  unless @connection = AMQP.connect(amqp_url, DEFAULT_CONNECTION_OPTS)
    raise AgniError, "Unable to connect to AMQP instance at #{amqp_url}"
  end

  # A hash which maps queue names to Agni::Queue objects.
  # Tracks what queues we have access to.
  @queues = {}
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



5
6
7
# File 'lib/agni/messenger.rb', line 5

def connection
  @connection
end

Instance Method Details

#connected?Boolean

Returns true if the messenger is connected to AMQP, false otherwise.

Returns:

  • (Boolean)


177
178
179
# File 'lib/agni/messenger.rb', line 177

def connected?
  return @connection.connected?
end

#get_queue(queue_name, options = {}) ⇒ Agni::Queue

Gets a queue with the given options. If no options are provided, a default set of options will be used that makes the queue save its messages to disk so that they won’t be lost if the AMQP service is restarted.

Returns:

Raises:

  • ArgumentError if the queue name is not provided

  • AgniError if the queue has already been created with an incompatible set of options.



46
47
48
49
50
51
# File 'lib/agni/messenger.rb', line 46

def get_queue(queue_name, options={})
  @queues.fetch(queue_name) do |queue_name|
    queue = Queue.new(queue_name, self, options)
    @queues[queue_name] = queue
  end
end

#publish(msg, queue_name, options = {}) ⇒ Object

Convenience method that publishes a message to the given queue name.

One of the main uses of the options hash is to specify a message priority between 0 and 9:

messenger.publish("Hello World", "test_queue", priority: 7)

But the default priority is 4, so this would be published with a priority of 4:

messenger.publish("Hello World", "test_queue")

Parameters:

  • msg (String)

    the message to enqueue

  • queue_name (String)

    the name of the queue to publish to

  • options (Hash) (defaults to: {})

    optional – options that will be passed to the underlying AMQP queue during publishing. All keys should be symbols.

  • :priority (Hash)

    a customizable set of options



101
102
103
104
# File 'lib/agni/messenger.rb', line 101

def publish(msg, queue_name, options={})
  priority = options.delete(:priority) || DEFAULT_PRIORITY
  get_queue(queue_name).publish(msg, priority, options)
end

#queue?(queue_name) ⇒ TrueClass, FalseClass

Returns whether or not a queue with the given name is known to this Messenger instance.

Returns:

  • (TrueClass, FalseClass)

    whether or not a queue with the given name is known to this Messenger instance.



55
56
57
# File 'lib/agni/messenger.rb', line 55

def queue?(queue_name)
  @queues.key?(queue_name)
end

#queue_consumers(queue_name) ⇒ Fixnum

Returns the number of consumers for the queue with provided queue name. If the queue is not yet created, it will be created when this method is called.

Parameters:

  • queue_name (String)

    the name of the queue for which the the consumer count should be fetched.

Returns:

  • (Fixnum)

    the number of consumers for the queue with provided queue name. If the queue is not yet created, it will be created when this method is called.

Raises:

  • ArgumentError if the queue_name is not supplied



77
78
79
# File 'lib/agni/messenger.rb', line 77

def queue_consumers(queue_name)
  get_queue(queue_name).consumer_count if queue?(queue_name)
end

#queue_messages(queue_name) ⇒ Fixnum

Get and return the number of messages in a given queue.

Parameters:

  • queue_name (String)

    the name of the queue for which the the message count should be fetched.

Returns:

  • (Fixnum)

    the number of messages in the queue with provided queue name. If the queue is not yet created, the method will return nil.

Raises:

  • ArgumentError if the queue_name is not supplied



67
68
69
# File 'lib/agni/messenger.rb', line 67

def queue_messages(queue_name)
  get_queue(queue_name).message_count if queue?(queue_name)
end

#subscribe(queue_name, options = {}, &handler) ⇒ Agni::Queue

Note:

The block passed to this method must not block, since it will be run in a single-threaded context.

Convenience method that takes a queue name (creating the queue if necessary) and accepts a block that it will yield to for each incoming message. The block passed in to this method should accept two arguments: the metadata of the message being received, as well as the payload of the message.

This method is non-blocking, and if at any time the Messenger should no longer yield to the provided block when new messages arrive, the unsubscribe method can be called on the Messenger and given the queue name to unsubscribe from.

If no block is passed to this method, it will simply subscribe to the queue and drain it of messages as they come in.

To prevent lossage, this method will set up the subscription with the AMQP server to require acking of the messages by the client. As far as the end user is concerned, this means that if the messenger dies an untimely death, any unprocessed messages that remained in the buffer will be requeued on the server. Messenger will take care of the acking for the user, unless an option is passed to indicate that the user will handle acking in the provided block.

Parameters:

  • queue_name (String)

    The name of the queue that should be examined for messages

  • options (Hash) (defaults to: {})

    (optional) A hash of options

Options Hash (options):

  • :ack (TrueClass, FalseClass)

    Whether messenger should ack incoming messages for this subscription. If set to false, the block passed to this method must ack messages when they have been processed. Defaults to true.

Returns:



142
143
144
145
146
147
148
149
150
151
152
# File 'lib/agni/messenger.rb', line 142

def subscribe(queue_name, options={}, &handler)
  if queue_name.nil? || queue_name.empty?
    raise ArgumentError, 'Queue name must be present when subscribing'
  end
  queue = get_queue(queue_name)
  if queue.subscribed?
    raise AgniError, "Queue #{queue_name} is already subscribed!"
  end
  queue.subscribe(handler, options)
  # spin_until { queue.subscribed?  }
end

#unsubscribe(queue_name) ⇒ Object

Unsubscribe this messenger from the queue associated with the given name.

Raises:

  • ArgumentError if the queue name is empty

  • AgniError if the queue does not exist



159
160
161
162
163
164
165
166
# File 'lib/agni/messenger.rb', line 159

def unsubscribe(queue_name)
  if queue_name.nil? || queue_name.empty?
    raise ArgumentError, 'Queue name must be present when unsubscribing'
  end
  if queue = get_queue(queue_name)
    queue.unsubscribe
  end
end

#waitObject

This method allows a client of the messenger to block on the execution of the EventMachine, so it can run in a context that is dedicated to running for the purpose of receiving messages.



171
172
173
# File 'lib/agni/messenger.rb', line 171

def wait
  @em_thread.join
end