Class: Qup::Adapter::Kestrel::Queue

Inherits:
Destination
Includes:
QueueAPI
Defined in:
lib/qup/adapter/kestrel/queue.rb

Overview

Internal: The implementation of Queue in the Kestrel adapter.

Instance Attribute Summary

Instance Method Summary

Methods included from QueueAPI

#consumer, #producer

Methods inherited from Destination

#ping

Constructor Details

#initialize(client, name) ⇒ Queue

Internal: Create a new Queue

client - the Kestrel Client used by this Queue
name   - the String name of the Queue

Returns a new Queue



# File 'lib/qup/adapter/kestrel/queue.rb', line 16

def initialize( client, name )
  super( client, name )
  @open_messages = {}
end

Instance Attribute Details

#name ⇒ Object (readonly)

Internal: The name of the Queue



# File 'lib/qup/adapter/kestrel/queue.rb', line 22

def name
  @name
end

Instance Method Details

#acknowledge(message) ⇒ Object

Internal: Acknowledge that the message is completed and remove it from the Queue.

For Kestrel, this closes the currently open read on the queue. The message passed in is used only to check that it is currently being consumed.

Returns nothing

Raises:

Qup::Error - if the message is not currently being consumed



# File 'lib/qup/adapter/kestrel/queue.rb', line 87

def acknowledge( message )
  open_msg = @open_messages.delete( message.key )
  raise Qup::Error, "Message #{message.key} is not currently being consumed" unless open_msg
  @client.close( @name )
end

#consume(&block) ⇒ Object

Internal: Retrieve a Message from the Queue

Yields a Message

A user of the Qup API should use a Consumer instance to retrieve items from the Queue.

Returns a Message or nil if no message was on the queue



# File 'lib/qup/adapter/kestrel/queue.rb', line 68

def consume(&block)
  q_item = @client.reserve( @name )
  return nil unless q_item
  q_message = ::Qup::Message.new( q_item.object_id, unmarshal_if_marshalled( q_item ))
  @open_messages[q_message.key] = q_item
  if block_given? then
    yield_message( q_message, &block )
  end
  return q_message
end
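
The consume/acknowledge bookkeeping shown above can be exercised without a Kestrel server. The following is a minimal sketch, assuming a hypothetical in-memory FakeClient that mimics only the reserve and close calls used by this class. SketchQueue, SketchMessage and SketchError are illustrative stand-ins, not Qup classes, and the block form of consume is left out because the yield_message helper is not shown here.

```ruby
# Hypothetical stand-ins; none of these are part of Qup or the Kestrel client.
SketchError   = Class.new(StandardError)
SketchMessage = Struct.new(:key, :data)

# In-memory client that mimics only the reserve/close calls used above.
class FakeClient
  attr_reader :closed

  def initialize(items)
    @items  = items.dup
    @closed = []
  end

  # Hand out the next item, or nil when nothing is queued.
  def reserve(_name)
    @items.shift
  end

  # Record that the open read on the named queue was closed.
  def close(name)
    @closed << name
  end
end

# Mirrors the bookkeeping of the listings above: track open items by key.
class SketchQueue
  def initialize(client, name)
    @client        = client
    @name          = name
    @open_messages = {}
  end

  def consume
    item = @client.reserve(@name)
    return nil unless item
    message = SketchMessage.new(item.object_id, item)
    @open_messages[message.key] = item
    message
  end

  def acknowledge(message)
    open_msg = @open_messages.delete(message.key)
    raise SketchError, "Message #{message.key} is not currently being consumed" unless open_msg
    @client.close(@name)
  end
end

client = FakeClient.new(["job-1"])
queue  = SketchQueue.new(client, "jobs")

msg = queue.consume
queue.acknowledge(msg)      # first acknowledgement closes the open read

begin
  queue.acknowledge(msg)    # second one is rejected: the key is no longer tracked
rescue SketchError => e
  second_error = e.message
end

empty = queue.consume       # nil, because the fake queue is now empty
```

The sketch shows why acknowledging the same message twice raises: the key is removed from the open-message table on the first call, so the second lookup finds nothing.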

#depth ⇒ Object

Internal: Return the number of Messages on the Queue

Returns the Integer depth of the Queue



# File 'lib/qup/adapter/kestrel/queue.rb', line 41

def depth
  @client.stats['queues'][@name]['items']
end
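
The nested lookup in #depth implies a particular shape for the stats data. The following is a small sketch, assuming a hypothetical stats hash whose layout is inferred only from that lookup; the helper names depth_of and safe_depth_of are illustrative, and the defensive variant is an assumption rather than Qup behaviour.

```ruby
# Hypothetical stats payload; the shape is inferred only from the lookup in #depth.
stats = {
  'queues' => {
    'jobs' => { 'items' => 3 }
  }
}

# Same lookup as #depth, with the stats hash and queue name passed in.
def depth_of(stats, name)
  stats['queues'][name]['items']
end

# Defensive variant (an assumption, not Qup behaviour):
# report 0 instead of raising when the queue is not listed.
def safe_depth_of(stats, name)
  stats.dig('queues', name, 'items') || 0
end

jobs_depth    = depth_of(stats, 'jobs')
missing_depth = safe_depth_of(stats, 'missing')
```

With the plain lookup, a queue name that is absent from the stats data would raise NoMethodError on nil, which is the case the defensive variant sidesteps.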

#destroy ⇒ Object

Internal: Remove the Queue if possible

Returns nothing



# File 'lib/qup/adapter/kestrel/queue.rb', line 34

def destroy
  @client.delete(@name)
end

#flush ⇒ Object

Internal: Remove all messages from the Queue

Returns nothing.



# File 'lib/qup/adapter/kestrel/queue.rb', line 27

def flush
  @client.flush(@name)
end

#produce(message) ⇒ Object

Internal: Put an item onto the Queue

message - the data to put onto the queue.

A user of the Qup API should use a Producer instance to put items onto the queue.

Returns the Message that was put onto the Queue



# File 'lib/qup/adapter/kestrel/queue.rb', line 54

def produce( message )
  @client.set( @name, message )
  return ::Qup::Message.new( message.object_id, message )
end
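
The way produce wraps its argument can be illustrated in isolation. This is a minimal sketch, assuming a hypothetical RecordingClient that only records calls to set; SketchProducedMessage and sketch_produce are stand-ins for illustration and are not part of Qup.

```ruby
# Hypothetical stand-ins; not Qup or Kestrel client classes.
SketchProducedMessage = Struct.new(:key, :data)

# Records the queue name and payload instead of talking to a server.
class RecordingClient
  attr_reader :sets

  def initialize
    @sets = []
  end

  def set(name, payload)
    @sets << [name, payload]
  end
end

# Mirrors the listing above: send the payload, then wrap it in a
# message keyed by the payload's object_id.
def sketch_produce(client, name, payload)
  client.set(name, payload)
  SketchProducedMessage.new(payload.object_id, payload)
end

recorder = RecordingClient.new
payload  = "resize image 42"
produced = sketch_produce(recorder, "jobs", payload)
```

In this sketch the returned key is the object_id of the payload object in the producing process, so it identifies that in-memory object rather than anything stored on the server.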