Class: Qup::Adapter::Redis::Queue

Inherits:
Connection
Includes:
QueueAPI
Defined in:
lib/qup/adapter/redis/queue.rb

Overview

Internal: The Qup implementation for a Redis Queue

Instance Attribute Summary

Attributes inherited from Connection

#name

Instance Method Summary

Methods included from QueueAPI

#consumer, #name, #producer

Constructor Details

#initialize(uri, name, topic_name = nil) ⇒ Queue

Internal: create a new Queue

uri        - the connection uri for the Redis Client
name       - the String name of the Queue
topic_name - (optional) the String name of a parent topic

Returns a new Queue.



# File 'lib/qup/adapter/redis/queue.rb', line 17

def initialize( uri, name, topic_name = nil )
  super uri, name
  @topic_name = topic_name
  @open_messages = {}
end

Instance Method Details

#acknowledge(message) ⇒ Object

Internal: Acknowledge that message is completed and remove it from the Queue.

In Redis, this does nothing on the server: the message was already removed from the list when it was consumed. The tracking is only performed to meet the API requirements.

Returns nothing.

Raises:

Qup::Error - if the message is not currently being consumed



# File 'lib/qup/adapter/redis/queue.rb', line 55

def acknowledge( message )
  open_msg = @open_messages.delete( message.key )
  raise Qup::Error, "Message #{message.key} is not currently being consumed" unless open_msg
end
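
The bookkeeping described above can be illustrated without Redis. The following is a minimal sketch, not the adapter itself: `SketchError`, `SketchMessage`, and `OpenMessageTracker` are hypothetical names introduced here, standing in for `Qup::Error`, `Qup::Message`, and the queue's `@open_messages` hash.

```ruby
# Hypothetical stand-ins for illustration only; none of these are part of Qup.
SketchError   = Class.new(StandardError)
SketchMessage = Struct.new(:key, :data)

class OpenMessageTracker
  def initialize
    @open_messages = {}
  end

  # Record a message as handed out to a consumer.
  def track(message)
    @open_messages[message.key] = message
  end

  # Forget the message. Raises if it was never handed out
  # or has already been acknowledged.
  def acknowledge(message)
    open_msg = @open_messages.delete(message.key)
    raise SketchError, "Message #{message.key} is not currently being consumed" unless open_msg
    nil
  end
end

tracker = OpenMessageTracker.new
msg = SketchMessage.new(1, "payload")
tracker.track(msg)
tracker.acknowledge(msg)     # first acknowledgement succeeds

begin
  tracker.acknowledge(msg)   # the key is gone, so this raises
rescue SketchError => e
  puts e.message
end
```

Acknowledging the same message twice raises because the first call has already deleted its key from the hash.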

#consume(&block) ⇒ Object

Internal: Retrieve a Message from the Queue

Yields a Message if a block is given.

Returns the Message when no block is given, or nil if the Queue is empty.



# File 'lib/qup/adapter/redis/queue.rb', line 78

def consume(&block)
  data = @client.rpop( name )
  return if data.nil?
  message = ::Qup::Message.new( data.object_id, data )
  @open_messages[message.key] = message
  if block_given? then
    yield_message( message, &block )
  else
    return message
  end
end
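
The two calling styles of `consume` can be sketched with an in-memory array in place of the Redis list. This is an illustration under assumptions, not the adapter's code: `SketchMsg` and `SketchQueue` are hypothetical names, and the block form is assumed to acknowledge the message once the block finishes, since the behavior of `yield_message` is not shown on this page.

```ruby
# Hypothetical in-memory model; not part of Qup.
SketchMsg = Struct.new(:key, :data)

class SketchQueue
  def initialize
    @items = []           # stands in for the Redis list
    @open_messages = {}
  end

  def produce(data)
    @items.unshift(data)  # head of the list, the end LPUSH writes to
    SketchMsg.new(data.object_id, data)
  end

  def consume
    data = @items.pop     # tail of the list, the end RPOP reads from
    return if data.nil?   # empty queue: nil, and the block is never called
    message = SketchMsg.new(data.object_id, data)
    @open_messages[message.key] = message
    return message unless block_given?
    begin
      yield message
    ensure
      # Assumption: the block form acknowledges automatically.
      @open_messages.delete(message.key)
    end
  end

  # Number of consumed messages that have not been acknowledged.
  def open_count
    @open_messages.size
  end
end

queue = SketchQueue.new
queue.produce("a")
queue.produce("b")

msg = queue.consume                # non-block form: the caller acknowledges later
queue.consume { |m| puts m.data }  # block form
puts queue.consume.inspect         # queue is now empty
puts queue.open_count              # only the non-block message is still open
```

In the non-block form the message stays in the open set until the caller acknowledges it explicitly.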

#depth ⇒ Object

Internal: return the number of Messages on the Queue

Returns an integer of the Queue depth



# File 'lib/qup/adapter/redis/queue.rb', line 44

def depth
  @client.llen name
end

#destroy ⇒ Object Also known as: flush

Internal: Destroy the queue

Removes the list from Redis and, if the Queue belongs to a topic, removes the Queue's name from that topic's set.

Returns nothing.



# File 'lib/qup/adapter/redis/queue.rb', line 28

def destroy
  @client.del name
  @client.srem @topic_name, name if @topic_name
end

#produce(message) ⇒ Object

Internal: Put an item onto the Queue

message - the data to put onto the queue.

The ‘message’ that is passed in is pushed onto the Redis list as-is; the return value wraps it in a Qup::Message.

Returns the Message that was put onto the Queue



# File 'lib/qup/adapter/redis/queue.rb', line 68

def produce( message )
  @client.lpush name, message
  return ::Qup::Message.new( message.object_id, message )
end
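
Because `produce` uses LPUSH and `consume` uses RPOP, items leave the list in the order they were added. The sketch below models those commands with a hypothetical `FakeListClient` (an in-memory stand-in written for this example, not a real Redis client) to show the first-in, first-out ordering.

```ruby
# In-memory stand-in for the Redis list commands the queue relies on.
# Hypothetical helper; the real adapter talks to a Redis server.
class FakeListClient
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # LPUSH: insert at the head of the list and return the new length.
  def lpush(key, value)
    @lists[key].unshift(value)
    @lists[key].length
  end

  # RPOP: remove and return the tail element, or nil when the list is empty.
  def rpop(key)
    @lists[key].pop
  end

  # LLEN: number of elements in the list (0 for a missing key).
  def llen(key)
    @lists[key].length
  end
end

client = FakeListClient.new
%w[first second third].each { |item| client.lpush("jobs", item) }

drained = []
while (item = client.rpop("jobs"))
  drained << item
end

p drained  # => ["first", "second", "third"]
```

Pushing at one end and popping from the other is what makes the list behave as a queue rather than a stack.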