Class: Qup::Adapter::Kestrel::Queue

Inherits:
Destination show all
Includes:
QueueAPI
Defined in:
lib/qup/adapter/kestrel/queue.rb

Overview

Internal: The Implementation of Queue in the Kestrel Adapter

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from QueueAPI

#consumer, #destroy, #producer

Methods inherited from Destination

#destroy, #ping

Constructor Details

#initialize(client, name) ⇒ Queue

Internal: Create a new Queue

address - the Connection Address string for the Kestrel Client name - the String name of the Topic

Returns a new Queue



16
17
18
19
# File 'lib/qup/adapter/kestrel/queue.rb', line 16

def initialize( client, name )
  super( client, name )
  @open_messages = {}
end

Instance Attribute Details

#nameObject (readonly)

Internal: The name of the Queue



22
23
24
# File 'lib/qup/adapter/kestrel/queue.rb', line 22

def name
  @name
end

Instance Method Details

#acknowledge(message) ⇒ Object

Internal: Acknowledge that message is completed and remove it from the Queue.

For Kestrel, this really just closes the last message, the message that is sent in does not matter.

Returns nothing

Raises:



81
82
83
84
85
# File 'lib/qup/adapter/kestrel/queue.rb', line 81

def acknowledge( message )
  open_msg = @open_messages.delete( message.key )
  raise Qup::Error, "Message #{message.key} is not currently being consumed" unless open_msg
  @client.close( @name )
end

#consume(&block) ⇒ Object

Internal: Retrieve a Message from the Queue

Yields a Message

A user of the Qup API should use a Consumer instance to retrieve items from the Queue.

Returns a Message or nil if no message was on the queue



62
63
64
65
66
67
68
69
70
71
# File 'lib/qup/adapter/kestrel/queue.rb', line 62

def consume(&block)
  q_item = @client.reserve( @name )
  return nil unless q_item
  q_message = ::Qup::Message.new( q_item.object_id, unmarshal_if_marshalled( q_item ))
  @open_messages[q_message.key] = q_item
  if block_given? then
    yield_message( q_message, &block )
  end
  return q_message
end

#depthObject

Internal: return the number of Messages on the Queue

Returns an integer of the Queue depth



35
36
37
# File 'lib/qup/adapter/kestrel/queue.rb', line 35

def depth
  @client.stats['queues'][@name]['items']
end

#flushObject

Internal: Remove all messages from the Queue

Returns nothing.



27
28
29
# File 'lib/qup/adapter/kestrel/queue.rb', line 27

def flush
  @client.flush(@name)
end

#produce(message) ⇒ Object

Internal: Put an item onto the Queue

message - the data to put onto the queue.

A user of the Qup API should use a Producer instance to put items onto the queue.

Returns the Message that was put onto the Queue



48
49
50
51
# File 'lib/qup/adapter/kestrel/queue.rb', line 48

def produce( message )
  @client.set( @name, message )
  return ::Qup::Message.new( message.object_id, message )
end