Class: Kestrel::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/kestrelrb.rb

Defined Under Namespace

Classes: Unsubscribe

Constant Summary collapse

SLEEP_DELAY =

Seconds to sleep when there are no messages in the queue and the GET timeout is 0.

0.001

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, *args) ⇒ Queue

Returns a new instance of Queue.



12
13
14
15
16
17
18
19
# File 'lib/kestrelrb.rb', line 12

# Builds a queue handle bound to +queue_name+.
#
# A trailing Hash in +args+ is removed and treated as the options; whatever
# remains in +args+ is used as the server list.
#
# @param queue_name [Object] stored as-is in @queue_name
# @param args [Array] server entries, optionally followed by an options Hash
def initialize(queue_name, *args)
  raw_options = {}
  raw_options = args.pop if args.last.is_a?(Hash)
  @options = validate_queue_options(raw_options)
  @queue_name = queue_name

  # With no servers given, use a single default entry.
  server_list =
    if args.empty?
      ['localhost:22133']
    else
      args
    end

  # The client timeout is twice the queue's own timeout option.
  client_timeout = timeout * 2
  @kestrel = MemCache.new(server_list, :timeout => client_timeout)
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



10
11
12
# File 'lib/kestrelrb.rb', line 10

# Reader for the value held in @options, which the constructor assigns from
# the result of validate_queue_options.
#
# @return [Object] the current value of @options
def options
  @options
end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



10
11
12
# File 'lib/kestrelrb.rb', line 10

# Reader for the value held in @queue_name, which the constructor assigns
# from its first argument.
#
# @return [Object] the current value of @queue_name
def queue_name
  @queue_name
end

Instance Method Details

#abort ⇒ Object



73
74
75
# File 'lib/kestrelrb.rb', line 73

# Issues a GET on the client using the key produced by abort_key.
# NOTE(review): presumably this aborts the currently open reliable read on
# the server -- confirm against abort_key and the Kestrel protocol.
#
# @return [Object] whatever the client's get returns
def abort
  key = abort_key
  @kestrel.get(key)
end

#dequeue(raw = false) ⇒ Object Also known as: get



64
65
66
# File 'lib/kestrelrb.rb', line 64

# Issues a GET on the client using the key produced by get_key.
#
# @param raw [Object] passed through unchanged as the second argument of the
#   client's get (presumably toggles unmarshalling -- confirm against the
#   client library)
# @return [Object] whatever the client's get returns
def dequeue(raw = false)
  key = get_key
  @kestrel.get(key, raw)
end

#drop ⇒ Object



21
22
23
# File 'lib/kestrelrb.rb', line 21

# Sends a DELETE for this queue's name to the client.
#
# @return [Object] whatever the client's delete returns
def drop
  name = queue_name
  @kestrel.delete(name)
end

#enqueue(v) ⇒ Object



60
61
62
# File 'lib/kestrelrb.rb', line 60

# Sends a SET to the client, using this queue's name as the key and +v+ as
# the value.
#
# @param v [Object] the payload handed to the client
# @return [Object] whatever the client's set returns
def enqueue(v)
  name = queue_name
  @kestrel.set(name, v)
end

#flush ⇒ Object

Raises:

  • (NotImplementedError)


25
26
27
28
# File 'lib/kestrelrb.rb', line 25

# Placeholder for the FLUSH command; always raises.
# The delegation to the client (@kestrel.flush) is intentionally left
# disabled until the command is supported.
#
# @raise [NotImplementedError] unconditionally
def flush
  message = "FLUSH command is not yet supported."
  raise NotImplementedError, message
end

#flush_all ⇒ Object



30
31
32
# File 'lib/kestrelrb.rb', line 30

# Forwards flush_all to the client.
# NOTE(review): this is not scoped by queue_name, so it presumably affects
# every queue on the server -- confirm before using.
#
# @return [Object] whatever the client's flush_all returns
def flush_all
  client = @kestrel
  client.flush_all
end

#on_error(&block) ⇒ Object



38
39
40
# File 'lib/kestrelrb.rb', line 38

# Registers a handler to be invoked by #subscribe when processing a message
# raises a StandardError. Handlers are called in registration order with the
# exception and the message being processed (nil if none was fetched).
#
# A block is mandatory: storing nil here would make #subscribe fail later,
# inside its own rescue clause, when it tries to call the handler.
#
# @yieldparam error [StandardError] the exception that was rescued
# @yieldparam msg [Object, nil] the message being processed, if any
# @raise [ArgumentError] when called without a block
# @return [Array<Proc>] the list of registered handlers
def on_error(&block)
  raise ArgumentError, 'on_error requires a block' unless block

  (@error_handlers ||= []) << block
end

#peek(raw = false) ⇒ Object



69
70
71
# File 'lib/kestrelrb.rb', line 69

# Issues a GET on the client using the key produced by peek_key.
# NOTE(review): presumably returns the next item without removing it --
# confirm against peek_key and the Kestrel protocol.
#
# @param raw [Object] passed through unchanged as the second argument of the
#   client's get
# @return [Object] whatever the client's get returns
def peek(raw = false)
  key = peek_key
  @kestrel.get(key, raw)
end

#reliable? ⇒ Boolean

Returns:

  • (Boolean)


81
82
83
# File 'lib/kestrelrb.rb', line 81

# Reports the :reliable entry of the queue options.
# The stored value is returned as-is, not coerced to true/false.
#
# @return [Object] the value stored under :reliable in options
def reliable?
  settings = options
  settings[:reliable]
end

#stats ⇒ Object



34
35
36
# File 'lib/kestrelrb.rb', line 34

# Forwards stats to the client.
#
# @return [Object] whatever the client's stats returns
def stats
  client = @kestrel
  client.stats
end

#subscribe(&block) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/kestrelrb.rb', line 42

# Runs a consume loop: repeatedly fetches a message with #get and yields it
# to the block until the block (or the fetch) raises Unsubscribe.
#
# When no message is available and the timeout option is 0, the loop sleeps
# SLEEP_DELAY seconds before trying again. Any StandardError raised while
# fetching or processing is rescued: #abort is called if the queue is
# reliable, then each handler registered via #on_error is invoked with the
# exception and the message (nil if the fetch itself failed).
#
# A block is mandatory. Without one, every fetched message would hit a
# NoMethodError that the rescue below swallows, so the loop would consume
# messages forever without processing any of them.
#
# @yieldparam msg [Object] each message fetched from the queue
# @raise [ArgumentError] when called without a block
# @return [nil] once the loop is left via Unsubscribe
def subscribe(&block)
  raise ArgumentError, 'subscribe requires a block' unless block

  loop do
    # Reset per iteration so error handlers never see a stale message.
    msg = nil
    begin
      if (msg = get)
        block.call(msg)
      elsif timeout == 0
        sleep SLEEP_DELAY
      end
    rescue Unsubscribe
      break
    rescue StandardError => e
      abort if reliable?
      @error_handlers.each { |handler| handler.call(e, msg) } if @error_handlers
    end
  end
end

#timeout ⇒ Object



77
78
79
# File 'lib/kestrelrb.rb', line 77

# Reports the :timeout entry of the queue options.
# NOTE(review): units are not visible here; the constructor doubles this
# value for the client timeout -- confirm whether it is seconds or ms.
#
# @return [Object] the value stored under :timeout in options
def timeout
  settings = options
  settings[:timeout]
end