Class: Pallets::Manager
- Inherits: Object (hierarchy: Object > Pallets::Manager)
- Defined in: lib/pallets/manager.rb
Instance Attribute Summary
- #scheduler ⇒ Object (readonly)
  Returns the value of attribute scheduler.
- #workers ⇒ Object (readonly)
  Returns the value of attribute workers.
Instance Method Summary
- #initialize(concurrency: Pallets.configuration.concurrency) ⇒ Manager (constructor)
  A new instance of Manager.
- #remove_worker(worker) ⇒ Object
- #replace_worker(worker) ⇒ Object
- #shutdown ⇒ Object
  Attempt to gracefully shut down every worker.
- #start ⇒ Object
Constructor Details
#initialize(concurrency: Pallets.configuration.concurrency) ⇒ Manager
Returns a new instance of Manager.
    # File 'lib/pallets/manager.rb', line 5
    def initialize(concurrency: Pallets.configuration.concurrency)
      @workers = concurrency.times.map { Worker.new(self) }
      @scheduler = Scheduler.new(self)
      @lock = Mutex.new
      @needs_to_stop = false
    end
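The default for `concurrency` is an ordinary Ruby keyword-argument default, so it is evaluated each time the constructor is called rather than once at load time. The following is a minimal sketch of that behaviour only; `SketchConfig` and `SketchManager` are hypothetical stand-ins for `Pallets.configuration` and `Pallets::Manager`, and the worker slots are plain strings instead of real `Worker` objects.

```ruby
# Hypothetical stand-in for Pallets.configuration (not the real API).
module SketchConfig
  class << self
    attr_accessor :concurrency
  end
  self.concurrency = 2
end

# Hypothetical stand-in for Pallets::Manager, reduced to pool sizing.
class SketchManager
  attr_reader :workers

  # The default expression runs on every call, so it sees the current
  # configuration value at construction time.
  def initialize(concurrency: SketchConfig.concurrency)
    # One placeholder per worker slot; the real class builds Worker.new(self).
    @workers = Array.new(concurrency) { |i| "worker-#{i}" }
  end
end

default_pool = SketchManager.new                  # sized from the config
SketchConfig.concurrency = 4
reconfigured = SketchManager.new                  # picks up the new value
explicit     = SketchManager.new(concurrency: 1)  # caller overrides the default
```

Under this reading, configuration should be set before the manager is constructed, since the pool size is fixed at that point.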
Instance Attribute Details
#scheduler ⇒ Object (readonly)
Returns the value of attribute scheduler.
    # File 'lib/pallets/manager.rb', line 3
    def scheduler
      @scheduler
    end
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
    # File 'lib/pallets/manager.rb', line 3
    def workers
      @workers
    end
Instance Method Details
#remove_worker(worker) ⇒ Object
    # File 'lib/pallets/manager.rb', line 41
    def remove_worker(worker)
      @lock.synchronize { @workers.delete(worker) }
    end
#replace_worker(worker) ⇒ Object
    # File 'lib/pallets/manager.rb', line 45
    def replace_worker(worker)
      @lock.synchronize do
        @workers.delete(worker)

        return if @needs_to_stop

        worker = Worker.new(self)
        @workers << worker
        worker.start
      end
    end
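To make the effect of the `@needs_to_stop` guard easier to see, here is a small self-contained sketch of the same replace-under-lock pattern. `StubWorker`, `SketchManager`, and `stop!` are hypothetical names introduced for illustration; the real `Pallets::Worker` does far more than record that it was started.

```ruby
# Hypothetical stand-in for Pallets::Worker: only records that it started.
class StubWorker
  attr_reader :started

  def initialize(manager)
    @manager = manager
    @started = false
  end

  def start
    @started = true
  end
end

# Hypothetical reduction of the manager to the replacement logic.
class SketchManager
  attr_reader :workers

  def initialize(concurrency: 2)
    @workers = Array.new(concurrency) { StubWorker.new(self) }
    @lock = Mutex.new
    @needs_to_stop = false
  end

  # Assumed helper: flips the flag that shutdown sets in the listing above.
  def stop!
    @needs_to_stop = true
  end

  def replace_worker(worker)
    @lock.synchronize do
      @workers.delete(worker)
      # Once shutdown has begun, a dead worker is dropped, not replaced.
      return if @needs_to_stop

      replacement = StubWorker.new(self)
      @workers << replacement
      replacement.start
    end
  end
end

manager = SketchManager.new(concurrency: 2)
manager.replace_worker(manager.workers.first) # pool stays at full size
manager.stop!
manager.replace_worker(manager.workers.first) # pool shrinks by one
```

The early `return` leaves the `synchronize` block through its normal cleanup path, so the mutex is released in both branches.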
#shutdown ⇒ Object
Attempts to shut down every worker gracefully. Any worker that is still busy once the timeout expires (10 seconds in this implementation) is shut down hard. Jobs interrupted by the hard shutdown are not lost: a reliability list holds all active jobs, and they are automatically requeued on the next start.
    # File 'lib/pallets/manager.rb', line 22
    def shutdown
      @needs_to_stop = true
      @workers.reverse_each(&:graceful_shutdown)
      @scheduler.shutdown

      Pallets.logger.info 'Waiting for workers to finish their jobs...'
      # Wait for 10 seconds at most
      10.times do
        return if @workers.empty?
        sleep 1
      end

      @workers.reverse_each(&:hard_shutdown)
      # Ensure Pallets::Shutdown got propagated and workers finished; if not,
      # their threads will be killed anyway when the manager quits
      sleep 0.5
    end
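The graceful-then-hard sequence can be tried in isolation with the sketch below. It is an illustration under stated assumptions, not the gem's implementation: `StubWorker` and `SketchManager` are hypothetical, the polling interval is shortened so the example runs quickly, the `:graceful` / `:forced` return values are added only to make the outcome observable, and the pool is copied under the lock before iterating (the original iterates `@workers` directly).

```ruby
# Hypothetical stand-in for Pallets::Worker: its "job" takes busy_for seconds.
class StubWorker
  def initialize(manager, busy_for:)
    @manager = manager
    @busy_for = busy_for
  end

  # Finish the current job in the background, then deregister.
  def graceful_shutdown
    @thread = Thread.new do
      sleep @busy_for
      @manager.remove_worker(self)
    end
  end

  # Stop immediately and deregister.
  def hard_shutdown
    @thread&.kill
    @manager.remove_worker(self)
  end
end

class SketchManager
  attr_reader :workers

  def initialize(busy_times, max_ticks: 10, tick: 0.02)
    @workers = busy_times.map { |t| StubWorker.new(self, busy_for: t) }
    @lock = Mutex.new
    @max_ticks = max_ticks
    @tick = tick
  end

  def remove_worker(worker)
    @lock.synchronize { @workers.delete(worker) }
  end

  def shutdown
    snapshot.reverse_each(&:graceful_shutdown)
    # Poll until the pool drains or the window (max_ticks * tick) closes.
    @max_ticks.times do
      return :graceful if snapshot.empty?
      sleep @tick
    end
    snapshot.reverse_each(&:hard_shutdown)
    :forced
  end

  private

  # Copy of the pool taken under the lock, safe to iterate while workers
  # deregister themselves from other threads.
  def snapshot
    @lock.synchronize { @workers.dup }
  end
end

quick = SketchManager.new([0.0, 0.0]).shutdown # both jobs end inside the window
stuck = SketchManager.new([0.0, 5]).shutdown   # one job outlives the window
```

Polling an emptying pool keeps the manager independent of how each worker finishes; the trade-off is that shutdown latency is rounded up to the polling interval.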
#start ⇒ Object
    # File 'lib/pallets/manager.rb', line 12
    def start
      @workers.each(&:start)
      @scheduler.start
    end