Module: Sidekiq::DynamicQueues::Attributes
- Extended by:
- Attributes
- Included in:
- Attributes, Fetch
- Defined in:
- lib/sidekiq/dynamic_queues/attributes.rb
Instance Method Summary collapse
- #expand_queues(queues) ⇒ Object
  Returns a list of queues to use when searching for a job.
- #get_dynamic_queue(key, fallback = ['*']) ⇒ Object
- #get_dynamic_queues ⇒ Object
- #json_decode(data) ⇒ Object
- #json_encode(data) ⇒ Object
- #set_dynamic_queue(key, values) ⇒ Object
- #set_dynamic_queues(dynamic_queues) ⇒ Object
Instance Method Details
#expand_queues(queues) ⇒ Object
Returns a list of queues to use when searching for a job.
A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.
The splat can also be used as a wildcard within a queue name, e.g. “*high*”, and negation can be indicated with a prefix of “!”.
An @key can be used to dynamically look up the queue list for key from redis. If no key is supplied, it defaults to the worker’s hostname. Wildcards and negations can be used inside this dynamic queue list. Set the queue list for a key with Sidekiq::DynamicQueues::Attributes.set_dynamic_queue(key, [“q1”, “q2”]).
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/sidekiq/dynamic_queues/attributes.rb', line 76

# Expands a list of queue specifications into the concrete queue names to
# search for jobs.
#
# Each entry of +queues+ may be:
# * a plain queue name, which is kept even when Sidekiq::Queue.all does not
#   report such a queue;
# * a name containing "*", where each "*" matches any run of characters in
#   the names reported by Sidekiq::Queue.all;
# * either of the above prefixed with "!", which removes every queue
#   collected so far that matches;
# * "@key" or "!@key", which is replaced by the list get_dynamic_queue
#   returns for +key+ ("!@key" flips the negation of every entry of that
#   list). A bare "@" uses +hostname+ as the key.
#
# @param queues [Array<#to_s>] queue specifications; the array is not modified
# @return [Array<String>] unique, sorted names, each prefixed with "queue:"
def expand_queues(queues)
  pending = queues.dup
  real_queues = Sidekiq::Queue.all.map(&:name)
  matched_queues = []

  while (spec = pending.shift)
    spec = spec.to_s

    if (dynamic = /^(!)?@(.*)/.match(spec))
      key = dynamic[2].strip
      key = hostname if key.empty?

      expansion = get_dynamic_queue(key)
      if dynamic[1]
        # "!@key" inverts every entry. New strings are built so the list
        # handed back by get_dynamic_queue is never mutated.
        expansion = expansion.map do |name|
          name = name.to_s
          name.start_with?('!') ? name[1..-1] : "!#{name}"
        end
      end

      # Expanded entries go back on the work list so they may themselves
      # contain wildcards, negations or further @keys.
      pending.concat(expansion)
      next
    end

    # Decided afresh for every entry: `while` opens no new scope, so a flag
    # that is only ever set to true would stay set for all later entries.
    negated = spec.start_with?('!')
    spec = spec[1..-1] if negated

    # Only "*" is a wildcard; every other character matches literally.
    wildcard = spec.include?('*')
    pattern = /\A#{Regexp.escape(spec).gsub('\*', '.*')}\z/

    if negated
      matched_queues -= matched_queues.grep(pattern)
    else
      matches = real_queues.grep(pattern)
      # A literal name is used as-is even if Sidekiq does not report it.
      matches = [spec] if matches.empty? && !wildcard
      matched_queues.concat(matches)
    end
  end

  matched_queues.map { |name| "queue:#{name}" }.uniq.sort
end
#get_dynamic_queue(key, fallback = ['*']) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/sidekiq/dynamic_queues/attributes.rb', line 19

# Resolves the queue list stored for +key+.
#
# The field +key+ of the DYNAMIC_QUEUE_KEY hash is read first. If that
# decodes to nil or to something of size 0, the FALLBACK_KEY field is read
# instead; if that is blank as well, +fallback+ is returned.
#
# @param key [String] field of the DYNAMIC_QUEUE_KEY hash to look up
# @param fallback [Array<String>] value returned when nothing is stored
# @return [Object] the decoded list (presumably an Array of queue names —
#   whatever json_decode yields) or +fallback+
def get_dynamic_queue(key, fallback=['*'])
  blank = ->(list) { list.nil? || list.size == 0 }
  lookup = lambda do |field|
    json_decode(Sidekiq.redis { |conn| conn.hget(DYNAMIC_QUEUE_KEY, field) })
  end

  names = lookup.call(key)
  names = lookup.call(FALLBACK_KEY) if blank.call(names)
  blank.call(names) ? fallback : names
end
#get_dynamic_queues ⇒ Object
54 55 56 57 58 59 60 |
# File 'lib/sidekiq/dynamic_queues/attributes.rb', line 54

# Reads every entry of the DYNAMIC_QUEUE_KEY hash and decodes each value
# with json_decode.
#
# @return [Hash] field => decoded value; FALLBACK_KEY is set to ['*'] when
#   it is missing or its decoded value is nil/false
def get_dynamic_queues
  stored = Sidekiq.redis { |conn| conn.hgetall(DYNAMIC_QUEUE_KEY) }

  decoded = stored.each_with_object({}) do |(field, raw), acc|
    acc[field] = json_decode(raw)
  end

  decoded[FALLBACK_KEY] ||= ['*']
  decoded
end
#json_decode(data) ⇒ Object
14 15 16 17 |
# File 'lib/sidekiq/dynamic_queues/attributes.rb', line 14

# Decodes +data+ with Sidekiq.load_json.
#
# @param data [String, nil] serialized value
# @return [Object, nil] the decoded value, or nil when +data+ is nil/false
def json_decode(data)
  data ? Sidekiq.load_json(data) : nil
end
#json_encode(data) ⇒ Object
10 11 12 |
# File 'lib/sidekiq/dynamic_queues/attributes.rb', line 10

# Serializes +data+ by delegating to Sidekiq.dump_json.
#
# @param data [Object] value to serialize
# @return [Object] whatever Sidekiq.dump_json returns (presumably a JSON String)
def json_encode(data)
  Sidekiq.dump_json(data)
end
#set_dynamic_queue(key, values) ⇒ Object
35 36 37 38 39 40 41 |
# File 'lib/sidekiq/dynamic_queues/attributes.rb', line 35

# Stores or clears the queue list kept for +key+ in the DYNAMIC_QUEUE_KEY
# hash.
#
# @param key [String] field to write
# @param values [Array<String>, nil] queue names; nil or a size of 0 removes
#   the field, anything else is stored after json_encode
def set_dynamic_queue(key, values)
  Sidekiq.redis do |conn|
    if values.nil? || values.size == 0
      conn.hdel(DYNAMIC_QUEUE_KEY, key)
    else
      conn.hset(DYNAMIC_QUEUE_KEY, key, json_encode(values))
    end
  end
end
#set_dynamic_queues(dynamic_queues) ⇒ Object
43 44 45 46 47 48 49 50 51 52 |
# File 'lib/sidekiq/dynamic_queues/attributes.rb', line 43

# Replaces the whole dynamic-queue mapping inside one Redis transaction.
#
# The DYNAMIC_QUEUE_KEY hash is deleted and then refilled from
# +dynamic_queues+, so fields absent from the argument disappear. Entries
# whose list is nil or empty are not written (the preceding delete has
# already removed them).
#
# @param dynamic_queues [Hash] key => list of queue names
# @return [Object] whatever the connection's +multi+ returns
def set_dynamic_queues(dynamic_queues)
  Sidekiq.redis do |conn|
    # Every command goes through the object yielded by +multi+. Commands
    # sent on the outer connection (or on a nested Sidekiq.redis checkout)
    # are only queued into the transaction by old redis-rb releases; newer
    # clients run them immediately, outside MULTI/EXEC. Clients that yield
    # the connection itself behave the same with this form.
    conn.multi do |transaction|
      transaction.del(DYNAMIC_QUEUE_KEY)
      dynamic_queues.each do |key, queue_names|
        next if queue_names.nil? || queue_names.empty?

        transaction.hset(DYNAMIC_QUEUE_KEY, key, json_encode(queue_names))
      end
    end
  end
end