Class: Qup::Adapter::Redis::Queue
- Inherits:
-
Connection
- Object
- Connection
- Qup::Adapter::Redis::Queue
- Includes:
- QueueAPI
- Defined in:
- lib/qup/adapter/redis/queue.rb
Overview
Internal: The Qup implementation for a Redis Queue
Instance Attribute Summary
Attributes inherited from Connection
Instance Method Summary collapse
-
#acknowledge(message) ⇒ Object
Internal: Acknowledge that message is completed and remove it from the Queue.
-
#consume(&block) ⇒ Object
Internal: Retrieve a Message from the Queue.
-
#depth ⇒ Object
Internal: return the number of Messages on the Queue.
-
#destroy ⇒ Object
(also: #flush)
Internal: Destroy the queue.
-
#initialize(uri, name, topic_name = nil) ⇒ Queue
constructor
Internal: create a new Queue.
-
#produce(message) ⇒ Object
Internal: Put an item onto the Queue.
Methods included from QueueAPI
Constructor Details
#initialize(uri, name, topic_name = nil) ⇒ Queue
Internal: create a new Queue
uri - the connection uri for the Redis Client name - the String name of the Queue topic_name - (optional) the String name of a parent topic
Returns a new Queue.
17 18 19 20 21 |
# File 'lib/qup/adapter/redis/queue.rb', line 17 def initialize( uri, name, topic_name = nil ) super uri, name @topic_name = topic_name @open_messages = {} end |
Instance Method Details
#acknowledge(message) ⇒ Object
Internal: Acknowledge that message is completed and remove it from the Queue.
In redis, this doesn’t do anything at all. The tracking is only performed to meet the API requirements.
Returns nothing.
55 56 57 58 |
# File 'lib/qup/adapter/redis/queue.rb', line 55 def acknowledge( ) open_msg = @open_messages.delete( .key ) raise Qup::Error, "Message #{.key} is not currently being consumed" unless open_msg end |
#consume(&block) ⇒ Object
Internal: Retrieve a Message from the Queue
Yields a Message
Returns a Message
78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/qup/adapter/redis/queue.rb', line 78 def consume(&block) data = @client.rpop( name ) return if data.nil? = ::Qup::Message.new( data.object_id, data ) @open_messages[.key] = if block_given? then ( , &block ) else return end end |
#depth ⇒ Object
Internal: return the number of Messages on the Queue
Returns an integer of the Queue depth
44 45 46 |
# File 'lib/qup/adapter/redis/queue.rb', line 44 def depth @client.llen name end |
#destroy ⇒ Object Also known as: flush
Internal: Destroy the queue
Removes the list from redis.
Returns nothing.
28 29 30 31 |
# File 'lib/qup/adapter/redis/queue.rb', line 28 def destroy @client.del name @client.srem @topic_name, name if @topic_name end |
#produce(message) ⇒ Object
Internal: Put an item onto the Queue
message - the data to put onto the queue.
The ‘message’ that is passed in is wrapped in a Qup::Message before being stored.
Returns the Message that was put onto the Queue
68 69 70 71 |
# File 'lib/qup/adapter/redis/queue.rb', line 68 def produce( ) @client.lpush name, return ::Qup::Message.new( .object_id, ) end |