Class: Sidekiq::Manager
- Inherits:
-
Object
- Object
- Sidekiq::Manager
- Defined in:
- lib/sidekiq/manager.rb
Overview
The main router in the system. This manages the processor state and accepts messages from Redis to be dispatched to an idle processor.
Constant Summary collapse
- SPIN_TIME_FOR_GRACEFUL_SHUTDOWN =
1- JVM_RESERVED_SIGNALS =
Don’t Process#kill if we get these signals via the API
['USR1', 'USR2']
Constants included from Util
Instance Attribute Summary collapse
-
#busy ⇒ Object
readonly
Returns the value of attribute busy.
-
#fetcher ⇒ Object
Returns the value of attribute fetcher.
-
#ready ⇒ Object
readonly
Returns the value of attribute ready.
Instance Method Summary collapse
- #assign(work) ⇒ Object
- #clean_up_for_graceful_shutdown ⇒ Object
- #heartbeat(key, data, json) ⇒ Object
-
#initialize(condvar, options = {}) ⇒ Manager
constructor
A new instance of Manager.
- #processor_died(processor, reason) ⇒ Object
- #processor_done(processor) ⇒ Object
-
#real_thread(proxy_id, thr) ⇒ Object
A hack worthy of Rube Goldberg.
- #start ⇒ Object
- #stop(options = {}) ⇒ Object
- #when_done(&blk) ⇒ Object
Methods included from Actor
Methods included from Util
#fire_event, #hostname, #identity, #logger, #process_nonce, #redis, #watchdog
Methods included from ExceptionHandler
Constructor Details
#initialize(condvar, options = {}) ⇒ Manager
Returns a new instance of Manager.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/sidekiq/manager.rb', line 26 def initialize(condvar, ={}) logger.debug { .inspect } @options = @count = [:concurrency] || 25 @done_callback = nil @finished = condvar @in_progress = {} @threads = {} @done = false @busy = [] @ready = @count.times.map do p = Processor.new_link(current_actor) p.proxy_id = p.object_id p end end |
Instance Attribute Details
#busy ⇒ Object (readonly)
Returns the value of attribute busy.
20 21 22 |
# File 'lib/sidekiq/manager.rb', line 20 def busy @busy end |
#fetcher ⇒ Object
Returns the value of attribute fetcher.
21 22 23 |
# File 'lib/sidekiq/manager.rb', line 21 def fetcher @fetcher end |
#ready ⇒ Object (readonly)
Returns the value of attribute ready.
19 20 21 |
# File 'lib/sidekiq/manager.rb', line 19 def ready @ready end |
Instance Method Details
#assign(work) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/sidekiq/manager.rb', line 112 def assign(work) watchdog("Manager#assign died") do if stopped? # Race condition between Manager#stop if Fetcher # is blocked on redis and gets a message after # all the ready Processors have been stopped. # Push the message back to redis. work.requeue else processor = @ready.pop @in_progress[processor.object_id] = work @busy << processor processor.async.process(work) end end end |
#clean_up_for_graceful_shutdown ⇒ Object
61 62 63 64 65 66 67 68 69 |
# File 'lib/sidekiq/manager.rb', line 61 def clean_up_for_graceful_shutdown if @busy.empty? shutdown return true end after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN) { clean_up_for_graceful_shutdown } false end |
#heartbeat(key, data, json) ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/sidekiq/manager.rb', line 137 def heartbeat(key, data, json) proctitle = ['sidekiq', Sidekiq::VERSION] proctitle << data['tag'] unless data['tag'].empty? proctitle << "[#{@busy.size} of #{data['concurrency']} busy]" proctitle << 'stopping' if stopped? $0 = proctitle.join(' ') ❤(key, json) after(5) do heartbeat(key, data, json) end end |
#processor_died(processor, reason) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/sidekiq/manager.rb', line 95 def processor_died(processor, reason) watchdog("Manager#processor_died died") do @in_progress.delete(processor.object_id) @threads.delete(processor.object_id) @busy.delete(processor) unless stopped? p = Processor.new_link(current_actor) p.proxy_id = p.object_id @ready << p dispatch else shutdown if @busy.empty? end end end |
#processor_done(processor) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/sidekiq/manager.rb', line 79 def processor_done(processor) watchdog('Manager#processor_done died') do @done_callback.call(processor) if @done_callback @in_progress.delete(processor.object_id) @threads.delete(processor.object_id) @busy.delete(processor) if stopped? processor.terminate if processor.alive? shutdown if @busy.empty? else @ready << processor if processor.alive? end dispatch end end |
#real_thread(proxy_id, thr) ⇒ Object
A hack worthy of Rube Goldberg. We need to be able to hard stop a working thread. But there’s no way for us to get handle to the underlying thread performing work for a processor so we have it call us and tell us.
133 134 135 |
# File 'lib/sidekiq/manager.rb', line 133 def real_thread(proxy_id, thr) @threads[proxy_id] = thr end |
#start ⇒ Object
71 72 73 |
# File 'lib/sidekiq/manager.rb', line 71 def start @ready.each { dispatch } end |
#stop(options = {}) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/sidekiq/manager.rb', line 44 def stop(={}) watchdog('Manager#stop died') do should_shutdown = [:shutdown] timeout = [:timeout] @done = true logger.info { "Terminating #{@ready.size} quiet workers" } @ready.each { |x| x.terminate if x.alive? } @ready.clear return if clean_up_for_graceful_shutdown hard_shutdown_in timeout if should_shutdown end end |
#when_done(&blk) ⇒ Object
75 76 77 |
# File 'lib/sidekiq/manager.rb', line 75 def when_done(&blk) @done_callback = blk end |