Class: Qup::Adapter::Kestrel::Queue
- Inherits: Destination (Object > Destination > Qup::Adapter::Kestrel::Queue)
- Includes: QueueAPI
- Defined in: lib/qup/adapter/kestrel/queue.rb
Overview
Internal: The Implementation of Queue in the Kestrel Adapter
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Internal: The name of the Queue.
Instance Method Summary
- #acknowledge(message) ⇒ Object
  Internal: Acknowledge that message is completed and remove it from the Queue.
- #consume(&block) ⇒ Object
  Internal: Retrieve a Message from the Queue.
- #depth ⇒ Object
  Internal: Return the number of Messages on the Queue.
- #flush ⇒ Object
  Internal: Remove all messages from the Queue.
- #initialize(client, name) ⇒ Queue (constructor)
  Internal: Create a new Queue.
- #produce(message) ⇒ Object
  Internal: Put an item onto the Queue.
Methods included from QueueAPI
#consumer, #destroy, #producer
Methods inherited from Destination
Constructor Details
#initialize(client, name) ⇒ Queue
Internal: Create a new Queue
client - the Kestrel Client used to communicate with the server
name - the String name of the Queue
Returns a new Queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 16
def initialize( client, name )
  super( client, name )
  @open_messages = {}
end
Instance Attribute Details
#name ⇒ Object (readonly)
Internal: The name of the Queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 22
def name
  @name
end
Instance Method Details
#acknowledge(message) ⇒ Object
Internal: Acknowledge that message is completed and remove it from the Queue.
For Kestrel, this simply closes the currently open message on the Queue. The message passed in is used only to check that it is currently being consumed.
Returns nothing
# File 'lib/qup/adapter/kestrel/queue.rb', line 81
def acknowledge( message )
  open_msg = @open_messages.delete( message.key )
  raise Qup::Error, "Message #{message.key} is not currently being consumed" unless open_msg
  @client.close( @name )
end
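The open-message bookkeeping behind acknowledge can be tried in isolation. The following is a minimal sketch rather than the adapter itself: `AckMessage`, `AckFakeClient` and `AckSketchQueue` are hypothetical stand-ins that mimic only the `reserve` and `close` calls seen in the listings on this page, and `ArgumentError` is used in place of `Qup::Error` so the example has no dependencies.

```ruby
# Hypothetical stand-ins for illustration only; not part of Qup or a Kestrel client.
AckMessage = Struct.new(:key, :data)

class AckFakeClient
  attr_reader :closed

  def initialize(items)
    @items  = items.dup
    @closed = []
  end

  # Hand out the next item, or nil when nothing is queued.
  def reserve(_name)
    @items.shift
  end

  # Record that the open reservation on the named queue was closed.
  def close(name)
    @closed << name
  end
end

class AckSketchQueue
  def initialize(client, name)
    @client        = client
    @name          = name
    @open_messages = {}
  end

  # Reserve an item and remember it as open until it is acknowledged.
  def consume
    item = @client.reserve(@name)
    return nil unless item
    message = AckMessage.new(item.object_id, item)
    @open_messages[message.key] = item
    message
  end

  # Only a message that is currently open may be acknowledged.
  def acknowledge(message)
    open_msg = @open_messages.delete(message.key)
    raise ArgumentError, "Message #{message.key} is not currently being consumed" unless open_msg
    @client.close(@name)
  end
end

client  = AckFakeClient.new(["job-1"])
queue   = AckSketchQueue.new(client, "jobs")
message = queue.consume
queue.acknowledge(message)      # closes the reservation on "jobs"

begin
  queue.acknowledge(message)    # the message is no longer open, so this raises
rescue ArgumentError => e
  puts e.message
end
```

Deleting the key before closing means a second acknowledge of the same message fails loudly instead of closing some other reservation.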
#consume(&block) ⇒ Object
Internal: Retrieve a Message from the Queue
Yields a Message
A user of the Qup API should use a Consumer instance to retrieve items from the Queue.
Returns a Message or nil if no message was on the queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 62
def consume(&block)
  q_item = @client.reserve( @name )
  return nil unless q_item
  message = ::Qup::Message.new( q_item.object_id, unmarshal_if_marshalled( q_item ))
  @open_messages[message.key] = q_item
  if block_given? then
    # The helper name is missing from this listing; yield_message is assumed.
    yield_message( message, &block )
  end
  return message
end
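The documented return convention (a Message, or nil when the queue is empty) lends itself to a simple drain loop. The sketch below assumes a hypothetical in-memory `DrainableQueue` and a hypothetical `drain` helper. It does not talk to Kestrel, does not model any blocking or timeout behaviour of a real `reserve` call, and does not acknowledge messages.

```ruby
# Hypothetical in-memory queue used only to illustrate the return convention:
# consume yields and returns an item, or returns nil once nothing is left.
class DrainableQueue
  def initialize(items)
    @items = items.dup
  end

  def consume
    item = @items.shift
    return nil unless item
    yield item if block_given?
    item
  end
end

# Keep consuming until the queue reports that it is empty.
def drain(queue)
  seen = []
  while (item = queue.consume)
    seen << item
  end
  seen
end

p drain(DrainableQueue.new(%w[a b c]))
```

Because nil doubles as the "empty" signal, a loop like this cannot distinguish an empty queue from a payload that is itself nil or false.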
#depth ⇒ Object
Internal: Return the number of Messages on the Queue
Returns the Queue depth as an Integer
# File 'lib/qup/adapter/kestrel/queue.rb', line 35
def depth
  @client.stats['queues'][@name]['items']
end
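The depth lookup walks a nested Hash. The sketch below uses a made-up stats payload: only the `'queues'` => name => `'items'` nesting is taken from the listing on this page, while the values and the helper names `depth_for` and `safe_depth_for` are assumptions for illustration. It also shows Ruby's `Hash#dig` as one possible way to avoid an exception when the queue name is absent.

```ruby
# Hypothetical stats payload; the values are invented for the example.
SKETCH_STATS = {
  'queues' => {
    'jobs' => { 'items' => 3 }
  }
}

# Same chained lookup as the depth method.
def depth_for(stats, name)
  stats['queues'][name]['items']
end

# Variant using Hash#dig, which returns nil instead of raising
# when the queue name is missing from the stats.
def safe_depth_for(stats, name)
  stats.dig('queues', name, 'items')
end

puts depth_for(SKETCH_STATS, 'jobs')
puts safe_depth_for(SKETCH_STATS, 'missing').inspect
```

With the chained form, asking for a queue that is missing from the stats raises a NoMethodError on nil, which may or may not be the desired behaviour for a caller.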
#flush ⇒ Object
Internal: Remove all messages from the Queue
Returns nothing.
# File 'lib/qup/adapter/kestrel/queue.rb', line 27
def flush
  @client.flush(@name)
end
#produce(message) ⇒ Object
Internal: Put an item onto the Queue
message - the data to put onto the queue.
A user of the Qup API should use a Producer instance to put items onto the queue.
Returns the Message that was put onto the Queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 48
def produce( message )
  @client.set( @name, message )
  return ::Qup::Message.new( message.object_id, message )
end
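As a rough illustration of what produce hands back, the sketch below stores a payload through a recording client and wraps it in a message keyed by the payload's `object_id`. `ProducedMessage`, `RecordingClient` and `produce_sketch` are hypothetical names for this example only. The real method uses `::Qup::Message` and a Kestrel client.

```ruby
# Hypothetical stand-in for Qup::Message.
ProducedMessage = Struct.new(:key, :data)

# Hypothetical stand-in for the Kestrel client; it only records what was set.
class RecordingClient
  attr_reader :queues

  def initialize
    @queues = Hash.new { |hash, name| hash[name] = [] }
  end

  def set(name, payload)
    @queues[name] << payload
  end
end

# Store the payload, then return it wrapped with its object_id as the key.
def produce_sketch(client, name, payload)
  client.set(name, payload)
  ProducedMessage.new(payload.object_id, payload)
end

client  = RecordingClient.new
payload = "resize image 42"
message = produce_sketch(client, "jobs", payload)

puts message.key == payload.object_id
puts client.queues["jobs"].inspect
```

Since the key comes from `object_id`, it identifies the Ruby object in the producing process rather than anything assigned by the server.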