Class: RocketJob::Supervisor

Inherits:
Object
  • Object
show all
Includes:
Shutdown, SemanticLogger::Loggable
Defined in:
lib/rocket_job/supervisor.rb,
lib/rocket_job/supervisor/shutdown.rb

Overview

Starts a server instance along with its workers, and ensures the workers remain running until they need to shut down.

Defined Under Namespace

Modules: Shutdown

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server) ⇒ Supervisor

Returns a new instance of Supervisor.



24
25
26
27
28
# File 'lib/rocket_job/supervisor.rb', line 24

# Builds a supervisor for the given server.
#
# Stores the server, creates a WorkerPool from the server's name, and creates
# the Mutex that #synchronize locks on.
def initialize(server)
  @mutex = Mutex.new
  @server = server
  @worker_pool = WorkerPool.new(@server.name)
end

Instance Attribute Details

#server ⇒ Object (readonly)

Returns the value of attribute server.



9
10
11
# File 'lib/rocket_job/supervisor.rb', line 9

# Reader for @server, the server object passed to the constructor.
def server
  @server
end

#worker_id ⇒ Object

Returns the value of attribute worker_id.



10
11
12
# File 'lib/rocket_job/supervisor.rb', line 10

# Reader for @worker_id.
# NOTE(review): the constructor shown on this page does not assign @worker_id,
# so it is presumably set elsewhere — confirm against the writer's callers.
def worker_id
  @worker_id
end

#worker_pool ⇒ Object (readonly)

Returns the value of attribute worker_pool.



9
10
11
# File 'lib/rocket_job/supervisor.rb', line 9

# Reader for @worker_pool, the WorkerPool created in the constructor.
def worker_pool
  @worker_pool
end

Class Method Details

.run ⇒ Object

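Starts the Supervisor: creates the indexes, registers the signal handlers, creates a new Server instance and runs a supervisor for it. The server is destroyed when the supervisor exits.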



13
14
15
16
17
18
19
20
21
22
# File 'lib/rocket_job/supervisor.rb', line 13

# Entry point: names the current thread, creates the indexes, registers the
# signal handlers, then creates a Server and runs a supervisor for it.
#
# Returns whatever the supervisor's #run returns. The created server is
# destroyed on the way out, whether #run returned or raised.
def self.run
  Thread.current.name = "rocketjob main"
  RocketJob.create_indexes
  register_signal_handlers

  created = Server.create!
  supervisor = new(created)
  supervisor.run
ensure
  # `created` is still nil when something failed before the server existed.
  created.destroy unless created.nil?
end

Instance Method Details

#run ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/rocket_job/supervisor.rb', line 30

# Marks the server as started, starts the event listener thread, and supervises
# the worker pool inside the Server, Worker and Logger subscriptions until
# supervise_pool returns; then stops the server and waits for the workers.
#
# Exceptions are logged rather than re-raised. The event listener thread is
# always killed and worker backtraces are always logged on the way out.
def run
  logger.info "Using MongoDB Database: #{RocketJob::Job.collection.database.name}"
  logger.info("Running with filter", Config.filter) if Config.filter
  server.started!
  logger.info "Rocket Job Server started"

  # Event.listener runs on its own thread; it is killed in the ensure clause below.
  event_listener = Thread.new { Event.listener }
  # Only subscribe to SecretConfig events when the SecretConfig constant is loaded.
  Subscribers::SecretConfig.subscribe if defined?(SecretConfig)
  # The subscriptions wrap the supervision loop, so they are in place for as long
  # as supervise_pool and stop! are running.
  Subscribers::Server.subscribe(self) do
    Subscribers::Worker.subscribe(self) do
      Subscribers::Logger.subscribe do
        supervise_pool
        # supervise_pool has returned: stop the server and wait for the workers.
        stop!
      end
    end
  end
rescue ::Mongoid::Errors::DocumentNotFound
  # NOTE(review): presumably raised when the server document was removed while
  # this process was still running — confirm.
  logger.info("Server has been destroyed. Going down hard!")
rescue Exception => e
  # NOTE(review): rescuing Exception also catches SignalException and SystemExit;
  # this looks deliberate so that every failure is logged before shutdown — confirm.
  logger.error("RocketJob::Server is stopping due to an exception", e)
ensure
  # event_listener is nil when the failure happened before the thread was created.
  event_listener&.kill
  # Logs the backtrace for each running worker
  worker_pool.log_backtraces
  logger.info("Shutdown Complete")
end

#stop! ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
# File 'lib/rocket_job/supervisor.rb', line 57

# Stops the server (when server.may_stop? allows it) and the worker pool, then
# blocks until worker_pool.join reports that the workers have finished.
#
# While waiting, the server is refreshed with the number of living workers on
# every pass. Returns nil.
def stop!
  server.stop! if server.may_stop?
  synchronize { worker_pool.stop }

  until worker_pool.join
    logger.info("Waiting for workers to finish processing ...")
    # Workers are still running: update the heartbeat so the server keeps reporting "alive".
    still_alive = worker_pool.living_count
    server.refresh(still_alive)
  end
end

#supervise_pool ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/rocket_job/supervisor.rb', line 69

# Keeps the worker pool in step with the server's state. Returns nil once
# shutdown has been requested or the server is neither running nor paused.
#
# On every pass, while holding the supervisor mutex:
# - running: prunes the pool and rebalances it to server.max_workers. The
#   stagger flag is passed as true on the first rebalance and on the first
#   rebalance after a pause, false otherwise.
# - paused: stops the workers, sleeps briefly, then prunes the pool.
# - any other state: leaves the supervision loop straight away.
#
# After each running/paused pass the server is refreshed with the number of
# living workers, and the loop waits for an event for up to
# Config.heartbeat_seconds.
def supervise_pool
  stagger = true
  until self.class.shutdown?
    # A `break` inside the block handed to `synchronize` would only end that
    # call, not this loop. The block therefore reports whether supervision
    # should continue, and the loop is left from out here.
    supervising = synchronize do
      if server.running?
        worker_pool.prune
        worker_pool.rebalance(server.max_workers, stagger)
        stagger = false
        true
      elsif server.paused?
        worker_pool.stop
        sleep(0.1)
        worker_pool.prune
        stagger = true
        true
      else
        false
      end
    end
    break unless supervising

    synchronize { server.refresh(worker_pool.living_count) }

    # The loop condition re-checks shutdown? as soon as the wait returns.
    self.class.wait_for_event(Config.heartbeat_seconds)
  end
end

#synchronize(&block) ⇒ Object



95
96
97
# File 'lib/rocket_job/supervisor.rb', line 95

# Runs the given block while holding the supervisor's mutex (@mutex) and
# returns the result of Mutex#synchronize for that block.
def synchronize(&block)
  @mutex.synchronize(&block)
end