Class: Resque::Plugins::Fifo::Queue::Manager
- Inherits:
-
Object
- Object
- Resque::Plugins::Fifo::Queue::Manager
- Defined in:
- lib/resque/plugins/fifo/queue/manager.rb
Constant Summary collapse
- DLM_TTL =
30000
Instance Attribute Summary collapse
-
#queue_prefix ⇒ Object
Returns the value of attribute queue_prefix.
Class Method Summary collapse
Instance Method Summary collapse
- #all_stats ⇒ Object
- #clear_stats ⇒ Object
- #compute_queue_name(key) ⇒ Object
- #dht_times_rehashed ⇒ Object
- #dump_dht ⇒ Object
- #dump_queue_names ⇒ Object
- #dump_queues ⇒ Object
- #dump_queues_sorted ⇒ Object
- #dump_queues_with_slices ⇒ Object
- #enqueue(key, klass, *args) ⇒ Object
- #fifo_hash_table_name ⇒ Object
- #get_processed_count(queue) ⇒ Object
- #get_stats_avg_delay ⇒ Object
- #get_stats_avg_dht_recalc ⇒ Object
- #get_stats_max_delay ⇒ Object
-
#initialize(queue_prefix = 'fifo') ⇒ Manager
constructor
A new instance of Manager.
-
#inline? ⇒ Boolean
method for stubbing in tests.
- #orphaned_queues ⇒ Object
- #peek_pending ⇒ Object
- #pending_queue_name ⇒ Object
- #pending_total ⇒ Object
- #pretty_dump ⇒ Object
- #pretty_dump_queues ⇒ Object
- #request_refresh ⇒ Object
- #update_workers ⇒ Object
- #worker_for_queue(queue_name) ⇒ Object
Constructor Details
#initialize(queue_prefix = 'fifo') ⇒ Manager
Returns a new instance of Manager.
11 12 13 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 11 def initialize(queue_prefix = 'fifo') @queue_prefix = queue_prefix end |
Instance Attribute Details
#queue_prefix ⇒ Object
Returns the value of attribute queue_prefix.
9 10 11 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 9 def queue_prefix @queue_prefix end |
Class Method Details
.enqueue_to(key, klass, *args) ⇒ Object
112 113 114 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 112 def self.enqueue_to(key, klass, *args) enqueue_topic('fifo', key, klass, *args) end |
.enqueue_topic(topic, key, klass, *args) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 116 def self.enqueue_topic(topic, key, klass, *args) # Perform before_enqueue hooks. Don't perform enqueue if any hook returns false before_hooks = Plugin.before_enqueue_hooks(klass).collect do |hook| klass.send(hook, *args) end return nil if before_hooks.any? { |result| result == false } manager = Resque::Plugins::Fifo::Queue::Manager.new(topic) manager.enqueue(key, klass, *args) Plugin.after_enqueue_hooks(klass).each do |hook| klass.send(hook, *args) end return true end |
Instance Method Details
#all_stats ⇒ Object
103 104 105 106 107 108 109 110 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 103 def all_stats { dht_times_rehashed: dht_times_rehashed, avg_delay: get_stats_avg_delay, avg_dht_recalc: get_stats_avg_dht_recalc, max_delay: get_stats_max_delay } end |
#clear_stats ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 64 def clear_stats redis_client.del "fifo-stats-max-delay" redis_client.del "fifo-stats-accumulated-delay" redis_client.del "fifo-stats-accumulated-count" redis_client.del "fifo-stats-dht-rehash" redis_client.del "fifo-stats-accumulated-recalc-time" redis_client.del "fifo-stats-accumulated-recalc-count" slots = redis_client.lrange fifo_hash_table_name, 0, -1 slots.each_with_index.collect do |slot, index| slice, queue = slot.split('#') redis_client.del "queue-stats-#{queue}" end end |
#compute_queue_name(key) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 27 def compute_queue_name(key) index = compute_index(key) slots = redis_client.lrange fifo_hash_table_name, 0, -1 return pending_queue_name if slots.empty? slots.reverse.each do |slot| slice, queue = slot.split('#') if index > slice.to_i return queue end end _slice, queue_name = slots.last.split('#') queue_name end |
#dht_times_rehashed ⇒ Object
99 100 101 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 99 def dht_times_rehashed redis_client.get("fifo-stats-dht-rehash") || 0 end |
#dump_dht ⇒ Object
134 135 136 137 138 139 140 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 134 def dump_dht slots = redis_client.lrange fifo_hash_table_name, 0, -1 slots.each_with_index.collect do |slot, index| slice, queue = slot.split('#') [slice.to_i, queue] end end |
#dump_queue_names ⇒ Object
158 159 160 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 158 def dump_queue_names dump_dht.collect { |item| item[1] } end |
#dump_queues ⇒ Object
170 171 172 173 174 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 170 def dump_queues query_available_queues.collect do |queue| [queue, Resque.peek(queue,0,0)] end.to_h end |
#dump_queues_sorted ⇒ Object
215 216 217 218 219 220 221 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 215 def dump_queues_sorted queues = dump_queues dht = dump_dht.collect do |item| _slice, queue_name = item queues[queue_name] end end |
#dump_queues_with_slices ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 187 def dump_queues_with_slices slots = redis_client.lrange fifo_hash_table_name, 0, -1 slots.collect do |slot, index| slice, queue = slot.split('#') worker = worker_for_queue(queue) hostname = '?' status = '?' pid = '?' started = '?' heartbeat = '?' if worker hostname = worker.hostname status = worker.paused? ? 'paused' : worker.state.to_s pid = worker.pid started = worker.started heartbeat = worker.heartbeat end [slice, queue, hostname, pid, status, started, heartbeat, get_processed_count(queue), Resque.peek(queue,0,0).size ] end end |
#enqueue(key, klass, *args) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 45 def enqueue(key, klass, *args) queue = compute_queue_name(key) redis_client.incr "queue-stats-#{queue}" Resque.validate(klass, queue) if Resque.inline? && inline? # Instantiating a Resque::Job and calling perform on it so callbacks run # decode(encode(args)) to ensure that args are normalized in the same manner as a non-inline job Resque::Job.new(:inline, {'class' => klass, 'args' => Resque.decode(Resque.encode(args)), 'fifo_key' => key, 'enqueue_ts' => 0}).perform else Resque.push(queue, :class => klass.to_s, :args => args, fifo_key: key, :enqueue_ts => Time.now.to_i) end end |
#fifo_hash_table_name ⇒ Object
15 16 17 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 15 def fifo_hash_table_name "fifo-queue-lookup-#{@queue_prefix}" end |
#get_processed_count(queue) ⇒ Object
211 212 213 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 211 def get_processed_count(queue) redis_client.get("queue-stats-#{queue}") || 0 end |
#get_stats_avg_delay ⇒ Object
91 92 93 94 95 96 97 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 91 def get_stats_avg_delay accumulated_delay = redis_client.get("fifo-stats-accumulated-delay") || 0 total_items = redis_client.get("fifo-stats-accumulated-count") || 0 return 0 if total_items == 0 return accumulated_delay.to_f / total_items.to_f end |
#get_stats_avg_dht_recalc ⇒ Object
83 84 85 86 87 88 89 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 83 def get_stats_avg_dht_recalc accumulated_delay = redis_client.get("fifo-stats-accumulated-recalc-time") || 0 total_items = redis_client.get("fifo-stats-accumulated-recalc-count") || 0 return 0 if total_items == 0 return accumulated_delay.to_f / total_items.to_f end |
#get_stats_max_delay ⇒ Object
79 80 81 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 79 def get_stats_max_delay redis_client.get("fifo-stats-max-delay") || 0 end |
#inline? ⇒ Boolean
method for stubbing in tests
60 61 62 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 60 def inline? Resque.inline? end |
#orphaned_queues ⇒ Object
264 265 266 267 268 269 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 264 def orphaned_queues current_queues = dump_queue_names Resque.all_queues.reject do |queue| !queue.start_with?(queue_prefix) || current_queues.include?(queue) end end |
#peek_pending ⇒ Object
150 151 152 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 150 def peek_pending Resque.peek(pending_queue_name, 0, 0) end |
#pending_queue_name ⇒ Object
23 24 25 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 23 def pending_queue_name "#{queue_prefix}-pending" end |
#pending_total ⇒ Object
154 155 156 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 154 def pending_total redis_client.llen "queue:#{pending_queue_name}" end |
#pretty_dump ⇒ Object
142 143 144 145 146 147 148 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 142 def pretty_dump slots = redis_client.lrange fifo_hash_table_name, 0, -1 slots.each_with_index.collect do |slot, index| slice, queue = slot.split('#') puts "Slice ##{slice} -> #{queue}" end end |
#pretty_dump_queues ⇒ Object
176 177 178 179 180 181 182 183 184 185 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 176 def pretty_dump_queues slots = redis_client.lrange fifo_hash_table_name, 0, -1 slots.each_with_index.collect do |slot, index| slice, queue = slot.split('#') puts "#Slice #{slice}" puts "#{Resque.peek(queue,0,0).to_s.gsub('},',"},\n")}," puts "\n" end end |
#request_refresh ⇒ Object
252 253 254 255 256 257 258 259 260 261 262 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 252 def request_refresh if Resque.inline? # Instantiating a Resque::Job and calling perform on it so callbacks run # decode(encode(args)) to ensure that args are normalized in the same manner as a non-inline job Resque::Job.new(:inline, {'class' => Resque::Plugins::Fifo::Queue::DrainWorker, 'args' => []}).perform else redis_client.set "fifo_update_timestamp-#{queue_prefix}", Time.now.to_s Resque.push(:fifo_refresh, :class => Resque::Plugins::Fifo::Queue::DrainWorker.to_s, :args => []) end end |
#update_workers ⇒ Object
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 223 def update_workers # query removed workers start_time = Time.now.to_i redlock.lock("fifo_queue_lock-#{queue_prefix}", DLM_TTL) do |locked| if locked = redis_client.get "fifo_update_timestamp-#{queue_prefix}" process_dht cleanup_queues log("reinserting items from pending") reinsert_pending_items(pending_queue_name) # check if something tried to request an update, if so we requie again = redis_client.get "fifo_update_timestamp-#{queue_prefix}" if != request_refresh end else log("unable to lock DHT.") end end end_time = Time.now.to_i redis_client.set("fifo-stats-accumulated-recalc-time", end_time - start_time) redis_client.incr "fifo-stats-accumulated-recalc-count" end |
#worker_for_queue(queue_name) ⇒ Object
162 163 164 165 166 167 168 |
# File 'lib/resque/plugins/fifo/queue/manager.rb', line 162 def worker_for_queue(queue_name) Resque.workers.collect do |worker| w_queue_name = worker.queues.select { |name| name.start_with?("#{queue_prefix}-") }.first return worker if w_queue_name == queue_name end.compact nil end |