Class: ReliableMsg::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/reliable-msg/queue.rb

Overview

Reliable Messaging Client API

Use the Queue object to put messages in queues, or get messages from queues.

You can create a Queue object that connects to a single queue by passing the queue name to the initialized. You can also access other queues by specifying the destination queue when putting a message, or selecting from a queue when retrieving the message.

For example:

queue = Queue.new 'my-queue'
# Put a message in the queue with priority 2, expiring in 30 seconds.
msg = 'lorem ipsum'
mid = queue.put msg, :priority=>2, :expires=>30
# Retrieve and process a message from the queue.
queue.get do |msg|
  if msg.id == mid
    print "Retrieved same message"
  end
  print "Message text: #{msg.object}"
end

See Queue.get and Queue.put for more examples.

Constant Summary collapse

ERROR_INVALID_SELECTOR =

:nodoc:

'Selector must be message identifier (String), set of header name/value pairs (Hash), or nil'
ERROR_INVALID_TX_TIMEOUT =

:nodoc:

'Invalid transaction timeout: must be a non-zero positive integer'
ERROR_INVALID_CONNECT_COUNT =

:nodoc:

'Invalid connection count: must be a non-zero positive integer'
ERROR_SELECTOR_VALUE_OR_BLOCK =

:nodoc:

'You can either pass a Selector object, or use a block'
DRB_PORT =

The default DRb port used to connect to the queue manager.

6438
DEFAULT_DRB_URI =

:nodoc:

"druby://localhost:#{DRB_PORT}"
DLQ =

The name of the dead letter queue (DLQ). Messages that expire or fail to process are automatically sent to the dead letter queue.

DEAD_LETTER_QUEUE = 'dlq'
DEFAULT_CONNECT_RETRY =

Number of times to retry a connecting to the queue manager.

5
DEFAULT_TX_TIMEOUT =

Default transaction timeout.

120
DEFAULT_MAX_RETRIES =

Default number of re-delivery attempts.

4
THREAD_CURRENT_TX =

Thread.current entry for queue transaction.

:reliable_msg_tx
@@drb_uri =

DRb URI for queue manager. You can override this to change the URI globally, for all Queue objects that are not instantiated with an alternative URI.

DEFAULT_DRB_URI
@@qm =

Reference to the local queue manager. Defaults to a DRb object, unless the queue manager is running locally.

nil
@@qm_cache =

Cache of queue managers referenced by their URI.

{}

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue = nil, options = nil) ⇒ Queue

The optional argument queue specifies the queue name. The application can still put messages in other queues by specifying the destination queue name in the header, or get from other queues by specifying the queue name in the selector.

TODO: document options

  • :expires

  • :priority

  • :max_retries

  • :selector

  • :drb_uri

  • :tx_timeout

  • :connect_count

:call-seq:

Queue.new([name [,options]])    -> queue


101
102
103
104
105
106
# File 'lib/reliable-msg/queue.rb', line 101

def initialize queue = nil, options = nil
    options.each do |name, value|
        instance_variable_set "@#{name.to_s}".to_sym, value
    end if options
    @queue = queue
end

Class Method Details

.selector(&block) ⇒ Object

Create and return a new selector based on the block expression. For example:

selector = Queue.selector { priority >= 2 and received > Time.new.to_i - 60 }

:call-seq:

Queue.selector { ... }  -> selector

Raises:

  • (ArgumentError)


404
405
406
407
# File 'lib/reliable-msg/queue.rb', line 404

def self.selector &block
    raise ArgumentError, ERROR_NO_SELECTOR_BLOCK unless block
    Selector.new &block
end

Instance Method Details

#connect_countObject

Returns the number of connection attempts, before operations fail.

:call-seq:

queue.connect_count -> numeric


338
339
340
# File 'lib/reliable-msg/queue.rb', line 338

def connect_count
    @connect_count || DEFAULT_CONNECT_RETRY
end

#connect_count=(count) ⇒ Object

Sets the number of connection attempts, before operations fail. The minimum is one. Use nil to restore the default connection count.

:call-seq:

queue.connect_count = count
queue.connect_count = nil


349
350
351
352
353
354
355
356
# File 'lib/reliable-msg/queue.rb', line 349

def connect_count= count
    if count
        raise ArgumentError, ERROR_INVALID_CONNECT_COUNT unless count.instance_of?(Integer) and count > 0
        @connect_count = count
    else
        @connect_count = nil
    end
end

#get(selector = nil, &block) ⇒ Object

Get a message from the queue.

Call with no arguments to retrieve the next message in the queue. Call with a message identifier to retrieve that message. Call with selectors to retrieve the first message that matches.

Selectors specify which headers to match. For example, to retrieve all messages in the queue ‘my-queue’ with priority 2:

msg = queue.get :queue=>'my-queue', :priority=>2

To put and get the same message:

mid = queue.put obj
msg = queue.get mid # or queue.get :id=>mid
assert(msg.obj == obj)

More complex selector expressions can be generated using Queue.selector. For example, to retrieve the next message with priority 2 or higher, received in the last 60 seconds:

selector = Queue.selector { priority >= 2 and received > Time.new.to_i - 60 }
msg = queue.get selector

You can also specify selectors for a Queue to be used by default for all Queue.get calls on that Queue object. For example:

queue.selector= { priority >= 2 and received > Time.new.to_i - 60 }
msg = queue.get  # default selector applies

The following headers have special meaning:

  • :id – The message identifier.

  • :queue – Select a message originally delivered to the named queue. Only used when retrieving messages from the dead-letter queue.

  • :retry – Specifies the retry count for the message. Zero when the message is first delivered, and incremented after each re-delivery attempt.

  • :created – Indicates timestamp (in seconds) when the message was created.

  • :received – Indicates timestamp (in seconds) when the message was received.

  • :expires_at – Indicates timestamp (in seconds) when the message will expire, nil if the message does not expire.

Call this method without a block to return the message. The returned object is of type Message, or nil if no message is found.

Call this method in a block to retrieve and process the message. The block is called with the Message object, returning the result of the block. Returns nil if no message is found.

All operations performed on the queue inside the block are part of the same transaction. The transaction commits when the block completes. However, if the block raises an exception, the transaction aborts: the message along with any message retrieved through that Queue object are returned to the queue; messages put through that Queue object are discarded. You cannot put and get the same message inside a transaction.

For example:

queue.put obj
while queue.get do |msg|  # called for each message in the queue,
                          # until the queue is empty
  ... do something with msg ...
  queue.put obj           # puts another message in the queue
  true
end

This loop will only complete if it raises an exception, since it gets one message from the queue and puts another message in its place. After an exception, there will be at least one message in the queue.

Each attempt to process a message increases its retry count. When the retry count (:retry) reaches the maximum allowed (:max_retry), the message is moved to the dead-letter queue.

This method does not block and returns immediately if there is no message in the queue. To continue processing all messages in the queue:

while true  # repeat forever
  while true
    break unless queue.get do |msg|
      ... do something with msg ...
      true
    end
  end
  sleep 5     # no messages, wait
end

:call-seq:

queue.get([selector]) -> msg or nil
queue.get([selector]) {|msg| ... } -> obj

See: Message



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/reliable-msg/queue.rb', line 255

def get selector = nil, &block
    tx = old_tx = Thread.current[THREAD_CURRENT_TX]
    # If block, begin a new transaction.

    if block
        tx = {:qm=>qm}
        tx[:tid] = tx[:qm].begin tx_timeout
        Thread.current[THREAD_CURRENT_TX] = tx
    end
    result = begin
        # Validate the selector: nil, string or hash.

        selector = case selector
            when String
                {:id=>selector}
            when Hash, Array, Selector
                selector
            when nil
                @selector
            else
                raise ArgumentError, ERROR_INVALID_SELECTOR
        end
        # If inside a transaction, always retrieve from the same queue manager,

        # otherwise, allow repeated() to try and access multiple queue managers.

        message = if tx
            tx[:qm].enqueue :queue=>@queue, :selector=>selector, :tid=>tx[:tid]
        else
            repeated { |qm| qm.enqueue :queue=>@queue, :selector=>selector }
        end
        # Result is either message, or result from processing block. Note that

        # calling block may raise an exception. We deserialize the message here

        # for two reasons:

        # 1. It creates a distinct copy, so changing the message object and returning

        #    it to the queue (abort) does not affect other consumers.

        # 2. The message may rely on classes known to the client but not available

        #    to the queue manager.

        result = if message
            message = Message.new(message[:id], message[:headers], Marshal::load(message[:message]))
            block ? block.call(message) : message
        end
    rescue Exception=>error
        # Abort the transaction if we started it. Propagate error.

        qm.abort(tx[:tid]) if block
        raise error
    ensure
        # Resume the old transaction.

        Thread.current[THREAD_CURRENT_TX] = old_tx if block
    end
    # Commit the transaction and return the result. We do this outside the main

    # block, since we don't abort in case of error (commit is one-phase) and we

    # don't retain the transaction association, it completes by definition.

    qm.commit(tx[:tid]) if block
    result
end

#put(message, headers = nil) ⇒ Object

Put a message in the queue.

The message argument is required, but may be nil

Headers are optional. Headers are used to provide the application with additional information about the message, and can be used to retrieve messages (see Queue.put for discussion of selectors). Some headers are used to handle message processing internally (e.g. :priority, :expires).

Each header uses a symbol for its name. The value may be string, numeric, true/false or nil. No other objects are allowed. To improve performance, keep headers as small as possible.

The following headers have special meaning:

  • :delivery – The message delivery mode.

  • :queue – Puts the message in the named queue. Otherwise, uses the queue specified when creating the Queue object.

  • :priority – The message priority. Messages with higher priority are retrieved first.

  • :expires – Message expiration in seconds. Messages do not expire unless specified. Zero or nil means no expiration.

  • :expires_at – Specifies when the message expires (timestamp). Alternative to :expires.

  • :max_retries – Maximum number of attempts to re-deliver message, afterwhich message moves to the DLQ. Minimum is 0 (deliver only once), default is 4 (deliver up to 5 times).

Headers can be set on a per-queue basis when the Queue is created. This only affects messages put through that Queue object.

Messages can be delivered using one of three delivery modes:

  • :best_effort – Attempt to deliver the message once. If the message expires or cannot be delivered, discard the message. The is the default delivery mode.

  • :repeated – Attempt to deliver until message expires, or up to maximum re-delivery count (see :max_retries). Afterwards, move message to dead-letter queue.

  • :once – Attempt to deliver message exactly once. If message expires, or first delivery attempt fails, move message to dead-letter queue.

For example:

queue.put request
queue.put notice, :expires=>10
queue.put object, :queue=>'other-queue'

:call-seq:

queue.put(message[, headers]) -> id


155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/reliable-msg/queue.rb', line 155

def put message, headers = nil
    tx = Thread.current[THREAD_CURRENT_TX]
    # Use headers supplied by callers, or defaults for this queue.

    headers ||= {}
    headers.fetch(:priority, @priority || 0)
    headers.fetch(:expires, @expires)
    headers.fetch(:max_retries, @max_retries || DEFAULT_MAX_RETRIES)
    # Serialize the message before sending to queue manager. We need the

    # message to be serialized for storage, this just saves duplicate

    # serialization when using DRb.

    message = Marshal::dump message
    # If inside a transaction, always send to the same queue manager, otherwise,

    # allow repeated() to try and access multiple queue managers.

    if tx
        return tx[:qm].queue(:message=>message, :headers=>headers, :queue=>(headers[:queue] || @queue), :tid=>tx[:tid])
    else
        return repeated { |qm| qm.queue :message=>message, :headers=>headers, :queue=>(headers[:queue] || @queue) }
    end
end

#selector(&block) ⇒ Object

If called with no block, returns the selector associated with this Queue (see Queue.selector=). If called with a block, creates and returns a new selector (similar to Queue::selector).

:call-seq:

queue.selector -> selector
queue.selector { ... } -> selector


366
367
368
# File 'lib/reliable-msg/queue.rb', line 366

def selector &block
    block ? Selector.new(&block) : @selector
end

#selector=(value = nil, &block) ⇒ Object

Sets a default selector for this Queue. Affects all calls to Queue.get on this Queue object that do not specify a selector.

You can pass a Selector object, a block expression, or nil if you no longer want to use the default selector. For example:

queue.selector= { priority >= 2 and received > Time.new.to_i - 60 }
10.times do
  p queue.get
end
queue.selector= nil

:call-seq:

queue.selector = selector
queue.selector = { ... }
queue.selector = nil

Raises:

  • (ArgumentError)


386
387
388
389
390
391
392
393
394
395
396
# File 'lib/reliable-msg/queue.rb', line 386

def selector= value = nil, &block
    raise ArgumentError, ERROR_SELECTOR_VALUE_OR_BLOCK if (value && block)
    if value
        raise ArgumentError, ERROR_SELECTOR_VALUE_OR_BLOCK unless value.instance_of?(Selector)
        @selector = value
    elsif block
        @selector = Selector.new &block
    else
        @selector = nil
    end
end

#tx_timeoutObject

Returns the transaction timeout (in seconds).

:call-seq:

queue.tx_timeout -> numeric


313
314
315
# File 'lib/reliable-msg/queue.rb', line 313

def tx_timeout
    @tx_timeout || DEFAULT_TX_TIMEOUT
end

#tx_timeout=(timeout) ⇒ Object

Sets the transaction timeout (in seconds). Affects future transactions started by Queue.get. Use nil to restore the default timeout.

:call-seq:

queue.tx_timeout = timeout
queue.tx_timeout = nil


324
325
326
327
328
329
330
331
# File 'lib/reliable-msg/queue.rb', line 324

def tx_timeout= timeout
    if timeout
        raise ArgumentError, ERROR_INVALID_TX_TIMEOUT unless timeout.instance_of?(Integer) and timeout > 0
        @tx_timeout = timeout
    else
        @tx_timeout = nil
    end
end