Class: RocketJob::Supervisor

Inherits:
Object
  • Object
show all
Includes:
Shutdown, SemanticLogger::Loggable
Defined in:
lib/rocket_job/supervisor.rb,
lib/rocket_job/supervisor/shutdown.rb

Overview

Starts a server instance, along with the workers and ensures workers remain running until they need to shutdown.

Defined Under Namespace

Modules: Shutdown

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server) ⇒ Supervisor

Returns a new instance of Supervisor.



24
25
26
27
28
# File 'lib/rocket_job/supervisor.rb', line 24

def initialize(server)
  @server      = server
  @worker_pool = WorkerPool.new(server.name, server.filter)
  @mutex       = Mutex.new
end

Instance Attribute Details

#serverObject (readonly)

Returns the value of attribute server.



9
10
11
# File 'lib/rocket_job/supervisor.rb', line 9

def server
  @server
end

#worker_idObject

Returns the value of attribute worker_id.



10
11
12
# File 'lib/rocket_job/supervisor.rb', line 10

def worker_id
  @worker_id
end

#worker_poolObject (readonly)

Returns the value of attribute worker_pool.



9
10
11
# File 'lib/rocket_job/supervisor.rb', line 9

def worker_pool
  @worker_pool
end

Class Method Details

.run(attrs = {}) ⇒ Object

Start the Supervisor, using the supplied attributes to create a new Server instance.



13
14
15
16
17
18
19
20
21
22
# File 'lib/rocket_job/supervisor.rb', line 13

def self.run(attrs = {})
  Thread.current.name = 'rocketjob main'
  RocketJob.create_indexes
  register_signal_handlers

  server = Server.create!(attrs)
  new(server).run
ensure
  server&.destroy
end

Instance Method Details

#runObject



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/rocket_job/supervisor.rb', line 30

def run
  logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}"
  logger.info('Running with filter', server.filter) if server.filter
  server.started!
  logger.info 'Rocket Job Server started'

  event_listener = Thread.new { Event.listener }
  Subscribers::Server.subscribe(self) do
    Subscribers::Worker.subscribe(self) do
      Subscribers::Logger.subscribe do
        supervise_pool
        stop!
      end
    end
  end
rescue ::Mongoid::Errors::DocumentNotFound
  logger.info('Server has been destroyed. Going down hard!')
rescue Exception => exc
  logger.error('RocketJob::Server is stopping due to an exception', exc)
ensure
  event_listener.kill if event_listener
  # Logs the backtrace for each running worker
  worker_pool.log_backtraces
  logger.info('Shutdown Complete')
end

#stop!Object



56
57
58
59
60
61
62
63
64
# File 'lib/rocket_job/supervisor.rb', line 56

def stop!
  server.stop! if server.may_stop?
  worker_pool.stop
  while !worker_pool.join
    logger.info 'Waiting for workers to finish processing ...'
    # One or more workers still running so update heartbeat so that server reports "alive".
    server.refresh(worker_pool.living_count)
  end
end

#supervise_poolObject



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/rocket_job/supervisor.rb', line 66

def supervise_pool
  stagger = true
  while !self.class.shutdown?
    synchronize do
      if server.running?
        worker_pool.prune
        worker_pool.rebalance(server.max_workers, stagger)
        stagger = false
      elsif server.paused?
        worker_pool.stop
        sleep(0.1)
        worker_pool.prune
        stagger = true
      else
        break
      end
    end

    self.class.wait_for_event(Config.instance.heartbeat_seconds)

    break if self.class.shutdown?

    synchronize { server.refresh(worker_pool.living_count) }
  end
end

#synchronize(&block) ⇒ Object



92
93
94
# File 'lib/rocket_job/supervisor.rb', line 92

def synchronize(&block)
  @mutex.synchronize(&block)
end