Module: Resque::DynamicQueues

Defined in:
lib/resque/dynamic_queues.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(receiver) ⇒ Object



66
67
68
69
70
71
# File 'lib/resque/dynamic_queues.rb', line 66

# Hook invoked with the class or module that includes this module.
#
# Keeps the receiver's existing +queues+ implementation reachable as
# +queues_without_dynamic+, then points +queues+ at +queues_with_dynamic+.
#
# @param receiver [Module] the class or module doing the including
def self.included(receiver)
  receiver.class_eval do
    # Order matters: save the original before +queues+ is re-pointed.
    alias_method :queues_without_dynamic, :queues
    alias_method :queues, :queues_with_dynamic
  end
end

Instance Method Details

#queues_with_dynamic ⇒ Object

Returns a list of queues to use when searching for a job.

A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.

The splat can also be used as a wildcard within a queue name, e.g. “*high*”, and negation can be indicated with a prefix of “!”

An @key can be used to dynamically look up the queue list for key from redis. If no key is supplied, it defaults to the worker’s hostname, and wildcards and negations can be used inside this dynamic queue list. Set the queue list for a key with Resque.set_dynamic_queue(key, [“q1”, “q2”])



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/resque/dynamic_queues.rb', line 16

# Returns the list of queues to use when searching for a job.
#
# If no entry of @queues starts with "!" or "@" or contains "*", the result
# of the aliased original (+queues_without_dynamic+) is returned unchanged.
# Otherwise every entry is expanded against Resque.queues, after removing
# real queues whose leading "<digits>_" prefix equals an id found in a Redis
# key named "APILimits:<id>" or "resque:PauseQueue:<id>":
#
# * "*" acts as a wildcard; matching real queues are appended in sorted order.
# * "!pattern" removes already-matched queues that match the pattern.
# * "@key" is replaced by Resque.get_dynamic_queue(key); an empty key falls
#   back to +hostname+. "!@key" flips the negation of each returned entry.
# * An entry without a wildcard that matches no real queue is kept verbatim
#   (so an explicitly named queue is not subject to the Redis filtering).
#
# NOTE(review): queue names without a "<digits>_" prefix are treated as
# id 0, so a Redis key resolving to id 0 hides all of them — confirm intended.
# NOTE(review): patterns are interpolated into a regexp unescaped, so regexp
# metacharacters other than "*" in a queue name are interpreted as regexp.
#
# @return [Array] de-duplicated queue names in match order
def queues_with_dynamic
  queue_names = @queues.dup

  return queues_without_dynamic if queue_names.grep(/(^!)|(^@)|(\*)/).empty?

  real_queues = Resque.queues
  matched_queues = []

  # Instance ids that are currently under an API limit or explicitly paused.
  blocked_instances = ['APILimits:', 'resque:PauseQueue:'].flat_map do |prefix|
    Redis.current.keys("#{prefix}*").map { |key| key.split(prefix).last.to_i }
  end
  real_queues = real_queues.reject do |name|
    blocked_instances.include?(name[/^(\d*)_.*/, 1].to_i)
  end

  # queue_names grows while we iterate (dynamic lists are appended), hence
  # the shift-driven loop rather than an each.
  while (q = queue_names.shift)
    q = q.to_s

    if (dynamic = q.match(/^(!)?@(.*)/))
      key = dynamic[2].strip
      key = hostname if key.empty?

      add_queues = Resque.get_dynamic_queue(key)
      if dynamic[1]
        # "!@key": invert each entry without mutating the fetched list.
        add_queues = add_queues.map do |name|
          name.start_with?('!') ? name[1..-1] : "!#{name}"
        end
      end

      queue_names.concat(add_queues)
      next
    end

    # Must be recomputed for every entry: a loop body does not get a fresh
    # scope, so a sticky flag would turn all later entries into negations.
    negated = q.start_with?('!')
    q = q[1..-1] if negated

    patstr = q.gsub(/\*/, '.*')
    pattern = /^#{patstr}$/
    if negated
      matched_queues -= matched_queues.grep(pattern)
    else
      matches = real_queues.grep(pattern)
      # No wildcard and no existing queue: keep the literal name.
      matches = [q] if matches.empty? && q == patstr
      matched_queues.concat(matches.sort)
    end
  end

  matched_queues.uniq
end