Class: RocketJob::Server

Inherits:
Object
Includes:
Plugins::Document, Plugins::StateMachine, SemanticLogger::Loggable
Defined in:
lib/rocket_job/server.rb

Overview

Server

On startup a server instance will automatically register itself if not already present

Starting a server in the foreground:

- Using a Rails runner:
  bin/rocketjob

Starting a server in the background:

- Using a Rails runner:
  nohup bin/rocketjob --quiet >output.log 2>&1 &

Stopping a server:

- Stop the server via the Web UI
- Send a regular kill signal to make it shut down once all active work is complete
    kill <pid>
- Or, use the following Ruby code:
  server = RocketJob::Server.where(name: 'server name').first
  server.stop!

Sending the kill signal locally starts the shutdown process immediately.
When stopped via the Web UI or Ruby code, the server can take up to 15 seconds
(the heartbeat interval) to start shutting down.

Constant Summary

@@shutdown =
false

Class Method Details

.counts_by_state ⇒ Object

Returns [Hash<Symbol:Integer>] of the number of servers in each state. Note: If there are no servers in a particular state, the hash will not contain a key for that state.

Example servers in every state:

RocketJob::Server.counts_by_state
# => {
       :starting => 1,
       :running => 37,
       :paused => 3,
       :stopping => 1
     }

Example no servers active:

RocketJob::Server.counts_by_state
# => {}


# File 'lib/rocket_job/server.rb', line 142

def self.counts_by_state
  counts = {}
  collection.aggregate([
    {
      '$group' => {
        _id:   '$state',
        count: {'$sum' => 1}
      }
    }
  ]
  ).each do |result|
    counts[result['_id'].to_sym] = result['count']
  end
  counts
end
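
The aggregation above groups documents by state and adds 1 per document. As a rough illustration only, the same counting can be sketched in memory with plain Ruby. `ServerDoc` and the free-standing `counts_by_state(servers)` helper are hypothetical stand-ins, not part of RocketJob:

```ruby
# Hypothetical stand-in for a server document; only `state` matters here.
ServerDoc = Struct.new(:name, :state)

# In-memory equivalent of grouping by state and summing 1 per document.
# States with no servers never get a key, matching the note above.
def counts_by_state(servers)
  servers.each_with_object({}) do |server, counts|
    key = server.state.to_sym
    counts[key] = counts.fetch(key, 0) + 1
  end
end

servers = [
  ServerDoc.new('host1:100', 'running'),
  ServerDoc.new('host2:200', 'running'),
  ServerDoc.new('host3:300', 'paused')
]

counts = counts_by_state(servers)
puts counts[:running]          # 2
puts counts[:paused]           # 1
puts counts.key?(:stopping)    # false
```

Because absent states have no key, callers that need a zero would typically use `counts.fetch(:stopping, 0)`.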

.destroy_zombies ⇒ Object

Destroys all zombie server instances and requeues any jobs still “running” on those servers. Returns the number of servers destroyed.



# File 'lib/rocket_job/server.rb', line 97

def self.destroy_zombies
  count = 0
  each do |server|
    next unless server.zombie?
    logger.warn "Destroying zombie server #{server.name}, and requeueing its jobs"
    server.destroy
    count += 1
  end
  count
end

.pause_all ⇒ Object

Pause all running servers



# File 'lib/rocket_job/server.rb', line 114

def self.pause_all
  running.each(&:pause!)
end

.resume_all ⇒ Object

Resume all paused servers



# File 'lib/rocket_job/server.rb', line 119

def self.resume_all
  paused.each(&:resume!)
end

.run(attrs = {}) ⇒ Object

Run the server process. Attributes supplied are passed to #new.



# File 'lib/rocket_job/server.rb', line 187

def self.run(attrs = {})
  Thread.current.name = 'rocketjob main'
  # Create Indexes on server startup
  Mongoid::Tasks::Database.create_indexes
  register_signal_handlers

  server = create!(attrs)
  server.send(:run)
ensure
  server.destroy if server
end
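
The `ensure` clause in `.run` means the server record is removed however the process exits. The sketch below illustrates that register/run/deregister pattern in isolation. The `REGISTRY` hash and the `run_server` helper are hypothetical and stand in for the servers collection and the real run loop:

```ruby
# Hypothetical in-memory registry standing in for the servers collection.
REGISTRY = {}

def run_server(name)
  REGISTRY[name] = :running               # register on startup
  registered = true
  yield                                   # the server's main loop would run here
ensure
  REGISTRY.delete(name) if registered     # always deregister, even after an error
end

begin
  run_server('host1:100') { raise 'simulated crash' }
rescue RuntimeError => e
  puts "server exited: #{e.message}"
end

puts REGISTRY.empty?   # true
```

Note that `ensure` only helps when the Ruby process gets a chance to unwind. A process that is killed outright leaves its record behind, which is the case the zombie checks on this page are meant to catch.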

.shutdown! ⇒ Object

Set shutdown indicator for this server process



# File 'lib/rocket_job/server.rb', line 166

def self.shutdown!
  @@shutdown.make_true
end

.shutdown? ⇒ Boolean

Returns [true|false] whether the shutdown indicator has been set for this server process

Returns:

  • (Boolean)


# File 'lib/rocket_job/server.rb', line 161

def self.shutdown?
  @@shutdown.value
end
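
The calls to `make_true` and `value` suggest that the shutdown indicator is a thread-safe boolean object shared by all threads in the process. As a minimal sketch of that idea, assuming nothing about the library's actual implementation, the hypothetical `ShutdownFlag` class below guards a boolean with a `Mutex` while a worker thread polls it:

```ruby
# Minimal thread-safe boolean. A stand-in for illustration only,
# not the object RocketJob actually uses.
class ShutdownFlag
  def initialize
    @mutex = Mutex.new
    @value = false
  end

  # Set the flag; safe to call from any thread.
  def make_true
    @mutex.synchronize { @value = true }
  end

  # Read the current value of the flag.
  def value
    @mutex.synchronize { @value }
  end
end

flag       = ShutdownFlag.new
iterations = 0

# The worker keeps going until it sees the shutdown indicator.
worker = Thread.new do
  until flag.value
    iterations += 1
    sleep 0.01
  end
end

sleep 0.05
flag.make_true   # what a signal handler or a stop request would trigger
worker.join
puts "worker stopped after #{iterations} iterations"
```

A flag that is polled rather than an exception that is raised lets each worker finish its current unit of work before exiting.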

.stop_all ⇒ Object

Stop all running, paused, or starting servers



# File 'lib/rocket_job/server.rb', line 109

def self.stop_all
  where(:state.in => [:running, :paused, :starting]).each(&:stop!)
end

.zombies(missed = 4) ⇒ Object

Scope for all zombie servers



# File 'lib/rocket_job/server.rb', line 205

def self.zombies(missed = 4)
  dead_seconds        = Config.instance.heartbeat_seconds * missed
  last_heartbeat_time = Time.now - dead_seconds
  where(
    :state.in => [:stopping, :running, :paused],
    '$or'     => [
      {"heartbeat.updated_at" => {'$exists' => false}},
      {"heartbeat.updated_at" => {'$lte' => last_heartbeat_time}}
    ]
  )
end
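
To make the cutoff arithmetic concrete, the snippet below works through it with the 15 second heartbeat interval mentioned in the overview. That value is an assumption here, since the real one comes from `Config.instance.heartbeat_seconds`, and the fixed `now` is only there to make the example repeatable:

```ruby
heartbeat_seconds = 15   # assumed; normally read from the configuration
missed            = 4    # default argument of .zombies

# A server is considered dead after this many seconds of silence.
dead_seconds = heartbeat_seconds * missed

now    = Time.utc(2020, 1, 1, 12, 0, 0)   # fixed time for a repeatable example
cutoff = now - dead_seconds               # heartbeats at or before this are stale

puts dead_seconds   # 60
puts cutoff         # 2020-01-01 11:59:00 UTC
```

Passing a larger `missed` value moves the cutoff further into the past, so fewer servers match the scope.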

Instance Method Details

#shutdown? ⇒ Boolean

Returns [Boolean] whether the server is shutting down

Returns:

  • (Boolean)


# File 'lib/rocket_job/server.rb', line 200

def shutdown?
  self.class.shutdown? || !running?
end

#zombie?(missed = 4) ⇒ Boolean

Returns [true|false] whether this server has missed at least the last `missed` heartbeats (4 by default)

Possible causes for a server to miss its heartbeats:

  • The server process has died

  • The server process is “hanging”

  • The server is no longer able to communicate with the MongoDB Server

Returns:

  • (Boolean)


# File 'lib/rocket_job/server.rb', line 223

def zombie?(missed = 4)
  return false unless running? || stopping? || paused?
  return true if heartbeat.nil? || heartbeat.updated_at.nil?
  dead_seconds = Config.instance.heartbeat_seconds * missed
  (Time.now - heartbeat.updated_at) >= dead_seconds
end
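
The edge cases of this check are easier to see on plain objects. The sketch below mirrors the logic above without MongoDB. `ServerRecord`, `Heartbeat`, the free-standing `zombie?` helper, and the 15 second `HEARTBEAT_SECONDS` constant are all hypothetical stand-ins:

```ruby
# Hypothetical stand-ins for the server document and its heartbeat.
Heartbeat    = Struct.new(:updated_at)
ServerRecord = Struct.new(:state, :heartbeat)

HEARTBEAT_SECONDS = 15   # assumed heartbeat interval

# Mirrors the three steps of #zombie?, with the current time passed in
# so the example is repeatable.
def zombie?(server, now, missed = 4)
  # Only servers that should be sending heartbeats can be zombies.
  return false unless %i[running stopping paused].include?(server.state)
  # An active server that never sent a heartbeat counts as a zombie.
  return true if server.heartbeat.nil? || server.heartbeat.updated_at.nil?
  (now - server.heartbeat.updated_at) >= HEARTBEAT_SECONDS * missed
end

now     = Time.utc(2020, 1, 1, 12, 0, 0)
fresh   = ServerRecord.new(:running,  Heartbeat.new(now - 30))
stale   = ServerRecord.new(:running,  Heartbeat.new(now - 60))
silent  = ServerRecord.new(:paused,   nil)
booting = ServerRecord.new(:starting, nil)

puts zombie?(fresh, now)     # false: last heartbeat 30 seconds ago
puts zombie?(stale, now)     # true: exactly 60 seconds counts, since the test is >=
puts zombie?(silent, now)    # true: active state but no heartbeat at all
puts zombie?(booting, now)   # false: starting servers are never zombies
```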