Class: Qup::Adapter::Kestrel::Queue
- Inherits:
- Destination
- Object > Destination > Qup::Adapter::Kestrel::Queue
- Includes:
- QueueAPI
- Defined in:
- lib/qup/adapter/kestrel/queue.rb
Overview
Internal: The Implementation of Queue in the Kestrel Adapter
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Internal: The name of the Queue.
Instance Method Summary
- #acknowledge(message) ⇒ Object
  Internal: Acknowledge that message is completed and remove it from the Queue.
- #consume(&block) ⇒ Object
  Internal: Retrieve a Message from the Queue.
- #depth ⇒ Object
  Internal: Return the number of Messages on the Queue.
- #destroy ⇒ Object
  Internal: Remove the Queue if possible.
- #flush ⇒ Object
  Internal: Remove all messages from the Queue.
- #initialize(client, name) ⇒ Queue (constructor)
  Internal: Create a new Queue.
- #produce(message) ⇒ Object
  Internal: Put an item onto the Queue.
Methods included from QueueAPI
Methods inherited from Destination
Constructor Details
#initialize(client, name) ⇒ Queue
Internal: Create a new Queue
client - the Kestrel Client for this Queue
name - the String name of the Queue
Returns a new Queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 16
def initialize( client, name )
  super( client, name )
  @open_messages = {}
end
Instance Attribute Details
#name ⇒ Object (readonly)
Internal: The name of the Queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 22
def name
  @name
end
Instance Method Details
#acknowledge(message) ⇒ Object
Internal: Acknowledge that message is completed and remove it from the Queue.
For Kestrel, this closes the last message reserved on the Queue. The message passed in is used only to check that it is currently being consumed; a Qup::Error is raised if it is not.
Returns nothing
# File 'lib/qup/adapter/kestrel/queue.rb', line 87
def acknowledge( message )
  open_msg = @open_messages.delete( message.key )
  raise Qup::Error, "Message #{message.key} is not currently being consumed" unless open_msg
  @client.close( @name )
end
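The bookkeeping behind acknowledge can be tried without a Kestrel server. The following is a minimal sketch, not the adapter itself: OpenMessageTracker, SketchMessage and SketchError are hypothetical stand-ins, and the counter merely takes the place of the call to @client.close.

```ruby
# Hypothetical stand-ins for illustration; none of these are Qup classes.
class SketchError < StandardError; end
SketchMessage = Struct.new(:key, :data)

class OpenMessageTracker
  attr_reader :closed_count

  def initialize
    @open_messages = {}
    @closed_count = 0
  end

  # Remember a message that has been handed to a consumer.
  def reserve(message)
    @open_messages[message.key] = message
  end

  # Forget the message; raise if it was never reserved or already acknowledged.
  def acknowledge(message)
    open_msg = @open_messages.delete(message.key)
    raise SketchError, "Message #{message.key} is not currently being consumed" unless open_msg

    @closed_count += 1 # stands in for closing the message on the server
    nil
  end
end

tracker = OpenMessageTracker.new
msg = SketchMessage.new(42, "payload")
tracker.reserve(msg)
tracker.acknowledge(msg)

begin
  tracker.acknowledge(msg) # the key is no longer open, so this raises
rescue SketchError => e
  puts e.message
end
```

Deleting the key before closing means a message can be acknowledged at most once.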
#consume(&block) ⇒ Object
Internal: Retrieve a Message from the Queue
Yields a Message
A user of the Qup API should use a Consumer instance to retrieve items from the Queue.
Returns a Message or nil if no message was on the queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 68
def consume(&block)
  q_item = @client.reserve( @name )
  return nil unless q_item
  message = ::Qup::Message.new( q_item.object_id, unmarshal_if_marshalled( q_item ))
  @open_messages[message.key] = q_item
  if block_given? then
    yield_message( message, &block )
  end
  return message
end
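The two calling styles of consume (with and without a block) and the nil result on an empty queue can be illustrated with an in-memory stand-in. This is a sketch under assumptions: InMemoryClient, SketchQueue and SketchMessage are hypothetical, and the block is simply yielded to, whatever extra work the real adapter does around the yield.

```ruby
# Hypothetical stand-ins for illustration; none of these are Qup classes.
SketchMessage = Struct.new(:key, :data)

class InMemoryClient
  def initialize(items)
    @items = items.dup
  end

  # Returns the next item, or nil when nothing is queued.
  def reserve(_queue_name)
    @items.shift
  end
end

class SketchQueue
  def initialize(client, name)
    @client = client
    @name = name
    @open_messages = {}
  end

  def consume
    q_item = @client.reserve(@name)
    return nil unless q_item

    message = SketchMessage.new(q_item.object_id, q_item)
    @open_messages[message.key] = q_item # tracked until acknowledged
    yield message if block_given?
    message
  end
end

queue = SketchQueue.new(InMemoryClient.new(["job-1", "job-2"]), "jobs")
first = queue.consume { |m| puts "handling #{m.data}" } # block form
second = queue.consume                                  # return-value form
third = queue.consume                                   # queue is now empty
```

Here first and second are messages wrapping "job-1" and "job-2", and third is nil because the stand-in client has nothing left to reserve.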
#depth ⇒ Object
Internal: Return the number of Messages on the Queue
Returns the Integer depth of the Queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 41
def depth
  @client.stats['queues'][@name]['items']
end
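The depth lookup walks a nested stats Hash, so a queue name that is absent from the stats would raise NoMethodError on nil. As a hedged sketch of a more tolerant lookup, the hypothetical helper below uses Hash#dig and treats a missing queue as empty. The sample stats Hash is invented for illustration and only mirrors the keys used by depth.

```ruby
# Hypothetical stats payload shaped like the lookup in #depth.
stats = { 'queues' => { 'jobs' => { 'items' => 3 } } }

# Hypothetical helper: returns 0 when the queue (or any level) is missing.
def queue_depth(stats, name)
  stats.dig('queues', name, 'items') || 0
end

known = queue_depth(stats, 'jobs')
unknown = queue_depth(stats, 'missing')
puts "jobs=#{known} missing=#{unknown}"
```

Whether a missing queue should count as depth 0 or as an error is a design choice. The adapter as documented does the strict lookup.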
#destroy ⇒ Object
Internal: Remove the Queue if possible
Returns nothing
# File 'lib/qup/adapter/kestrel/queue.rb', line 34
def destroy
  @client.delete(@name)
end
#flush ⇒ Object
Internal: Remove all messages from the Queue
Returns nothing.
# File 'lib/qup/adapter/kestrel/queue.rb', line 27
def flush
  @client.flush(@name)
end
#produce(message) ⇒ Object
Internal: Put an item onto the Queue
message - the data to put onto the queue.
A user of the Qup API should use a Producer instance to put items onto the queue.
Returns the Message that was put onto the Queue
# File 'lib/qup/adapter/kestrel/queue.rb', line 54
def produce( message )
  @client.set( @name, message )
  return ::Qup::Message.new( message.object_id, message )
end
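The returned Message pairs the payload with the payload's object_id as its key. The sketch below imitates that with hypothetical stand-ins (RecordingClient, SketchMessage and a free-standing produce function) to show that the key identifies the Ruby object in the producing process rather than anything stored on the server.

```ruby
# Hypothetical stand-ins for illustration; none of these are Qup classes.
SketchMessage = Struct.new(:key, :data)

class RecordingClient
  attr_reader :sent

  def initialize
    @sent = Hash.new { |hash, queue_name| hash[queue_name] = [] }
  end

  # Record the payload under the queue name instead of sending it anywhere.
  def set(queue_name, payload)
    @sent[queue_name] << payload
  end
end

def produce(client, queue_name, payload)
  client.set(queue_name, payload)
  SketchMessage.new(payload.object_id, payload)
end

client = RecordingClient.new
payload = "resize image 17"
produced = produce(client, "jobs", payload)
puts produced.key == payload.object_id
```

Because object_id is only meaningful inside one Ruby process, a key built this way is best treated as a local handle, not as a durable message identifier.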