Class: RocketJob::Supervisor
- Inherits: Object
  - Object
  - RocketJob::Supervisor
- Includes: Shutdown, SemanticLogger::Loggable
- Defined in: lib/rocket_job/supervisor.rb, lib/rocket_job/supervisor/shutdown.rb
Overview
Starts a server instance along with its workers, and ensures the workers remain running until they need to shut down.
Defined Under Namespace
Modules: Shutdown
Instance Attribute Summary
- #server ⇒ Object (readonly)
  Returns the value of attribute server.
- #worker_id ⇒ Object
  Returns the value of attribute worker_id.
- #worker_pool ⇒ Object (readonly)
  Returns the value of attribute worker_pool.
Class Method Summary
- .run(attrs = {}) ⇒ Object
  Start the Supervisor, using the supplied attributes to create a new Server instance.
Instance Method Summary
- #initialize(server) ⇒ Supervisor (constructor)
  A new instance of Supervisor.
- #run ⇒ Object
- #stop! ⇒ Object
- #supervise_pool ⇒ Object
- #synchronize(&block) ⇒ Object
Constructor Details
#initialize(server) ⇒ Supervisor
Returns a new instance of Supervisor.
# File 'lib/rocket_job/supervisor.rb', line 24

def initialize(server)
  @server = server
  @worker_pool = WorkerPool.new(server.name, server.filter)
  @mutex = Mutex.new
end
Instance Attribute Details
#server ⇒ Object (readonly)
Returns the value of attribute server.
# File 'lib/rocket_job/supervisor.rb', line 9

def server
  @server
end
#worker_id ⇒ Object
Returns the value of attribute worker_id.
# File 'lib/rocket_job/supervisor.rb', line 10

def worker_id
  @worker_id
end
#worker_pool ⇒ Object (readonly)
Returns the value of attribute worker_pool.
# File 'lib/rocket_job/supervisor.rb', line 9

def worker_pool
  @worker_pool
end
Class Method Details
.run(attrs = {}) ⇒ Object
Start the Supervisor, using the supplied attributes to create a new Server instance.
# File 'lib/rocket_job/supervisor.rb', line 13

def self.run(attrs = {})
  Thread.current.name = 'rocketjob main'
  RocketJob.create_indexes
  register_signal_handlers

  server = Server.create!(attrs)
  new(server).run
ensure
  server&.destroy
end
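The `ensure` clause in `.run` uses `server&.destroy` because `server` is still nil if `Server.create!` raises before the assignment completes. The following is a minimal sketch of that cleanup pattern only. `FakeServer` and `run_with_cleanup` are hypothetical stand-ins invented for illustration and are not part of Rocket Job.

```ruby
# Hypothetical stand-in for RocketJob::Server (assumption, not the real model).
class FakeServer
  @destroyed = 0

  class << self
    attr_accessor :destroyed
  end

  # Raises when no name is given, to simulate a failed create!.
  def self.create!(attrs = {})
    raise ArgumentError, "name is required" unless attrs[:name]

    new
  end

  def destroy
    self.class.destroyed += 1
  end
end

# Same shape as Supervisor.run: create the record, use it, always clean up.
def run_with_cleanup(attrs = {})
  server = FakeServer.create!(attrs)
  :ran
ensure
  # `server` is nil here when create! raised, so `&.` skips the call.
  server&.destroy
end

ok = run_with_cleanup(name: "demo")

failed = begin
  run_with_cleanup
rescue ArgumentError => e
  e.message
end
```

In the successful call the stand-in record is destroyed exactly once. In the failing call the original error propagates unchanged, rather than being masked by a `NoMethodError` on nil.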
Instance Method Details
#run ⇒ Object
# File 'lib/rocket_job/supervisor.rb', line 30

def run
  logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}"
  logger.info('Running with filter', server.filter) if server.filter
  server.started!
  logger.info 'Rocket Job Server started'

  event_listener = Thread.new { Event.listener }
  Subscribers::Server.subscribe(self) do
    Subscribers::Worker.subscribe(self) do
      Subscribers::Logger.subscribe do
        supervise_pool
        stop!
      end
    end
  end
rescue ::Mongoid::Errors::DocumentNotFound
  logger.info('Server has been destroyed. Going down hard!')
rescue Exception => exc
  logger.error('RocketJob::Server is stopping due to an exception', exc)
ensure
  event_listener.kill if event_listener
  # Logs the backtrace for each running worker
  worker_pool.log_backtraces
  logger.info('Shutdown Complete')
end
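In `#run`, the event listener thread is killed in the `ensure` clause, so it is cleaned up whether supervision ends normally or with an exception. The sketch below illustrates only that control flow. `run_with_listener` is a hypothetical helper, `Thread.new { sleep }` stands in for the `Event.listener` thread, and it rescues `StandardError` where the original rescues `Exception`.

```ruby
# Hypothetical helper (not part of Rocket Job) that mirrors the
# rescue/ensure structure of Supervisor#run.
def run_with_listener(log)
  listener = Thread.new { sleep } # stand-in for Event.listener; blocks until killed
  log << :started
  yield
  log << :finished
rescue StandardError
  log << :rescued
ensure
  if listener
    listener.kill
    listener.join # wait until the thread has actually terminated
    log << (listener.alive? ? :listener_alive : :listener_stopped)
  end
  log << :shutdown_complete
end

clean = []
run_with_listener(clean) { :work }

failing = []
run_with_listener(failing) { raise "boom" }
```

Both runs end with the listener stopped and the shutdown entry logged. Only the middle entry differs: `:finished` for the clean run and `:rescued` for the failing one.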
#stop! ⇒ Object
# File 'lib/rocket_job/supervisor.rb', line 56

def stop!
  server.stop! if server.may_stop?
  worker_pool.stop
  while !worker_pool.join
    logger.info 'Waiting for workers to finish processing ...'
    # One or more workers still running so update heartbeat so that server reports "alive".
    server.refresh(worker_pool.living_count)
  end
end
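The loop in `#stop!` suggests that `worker_pool.join` returns false when it times out with workers still running. That reading is inferred from the `while !worker_pool.join` condition and is not confirmed here. Under that assumption, the drain-and-heartbeat behaviour can be sketched with stand-in objects. `DrainingPool`, `HeartbeatServer`, and `drain` are hypothetical names.

```ruby
# Hypothetical pool: join reports false until it has been polled
# `polls_needed` times, then reports that every worker has finished.
class DrainingPool
  attr_reader :living_count

  def initialize(workers:, polls_needed:)
    @living_count = workers
    @polls_left = polls_needed
    @stopped = false
  end

  def stop
    @stopped = true
  end

  def stopped?
    @stopped
  end

  def join
    @polls_left -= 1
    return false if @polls_left.positive?

    @living_count = 0
    true
  end
end

# Hypothetical server that records each heartbeat it receives.
class HeartbeatServer
  attr_reader :heartbeats

  def initialize
    @heartbeats = []
  end

  def refresh(count)
    @heartbeats << count
  end
end

# Same shape as Supervisor#stop!, minus logging and the server state change.
def drain(server, pool)
  pool.stop
  server.refresh(pool.living_count) until pool.join
end

pool = DrainingPool.new(workers: 3, polls_needed: 3)
server = HeartbeatServer.new
drain(server, pool)
```

Each unsuccessful `join` produces one heartbeat carrying the current worker count, so the server keeps reporting as alive while it drains.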
#supervise_pool ⇒ Object
# File 'lib/rocket_job/supervisor.rb', line 66

def supervise_pool
  stagger = true
  while !self.class.shutdown?
    synchronize do
      if server.running?
        worker_pool.prune
        worker_pool.rebalance(server.max_workers, stagger)
        stagger = false
      elsif server.paused?
        worker_pool.stop
        sleep(0.1)
        worker_pool.prune
        stagger = true
      else
        break
      end
    end

    self.class.wait_for_event(Config.instance.heartbeat_seconds)

    break if self.class.shutdown?

    synchronize { server.refresh(worker_pool.living_count) }
  end
end
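The `stagger` flag in `#supervise_pool` is true only for the first rebalance after startup or after a pause, and false for every later pass while the server keeps running. The sketch below isolates that state handling. `RecordingPool` and `supervise_step` are hypothetical, and the sleep, event wait, and heartbeat refresh of the real loop are left out.

```ruby
# Hypothetical recorder standing in for WorkerPool.
class RecordingPool
  attr_reader :calls

  def initialize
    @calls = []
  end

  def prune
    @calls << :prune
  end

  def stop
    @calls << :stop
  end

  def rebalance(max_workers, stagger)
    @calls << [:rebalance, max_workers, stagger]
  end
end

# One pass through the body of the supervise loop.
# Returns the next value of `stagger`, or :break to leave the loop.
def supervise_step(state, pool, max_workers, stagger)
  case state
  when :running
    pool.prune
    pool.rebalance(max_workers, stagger)
    false
  when :paused
    pool.stop
    pool.prune
    true
  else
    :break
  end
end

pool = RecordingPool.new
stagger = true
[:running, :running, :paused, :running, :stopping].each do |state|
  result = supervise_step(state, pool, 2, stagger)
  break if result == :break

  stagger = result
end

stagger_flags = pool.calls.select { |call| call.is_a?(Array) }.map(&:last)
```

In this run, `stagger_flags` records one flag per rebalance: true on the first pass, false on the second, and true again on the first pass after the pause.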
#synchronize(&block) ⇒ Object
# File 'lib/rocket_job/supervisor.rb', line 92

def synchronize(&block)
  @mutex.synchronize(&block)
end
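`#synchronize` forwards its block to a plain Ruby `Mutex`, so callers share a single lock and receive the block's return value. The sketch below shows the same delegation on a hypothetical `Counter` class that is unrelated to Rocket Job. It also shows that `Mutex` is not reentrant: a nested call on the same thread raises `ThreadError`.

```ruby
# Hypothetical class using the same delegation as Supervisor#synchronize.
class Counter
  attr_reader :value

  def initialize
    @value = 0
    @mutex = Mutex.new
  end

  # Forward the block to the mutex; the block's value is returned.
  def synchronize(&block)
    @mutex.synchronize(&block)
  end

  def increment
    synchronize { @value += 1 }
  end
end

counter = Counter.new
threads = 4.times.map do
  Thread.new { 1000.times { counter.increment } }
end
threads.each(&:join)

result = counter.synchronize { counter.value }

# Mutex is not reentrant: locking it again on the same thread raises.
nested = begin
  counter.synchronize { counter.synchronize { :inner } }
rescue ThreadError
  :not_reentrant
end
```

Because of the reentrancy limit, code that runs inside a `synchronize` block should avoid calling back into another method that takes the same lock.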