Class: Kestrel::Queue
- Inherits: Object
- Hierarchy: Object → Kestrel::Queue
- Defined in:
- lib/kestrelrb.rb
Defined Under Namespace
Classes: Unsubscribe
Constant Summary collapse
- SLEEP_DELAY =
Seconds to sleep when there are no messages in the queue and the GET timeout is 0
0.001
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
Instance Method Summary collapse
- #abort ⇒ Object
- #dequeue(raw = false) ⇒ Object (also: #get)
- #drop ⇒ Object
- #enqueue(v) ⇒ Object
- #flush ⇒ Object
- #flush_all ⇒ Object
-
#initialize(queue_name, *args) ⇒ Queue
constructor
A new instance of Queue.
- #on_error(&block) ⇒ Object
- #peek(raw = false) ⇒ Object
- #reliable? ⇒ Boolean
- #stats ⇒ Object
- #subscribe(&block) ⇒ Object
- #timeout ⇒ Object
Constructor Details
#initialize(queue_name, *args) ⇒ Queue
Returns a new instance of Queue.
12 13 14 15 16 17 18 19 |
# File 'lib/kestrelrb.rb', line 12
# Builds a queue handle backed by a MemCache client.
#
# @param queue_name [Object] name of the queue; stored and exposed via #queue_name
# @param args [Array] server addresses, optionally followed by a trailing
#   Hash of options; 'localhost:22133' is used when no servers are given
#
# NOTE(review): the rendered listing reads `@options = (opts)`. The page
# extraction appears to have dropped identifiers, so a call wrapping `opts`
# (for example one that applies defaults) may be missing here — confirm
# against lib/kestrelrb.rb before relying on this listing.
def initialize(queue_name, *args)
  has_trailing_options = args.last.is_a?(Hash)
  opts = has_trailing_options ? args.pop : {}
  @options = opts
  @queue_name = queue_name

  servers = args
  servers = ['localhost:22133'] if servers.empty?

  # The client-level timeout is twice the GET timeout.
  @kestrel = MemCache.new(servers, :timeout => timeout * 2)
end
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
10 11 12 |
# File 'lib/kestrelrb.rb', line 10
# Read-only accessor for the options hash captured by the constructor.
#
# @return [Object] the value of @options
def options
  @options
end
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
10 11 12 |
# File 'lib/kestrelrb.rb', line 10
# Read-only accessor for the queue name given to the constructor.
#
# @return [Object] the value of @queue_name
def queue_name
  @queue_name
end
Instance Method Details
#abort ⇒ Object
73 74 75 |
# File 'lib/kestrelrb.rb', line 73
# Issues a GET on the client using the key built by `abort_key`.
#
# NOTE(review): `abort_key` is defined outside this listing; presumably it
# appends Kestrel's abort suffix to the queue name — confirm in the source.
#
# @return [Object] whatever the client's `get` returns
def abort
  key = abort_key
  @kestrel.get(key)
end
#dequeue(raw = false) ⇒ Object Also known as: get
64 65 66 |
# File 'lib/kestrelrb.rb', line 64
# Fetches an item by issuing a GET on the client with the key built by
# `get_key` (defined outside this listing). Also available as #get.
#
# @param raw [Boolean] forwarded unchanged as the client's second `get` argument
# @return [Object] whatever the client's `get` returns
def dequeue(raw = false)
  key = get_key
  @kestrel.get(key, raw)
end
#drop ⇒ Object
21 22 23 |
# File 'lib/kestrelrb.rb', line 21
# Sends a DELETE for this queue's name through the client.
#
# @return [Object] whatever the client's `delete` returns
def drop
  name = queue_name
  @kestrel.delete(name)
end
#enqueue(v) ⇒ Object
60 61 62 |
# File 'lib/kestrelrb.rb', line 60
# Adds an item by issuing a SET on the client under this queue's name.
#
# @param v [Object] the payload to store
# @return [Object] whatever the client's `set` returns
def enqueue(v)
  name = queue_name
  @kestrel.set(name, v)
end
#flush ⇒ Object
25 26 27 28 |
# File 'lib/kestrelrb.rb', line 25
# Placeholder for the FLUSH command; always raises.
#
# @raise [NotImplementedError] on every call
def flush
  # The client call stays disabled until FLUSH is supported:
  # @kestrel.flush
  raise NotImplementedError, "FLUSH command is not yet supported."
end
#flush_all ⇒ Object
30 31 32 |
# File 'lib/kestrelrb.rb', line 30
# Delegates to the client's `flush_all`.
#
# @return [Object] whatever the client's `flush_all` returns
def flush_all
  client = @kestrel
  client.flush_all
end
#on_error(&block) ⇒ Object
38 39 40 |
# File 'lib/kestrelrb.rb', line 38
# Registers a handler in @error_handlers, creating the list on first use.
# Handlers registered here are the ones #subscribe invokes as
# `handler.call(exception, message)`.
#
# A call without a block registers nothing: storing nil would make the later
# dispatch fail with NoMethodError while it is already handling an error.
#
# @yieldparam e [StandardError] the rescued exception
# @yieldparam msg [Object, nil] the message being processed, if any
# @return [Array<Proc>] the current list of handlers
def on_error(&block)
  @error_handlers ||= []
  @error_handlers << block if block
  @error_handlers
end
#peek(raw = false) ⇒ Object
69 70 71 |
# File 'lib/kestrelrb.rb', line 69
# Issues a GET on the client with the key built by `peek_key` (defined
# outside this listing).
#
# @param raw [Boolean] forwarded unchanged as the client's second `get` argument
# @return [Object] whatever the client's `get` returns
def peek(raw = false)
  key = peek_key
  @kestrel.get(key, raw)
end
#reliable? ⇒ Boolean
81 82 83 |
# File 'lib/kestrelrb.rb', line 81
# Reports whether the :reliable option is set.
#
# The rendered listing showed a bare `[:reliable]`, an array literal that is
# always truthy; the lookup is restored onto the options hash.
#
# @return [Object, nil] the value stored under :reliable (nil when unset)
def reliable?
  options[:reliable]
end
#stats ⇒ Object
34 35 36 |
# File 'lib/kestrelrb.rb', line 34
# Delegates to the client's `stats`.
#
# @return [Object] whatever the client's `stats` returns
def stats
  client = @kestrel
  client.stats
end
#subscribe(&block) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/kestrelrb.rb', line 42
# Polls the queue forever, yielding each message to the block.
#
# * When `get` returns nothing and the GET timeout is 0, sleeps SLEEP_DELAY
#   before polling again.
# * Raising Unsubscribe (from the block or from `get`) ends the loop.
# * Any other StandardError calls #abort when the queue is reliable, then
#   invokes every registered error handler with (exception, message).
#   `message` is nil when the failure happened before a message was fetched.
#
# @yieldparam msg [Object] the dequeued message
# @raise [ArgumentError] if no block is given
# @return [nil] once the loop is left via Unsubscribe
def subscribe(&block)
  # Without this guard a missing block raises NoMethodError on every message,
  # which the StandardError rescue below swallows, so the loop never ends.
  raise ArgumentError, 'subscribe requires a block' unless block

  loop do
    msg = nil
    begin
      msg = get
      if msg
        block.call(msg)
      elsif timeout == 0
        sleep SLEEP_DELAY
      end
    rescue Unsubscribe
      break
    rescue StandardError => e
      abort if reliable?
      @error_handlers.each { |handler| handler.call(e, msg) } if @error_handlers
    end
  end
end
#timeout ⇒ Object
77 78 79 |
# File 'lib/kestrelrb.rb', line 77
# The GET timeout configured for this queue.
#
# The rendered listing showed a bare `[:timeout]`, an array literal, which
# would break both `timeout * 2` in the constructor and `timeout == 0` in
# #subscribe; the lookup is restored onto the options hash.
#
# @return [Object, nil] the value stored under :timeout (nil when unset)
def timeout
  options[:timeout]
end