Class: ReliableMsg::Queue
- Inherits:
-
Object
- Object
- ReliableMsg::Queue
- Defined in:
- lib/reliable-msg/queue.rb
Overview
Reliable Messaging Client API
Use the Queue object to put messages in queues, or get messages from queues.
You can create a Queue object that connects to a single queue by passing the queue name to the initialized. You can also access other queues by specifying the destination queue when putting a message, or selecting from a queue when retrieving the message.
For example:
queue = Queue.new 'my-queue'
# Put a message in the queue with priority 2, expiring in 30 seconds.
msg = 'lorem ipsum'
mid = queue.put msg, :priority=>2, :expires=>30
# Retrieve and process a message from the queue.
queue.get do |msg|
if msg.id == mid
print "Retrieved same message"
end
print "Message text: #{msg.object}"
end
See Queue.get and Queue.put for more examples.
Constant Summary collapse
- ERROR_INVALID_SELECTOR =
:nodoc:
'Selector must be message identifier (String), set of header name/value pairs (Hash), or nil'- ERROR_INVALID_TX_TIMEOUT =
:nodoc:
'Invalid transaction timeout: must be a non-zero positive integer'- ERROR_INVALID_CONNECT_COUNT =
:nodoc:
'Invalid connection count: must be a non-zero positive integer'- ERROR_SELECTOR_VALUE_OR_BLOCK =
:nodoc:
'You can either pass a Selector object, or use a block'- DRB_PORT =
The default DRb port used to connect to the queue manager.
6438- DEFAULT_DRB_URI =
:nodoc:
"druby://localhost:#{DRB_PORT}"- DLQ =
The name of the dead letter queue (
DLQ). Messages that expire or fail to process are automatically sent to the dead letter queue. DEAD_LETTER_QUEUE = 'dlq'
- DEFAULT_CONNECT_RETRY =
Number of times to retry a connecting to the queue manager.
5- DEFAULT_TX_TIMEOUT =
Default transaction timeout.
120- DEFAULT_MAX_RETRIES =
Default number of re-delivery attempts.
4- THREAD_CURRENT_TX =
Thread.current entry for queue transaction.
:reliable_msg_tx- @@drb_uri =
DRb URI for queue manager. You can override this to change the URI globally, for all Queue objects that are not instantiated with an alternative URI.
DEFAULT_DRB_URI- @@qm =
Reference to the local queue manager. Defaults to a DRb object, unless the queue manager is running locally.
nil- @@qm_cache =
Cache of queue managers referenced by their URI.
{}
Class Method Summary collapse
-
.selector(&block) ⇒ Object
Create and return a new selector based on the block expression.
Instance Method Summary collapse
-
#connect_count ⇒ Object
Returns the number of connection attempts, before operations fail.
-
#connect_count=(count) ⇒ Object
Sets the number of connection attempts, before operations fail.
-
#get(selector = nil, &block) ⇒ Object
Get a message from the queue.
-
#initialize(queue = nil, options = nil) ⇒ Queue
constructor
The optional argument
queuespecifies the queue name. -
#put(message, headers = nil) ⇒ Object
Put a message in the queue.
-
#selector(&block) ⇒ Object
If called with no block, returns the selector associated with this Queue (see Queue.selector=).
-
#selector=(value = nil, &block) ⇒ Object
Sets a default selector for this Queue.
-
#tx_timeout ⇒ Object
Returns the transaction timeout (in seconds).
-
#tx_timeout=(timeout) ⇒ Object
Sets the transaction timeout (in seconds).
Constructor Details
#initialize(queue = nil, options = nil) ⇒ Queue
The optional argument queue specifies the queue name. The application can still put messages in other queues by specifying the destination queue name in the header, or get from other queues by specifying the queue name in the selector.
TODO: document options
-
:expires
-
:priority
-
:max_retries
-
:selector
-
:drb_uri
-
:tx_timeout
-
:connect_count
:call-seq:
Queue.new([name [,]]) -> queue
101 102 103 104 105 106 |
# File 'lib/reliable-msg/queue.rb', line 101 def initialize queue = nil, = nil .each do |name, value| instance_variable_set "@#{name.to_s}".to_sym, value end if @queue = queue end |
Class Method Details
.selector(&block) ⇒ Object
Create and return a new selector based on the block expression. For example:
selector = Queue.selector { priority >= 2 and received > Time.new.to_i - 60 }
:call-seq:
Queue.selector { ... } -> selector
404 405 406 407 |
# File 'lib/reliable-msg/queue.rb', line 404 def self.selector &block raise ArgumentError, ERROR_NO_SELECTOR_BLOCK unless block Selector.new &block end |
Instance Method Details
#connect_count ⇒ Object
Returns the number of connection attempts, before operations fail.
:call-seq:
queue.connect_count -> numeric
338 339 340 |
# File 'lib/reliable-msg/queue.rb', line 338 def connect_count @connect_count || DEFAULT_CONNECT_RETRY end |
#connect_count=(count) ⇒ Object
Sets the number of connection attempts, before operations fail. The minimum is one. Use nil to restore the default connection count.
:call-seq:
queue.connect_count = count
queue.connect_count = nil
349 350 351 352 353 354 355 356 |
# File 'lib/reliable-msg/queue.rb', line 349 def connect_count= count if count raise ArgumentError, ERROR_INVALID_CONNECT_COUNT unless count.instance_of?(Integer) and count > 0 @connect_count = count else @connect_count = nil end end |
#get(selector = nil, &block) ⇒ Object
Get a message from the queue.
Call with no arguments to retrieve the next message in the queue. Call with a message identifier to retrieve that message. Call with selectors to retrieve the first message that matches.
Selectors specify which headers to match. For example, to retrieve all messages in the queue ‘my-queue’ with priority 2:
msg = queue.get :queue=>'my-queue', :priority=>2
To put and get the same message:
mid = queue.put obj
msg = queue.get mid # or queue.get :id=>mid
assert(msg.obj == obj)
More complex selector expressions can be generated using Queue.selector. For example, to retrieve the next message with priority 2 or higher, received in the last 60 seconds:
selector = Queue.selector { priority >= 2 and received > Time.new.to_i - 60 }
msg = queue.get selector
You can also specify selectors for a Queue to be used by default for all Queue.get calls on that Queue object. For example:
queue.selector= { priority >= 2 and received > Time.new.to_i - 60 }
msg = queue.get # default selector applies
The following headers have special meaning:
-
:id– The message identifier. -
:queue– Select a message originally delivered to the named queue. Only used when retrieving messages from the dead-letter queue. -
:retry– Specifies the retry count for the message. Zero when the message is first delivered, and incremented after each re-delivery attempt. -
:created– Indicates timestamp (in seconds) when the message was created. -
:received– Indicates timestamp (in seconds) when the message was received. -
:expires_at– Indicates timestamp (in seconds) when the message will expire,nilif the message does not expire.
Call this method without a block to return the message. The returned object is of type Message, or nil if no message is found.
Call this method in a block to retrieve and process the message. The block is called with the Message object, returning the result of the block. Returns nil if no message is found.
All operations performed on the queue inside the block are part of the same transaction. The transaction commits when the block completes. However, if the block raises an exception, the transaction aborts: the message along with any message retrieved through that Queue object are returned to the queue; messages put through that Queue object are discarded. You cannot put and get the same message inside a transaction.
For example:
queue.put obj
while queue.get do |msg| # called for each message in the queue,
# until the queue is empty
... do something with msg ...
queue.put obj # puts another message in the queue
true
end
This loop will only complete if it raises an exception, since it gets one message from the queue and puts another message in its place. After an exception, there will be at least one message in the queue.
Each attempt to process a message increases its retry count. When the retry count (:retry) reaches the maximum allowed (:max_retry), the message is moved to the dead-letter queue.
This method does not block and returns immediately if there is no message in the queue. To continue processing all messages in the queue:
while true # repeat forever
while true
break unless queue.get do |msg|
... do something with msg ...
true
end
end
sleep 5 # no messages, wait
end
:call-seq:
queue.get([selector]) -> msg or nil
queue.get([selector]) {|msg| ... } -> obj
See: Message
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/reliable-msg/queue.rb', line 255 def get selector = nil, &block tx = old_tx = Thread.current[THREAD_CURRENT_TX] # If block, begin a new transaction. if block tx = {:qm=>qm} tx[:tid] = tx[:qm].begin tx_timeout Thread.current[THREAD_CURRENT_TX] = tx end result = begin # Validate the selector: nil, string or hash. selector = case selector when String {:id=>selector} when Hash, Array, Selector selector when nil @selector else raise ArgumentError, ERROR_INVALID_SELECTOR end # If inside a transaction, always retrieve from the same queue manager, # otherwise, allow repeated() to try and access multiple queue managers. = if tx tx[:qm].enqueue :queue=>@queue, :selector=>selector, :tid=>tx[:tid] else repeated { |qm| qm.enqueue :queue=>@queue, :selector=>selector } end # Result is either message, or result from processing block. Note that # calling block may raise an exception. We deserialize the message here # for two reasons: # 1. It creates a distinct copy, so changing the message object and returning # it to the queue (abort) does not affect other consumers. # 2. The message may rely on classes known to the client but not available # to the queue manager. result = if = Message.new([:id], [:headers], Marshal::load([:message])) block ? block.call() : end rescue Exception=>error # Abort the transaction if we started it. Propagate error. qm.abort(tx[:tid]) if block raise error ensure # Resume the old transaction. Thread.current[THREAD_CURRENT_TX] = old_tx if block end # Commit the transaction and return the result. We do this outside the main # block, since we don't abort in case of error (commit is one-phase) and we # don't retain the transaction association, it completes by definition. qm.commit(tx[:tid]) if block result end |
#put(message, headers = nil) ⇒ Object
Put a message in the queue.
The message argument is required, but may be nil
Headers are optional. Headers are used to provide the application with additional information about the message, and can be used to retrieve messages (see Queue.put for discussion of selectors). Some headers are used to handle message processing internally (e.g. :priority, :expires).
Each header uses a symbol for its name. The value may be string, numeric, true/false or nil. No other objects are allowed. To improve performance, keep headers as small as possible.
The following headers have special meaning:
-
:delivery– The message delivery mode. -
:queue– Puts the message in the named queue. Otherwise, uses the queue specified when creating the Queue object. -
:priority– The message priority. Messages with higher priority are retrieved first. -
:expires– Message expiration in seconds. Messages do not expire unless specified. Zero ornilmeans no expiration. -
:expires_at– Specifies when the message expires (timestamp). Alternative to:expires. -
:max_retries– Maximum number of attempts to re-deliver message, afterwhich message moves to the DLQ. Minimum is 0 (deliver only once), default is 4 (deliver up to 5 times).
Headers can be set on a per-queue basis when the Queue is created. This only affects messages put through that Queue object.
Messages can be delivered using one of three delivery modes:
-
:best_effort– Attempt to deliver the message once. If the message expires or cannot be delivered, discard the message. The is the default delivery mode. -
:repeated– Attempt to deliver until message expires, or up to maximum re-delivery count (see:max_retries). Afterwards, move message to dead-letter queue. -
:once– Attempt to deliver message exactly once. If message expires, or first delivery attempt fails, move message to dead-letter queue.
For example:
queue.put request
queue.put notice, :expires=>10
queue.put object, :queue=>'other-queue'
:call-seq:
queue.put([, headers]) -> id
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/reliable-msg/queue.rb', line 155 def put , headers = nil tx = Thread.current[THREAD_CURRENT_TX] # Use headers supplied by callers, or defaults for this queue. headers ||= {} headers.fetch(:priority, @priority || 0) headers.fetch(:expires, @expires) headers.fetch(:max_retries, @max_retries || DEFAULT_MAX_RETRIES) # Serialize the message before sending to queue manager. We need the # message to be serialized for storage, this just saves duplicate # serialization when using DRb. = Marshal::dump # If inside a transaction, always send to the same queue manager, otherwise, # allow repeated() to try and access multiple queue managers. if tx return tx[:qm].queue(:message=>, :headers=>headers, :queue=>(headers[:queue] || @queue), :tid=>tx[:tid]) else return repeated { |qm| qm.queue :message=>, :headers=>headers, :queue=>(headers[:queue] || @queue) } end end |
#selector(&block) ⇒ Object
If called with no block, returns the selector associated with this Queue (see Queue.selector=). If called with a block, creates and returns a new selector (similar to Queue::selector).
:call-seq:
queue.selector -> selector
queue.selector { ... } -> selector
366 367 368 |
# File 'lib/reliable-msg/queue.rb', line 366 def selector &block block ? Selector.new(&block) : @selector end |
#selector=(value = nil, &block) ⇒ Object
Sets a default selector for this Queue. Affects all calls to Queue.get on this Queue object that do not specify a selector.
You can pass a Selector object, a block expression, or nil if you no longer want to use the default selector. For example:
queue.selector= { priority >= 2 and received > Time.new.to_i - 60 }
10.times do
p queue.get
end
queue.selector= nil
:call-seq:
queue.selector = selector
queue.selector = { ... }
queue.selector = nil
386 387 388 389 390 391 392 393 394 395 396 |
# File 'lib/reliable-msg/queue.rb', line 386 def selector= value = nil, &block raise ArgumentError, ERROR_SELECTOR_VALUE_OR_BLOCK if (value && block) if value raise ArgumentError, ERROR_SELECTOR_VALUE_OR_BLOCK unless value.instance_of?(Selector) @selector = value elsif block @selector = Selector.new &block else @selector = nil end end |
#tx_timeout ⇒ Object
Returns the transaction timeout (in seconds).
:call-seq:
queue.tx_timeout -> numeric
313 314 315 |
# File 'lib/reliable-msg/queue.rb', line 313 def tx_timeout @tx_timeout || DEFAULT_TX_TIMEOUT end |
#tx_timeout=(timeout) ⇒ Object
Sets the transaction timeout (in seconds). Affects future transactions started by Queue.get. Use nil to restore the default timeout.
:call-seq:
queue.tx_timeout = timeout
queue.tx_timeout = nil
324 325 326 327 328 329 330 331 |
# File 'lib/reliable-msg/queue.rb', line 324 def tx_timeout= timeout if timeout raise ArgumentError, ERROR_INVALID_TX_TIMEOUT unless timeout.instance_of?(Integer) and timeout > 0 @tx_timeout = timeout else @tx_timeout = nil end end |