Module: Qmore::Attributes

Extended by:
Attributes
Included in:
Attributes, JobReserver
Defined in:
lib/qmore/attributes.rb

Instance Method Summary collapse

Instance Method Details

#expand_queues(queue_patterns, real_queues) ⇒ Object

Returns a list of queues to use when searching for a job.

A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.

The splat can also be used as a wildcard within a queue name, e.g. “*high*”, and negation can be indicated with a prefix of “!”.

An @key can be used to dynamically look up the queue list for key from redis. If no key is supplied, it defaults to the worker’s hostname, and wildcards and negations can be used inside this dynamic queue list. Set the queue list for a key with set_dynamic_queue(key, [“q1”, “q2”]).



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/qmore/attributes.rb', line 20

# Expands a list of queue patterns into the sorted, de-duplicated list of
# queue names to search for a job.
#
# Patterns are processed in order, so later patterns act on what earlier
# ones selected:
#
# * "*" is a wildcard for any run of characters; a bare "*" selects every
#   entry of +real_queues+.
# * A "!" prefix removes the already-selected queues matching the rest of
#   the pattern.
# * "@key" is replaced by the pattern list stored under +key+ in
#   Qmore.configuration.dynamic_queues (an empty key falls back to
#   Socket.gethostname). "!@key" flips the negation of every pattern in
#   that list. A key with no stored list contributes no patterns.
# * A pattern without a wildcard that matches nothing in +real_queues+ is
#   kept as a literal queue name.
#
# Neither argument nor the configured dynamic queue lists are modified.
#
# @param queue_patterns [Array<#to_s>] patterns to expand
# @param real_queues [Array<String>] names of the queues that currently exist
# @return [Array<String>] selected queue names in alphabetical order
def expand_queues(queue_patterns, real_queues)
  pending = queue_patterns.dup
  matched_queues = []

  while (q = pending.shift)
    q = q.to_s

    if (dynamic = /^(!)?@(.*)/.match(q))
      key = dynamic[2].strip
      key = Socket.gethostname if key.empty?

      # Build a fresh list instead of editing the configured one in place;
      # otherwise every "!@key" expansion would flip the stored patterns.
      add_queues = Array(Qmore.configuration.dynamic_queues[key]).map do |name|
        name = name.to_s
        next name unless dynamic[1]

        name.start_with?('!') ? name[1..-1] : "!#{name}"
      end

      pending.concat(add_queues)
      next
    end

    # `while` does not open a new scope, so negation must be recomputed for
    # every pattern or one "!" would leak into all patterns that follow it.
    negated = q.start_with?('!')
    q = q[1..-1] if negated

    patstr = q.gsub(/\*/, '.*')
    pattern = /^#{patstr}$/

    if negated
      matched_queues -= matched_queues.grep(pattern)
    else
      matches = real_queues.grep(pattern)
      # q == patstr means the pattern had no wildcard: keep it as a literal
      # queue name even though no such queue exists yet.
      matches = [q] if matches.empty? && q == patstr
      matched_queues.concat(matches)
    end
  end

  matched_queues.uniq.sort
end

#prioritize_queues(priority_buckets, real_queues) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/qmore/attributes.rb', line 59

# Orders +real_queues+ according to a list of priority buckets.
#
# Each bucket is a Hash with a 'pattern' entry and an optional 'fairly'
# entry. 'pattern' is a comma-separated list of queue patterns in which "*"
# is a wildcard and a "!" prefix removes matching queues already collected
# for that bucket. Queues claimed by a bucket are not available to later
# buckets. When 'fairly' is truthy the bucket's queues are shuffled.
#
# The special pattern 'default' marks the position at which all queues not
# claimed by any bucket are inserted; without such a bucket they are
# appended at the end.
#
# Neither argument is modified.
#
# @param priority_buckets [Array<Hash>] buckets in priority order
# @param real_queues [Array<String>] names of the queues to order
# @return [Array<String>] every queue of +real_queues+, in priority order
def prioritize_queues(priority_buckets, real_queues)
  real_queues = real_queues.dup

  result = []
  # Two separate assignments: "a = -1, b = false" would store the Array
  # [-1, false] in a, which Array#insert rejects with a TypeError.
  default_idx = -1
  default_fairly = false

  # Walk the priority patterns, extract each into its own bucket
  priority_buckets.each do |bucket|
    bucket_pattern = bucket['pattern']
    fairly = bucket['fairly']

    # note the position of the default bucket for inserting the remaining
    # queues at that location
    if bucket_pattern == 'default'
      default_idx = result.size
      default_fairly = fairly
      next
    end

    bucket_queues = []

    bucket_pattern.split(',').each do |raw_pattern|
      pattern = raw_pattern.strip
      negated = pattern.start_with?('!')
      pattern = pattern[1..-1] if negated

      matcher = /^#{pattern.gsub(/\*/, '.*')}$/

      if negated
        bucket_queues -= bucket_queues.grep(matcher)
      else
        bucket_queues.concat(real_queues.grep(matcher))
      end
    end

    bucket_queues.uniq!
    bucket_queues.shuffle! if fairly
    # A queue belongs to the first bucket that claims it.
    real_queues -= bucket_queues

    result << bucket_queues
  end

  # insert the remaining queues at the position the default item was at
  # (or last)
  real_queues.shuffle! if default_fairly
  result.insert(default_idx, real_queues)

  result.flatten
end