Module: Qmore::Attributes
Instance Method Summary collapse
- #decode(data) ⇒ Object
- #encode(data) ⇒ Object
-
#expand_queues(queue_patterns, real_queues) ⇒ Object
Returns a list of queues to use when searching for a job.
- #get_dynamic_queue(key, fallback = ['*']) ⇒ Object
- #get_dynamic_queues ⇒ Object
- #get_priority_buckets ⇒ Object
- #prioritize_queues(priority_buckets, real_queues) ⇒ Object
- #redis ⇒ Object
- #set_dynamic_queue(key, values) ⇒ Object
- #set_dynamic_queues(dynamic_queues) ⇒ Object
- #set_priority_buckets(data) ⇒ Object
Instance Method Details
#decode(data) ⇒ Object
15 16 17 |
# File 'lib/qmore/attributes.rb', line 15 def decode(data) MultiJson.load(data) if data end |
#encode(data) ⇒ Object
19 20 21 |
# File 'lib/qmore/attributes.rb', line 19 def encode(data) MultiJson.dump(data) end |
#expand_queues(queue_patterns, real_queues) ⇒ Object
Returns a list of queues to use when searching for a job.
A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.
The splat can also be used as a wildcard within a queue name, e.g. “high”, and negation can be indicated with a prefix of “!”
An @key can be used to dynamically look up the queue list for key from redis. If no key is supplied, it defaults to the worker’s hostname, and wildcards and negations can be used inside this dynamic queue list. Set the queue list for a key with set_dynamic_queue(key, [“q1”, “q2”]
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/qmore/attributes.rb', line 93 def (queue_patterns, real_queues) queue_patterns = queue_patterns.dup real_queues = real_queues.dup matched_queues = [] while q = queue_patterns.shift q = q.to_s if q =~ /^(!)?@(.*)/ key = $2.strip key = Socket.gethostname if key.size == 0 add_queues = get_dynamic_queue(key) add_queues.map! { |q| q.gsub!(/^!/, '') || q.gsub!(/^/, '!') } if $1 queue_patterns.concat(add_queues) next end if q =~ /^!/ negated = true q = q[1..-1] end patstr = q.gsub(/\*/, ".*") pattern = /^#{patstr}$/ if negated matched_queues -= matched_queues.grep(pattern) else matches = real_queues.grep(/^#{pattern}$/) matches = [q] if matches.size == 0 && q == patstr matched_queues.concat(matches) end end return matched_queues.uniq.sort end |
#get_dynamic_queue(key, fallback = ['*']) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/qmore/attributes.rb', line 23 def get_dynamic_queue(key, fallback=['*']) data = redis.hget(DYNAMIC_QUEUE_KEY, key) queue_names = decode(data) if queue_names.nil? || queue_names.size == 0 data = redis.hget(DYNAMIC_QUEUE_KEY, DYNAMIC_FALLBACK_KEY) queue_names = decode(data) end if queue_names.nil? || queue_names.size == 0 queue_names = fallback end return queue_names end |
#get_dynamic_queues ⇒ Object
56 57 58 59 60 61 62 |
# File 'lib/qmore/attributes.rb', line 56 def get_dynamic_queues result = {} queues = redis.hgetall(DYNAMIC_QUEUE_KEY) queues.each {|k, v| result[k] = decode(v) } result[DYNAMIC_FALLBACK_KEY] ||= ['*'] return result end |
#get_priority_buckets ⇒ Object
64 65 66 67 68 69 |
# File 'lib/qmore/attributes.rb', line 64 def get_priority_buckets priorities = Array(redis.lrange(PRIORITY_KEY, 0, -1)) priorities = priorities.collect {|p| decode(p) } priorities << {'pattern' => 'default'} unless priorities.find {|b| b['pattern'] == 'default' } return priorities end |
#prioritize_queues(priority_buckets, real_queues) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/qmore/attributes.rb', line 132 def prioritize_queues(priority_buckets, real_queues) real_queues = real_queues.dup priority_buckets = priority_buckets.dup result = [] default_idx = -1, default_fairly = false; # Walk the priority patterns, extract each into its own bucket priority_buckets.each do |bucket| bucket_pattern = bucket['pattern'] fairly = bucket['fairly'] # note the position of the default bucket for inserting the remaining queues at that location if bucket_pattern == 'default' default_idx = result.size default_fairly = fairly next end bucket_queues, remaining = [], [] patterns = bucket_pattern.split(',') patterns.each do |pattern| pattern = pattern.strip if pattern =~ /^!/ negated = true pattern = pattern[1..-1] end patstr = pattern.gsub(/\*/, ".*") pattern = /^#{patstr}$/ if negated bucket_queues -= bucket_queues.grep(pattern) else bucket_queues.concat(real_queues.grep(pattern)) end end bucket_queues.uniq! bucket_queues.shuffle! if fairly real_queues = real_queues - bucket_queues result << bucket_queues end # insert the remaining queues at the position the default item was at (or last) real_queues.shuffle! if default_fairly result.insert(default_idx, real_queues) result.flatten! return result end |
#set_dynamic_queue(key, values) ⇒ Object
39 40 41 42 43 44 45 |
# File 'lib/qmore/attributes.rb', line 39 def set_dynamic_queue(key, values) if values.nil? or values.size == 0 redis.hdel(DYNAMIC_QUEUE_KEY, key) else redis.hset(DYNAMIC_QUEUE_KEY, key, encode(values)) end end |
#set_dynamic_queues(dynamic_queues) ⇒ Object
47 48 49 50 51 52 53 54 |
# File 'lib/qmore/attributes.rb', line 47 def set_dynamic_queues(dynamic_queues) redis.multi do redis.del(DYNAMIC_QUEUE_KEY) dynamic_queues.each do |k, v| set_dynamic_queue(k, v) end end end |
#set_priority_buckets(data) ⇒ Object
71 72 73 74 75 76 77 78 |
# File 'lib/qmore/attributes.rb', line 71 def set_priority_buckets(data) redis.multi do redis.del(PRIORITY_KEY) Array(data).each do |v| redis.rpush(PRIORITY_KEY, encode(v)) end end end |