Class: Starling

Inherits:
MemCache
  • Object
show all
Defined in:
lib/starling.rb

Constant Summary collapse

WAIT_TIME =
0.25

Instance Method Summary collapse

Instance Method Details

#available_queues(statistics = nil) ⇒ Object

returns a list of available (currently allocated) queues.



64
65
66
67
68
69
70
71
72
73
74
# File 'lib/starling.rb', line 64

def available_queues(statistics = nil)
  statistics ||= stats

  statistics.map { |k,v|
    v.keys
  }.flatten.uniq.grep(/^queue_(.*)_items/).map { |v|
    v.gsub(/^queue_/, '').gsub(/_items$/, '')
  }.reject { |v|
    v =~ /_total$/ || v =~ /_expired$/
  }
end

#flush(queue) ⇒ Object

iterator to flush queue. Each element will be passed to the provided block



80
81
82
83
84
85
# File 'lib/starling.rb', line 80

def flush(queue)
  sizeof(queue).times do
    v = get(queue)
    yield v if block_given?
  end
end

#get(*args) ⇒ Object

fetch an item from a queue.



10
11
12
13
14
15
16
# File 'lib/starling.rb', line 10

def get(*args)
  loop do
    response = super(*args)
    return response unless response.nil?
    sleep WAIT_TIME
  end
end

#set(queue, value, expiry = 0, raw = false) ⇒ Object

insert value into queue.

expiry is expressed as a UNIX timestamp

If raw is true, value will not be Marshalled. If raw = :yaml, value will be serialized with YAML, instead.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/starling.rb', line 26

def set(queue, value, expiry = 0, raw = false)
  retries = 0
  begin
    if raw == :yaml
      value = YAML.dump(value)
      raw = true
    end

    super(queue, value, expiry, raw)
  rescue MemCache::MemCacheError => e
    retries += 1
    sleep WAIT_TIME
    retry unless retries > 3
    raise e
  end
end

#sizeof(queue, statistics = nil) ⇒ Object

returns the number of items in queue. If queue is :all, a hash of all queue sizes will be returned.



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/starling.rb', line 47

def sizeof(queue, statistics = nil)
  statistics ||= stats

  if queue == :all
    queue_sizes = {}
    available_queues(statistics).each do |queue|
      queue_sizes[queue] = sizeof(queue, statistics)
    end
    return queue_sizes
  end

  statistics.inject(0) { |m,(k,v)| m + v["queue_#{queue}_items"].to_i }
end