Class: RocketJob::WorkerPool

Inherits:
Object
  • Object
show all
Includes:
SemanticLogger::Loggable
Defined in:
lib/rocket_job/worker_pool.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server_name) ⇒ WorkerPool

Returns a new instance of WorkerPool.



10
11
12
13
14
# File 'lib/rocket_job/worker_pool.rb', line 10

def initialize(server_name)
  @server_name = server_name
  @workers     = Concurrent::Array.new
  @worker_id   = 0
end

Instance Attribute Details

#server_nameObject (readonly)

Returns the value of attribute server_name.



8
9
10
# File 'lib/rocket_job/worker_pool.rb', line 8

def server_name
  @server_name
end

#workersObject (readonly)

Returns the value of attribute workers.



8
9
10
# File 'lib/rocket_job/worker_pool.rb', line 8

def workers
  @workers
end

Instance Method Details

#find(id) ⇒ Object

Find a worker in the list by its id



17
18
19
# File 'lib/rocket_job/worker_pool.rb', line 17

def find(id)
  workers.find { |worker| worker.id == id }
end

#join(timeout = 5) ⇒ Object

Wait for all workers to stop. Return [true] if all workers stopped Return [false] on timeout



70
71
72
73
74
75
76
77
78
# File 'lib/rocket_job/worker_pool.rb', line 70

def join(timeout = 5)
  while (worker = workers.first)
    return false unless worker.join(timeout)

    # Worker thread is dead
    workers.shift
  end
  true
end

#killObject

Kill Worker threads



62
63
64
65
# File 'lib/rocket_job/worker_pool.rb', line 62

def kill
  workers.each(&:kill)
  workers.clear
end

#living_countObject

Returns [Integer] number of workers (threads) that are alive



81
82
83
# File 'lib/rocket_job/worker_pool.rb', line 81

def living_count
  workers.count(&:alive?)
end

#log_backtracesObject



85
86
87
# File 'lib/rocket_job/worker_pool.rb', line 85

def log_backtraces
  workers.each { |worker| logger.backtrace(thread: worker.thread) if worker.thread && worker.alive? }
end

#pruneObject

Returns [Integer] number of dead workers removed.



47
48
49
50
51
52
53
54
# File 'lib/rocket_job/worker_pool.rb', line 47

def prune
  remove_count = workers.count - living_count
  return 0 if remove_count.zero?

  logger.info "Cleaned up #{remove_count} dead workers"
  workers.delete_if { |t| !t.alive? }
  remove_count
end

#rebalance(max_workers, stagger_start = false) ⇒ Object

Add new workers to get back to the ‘max_workers` if not already at `max_workers`

Parameters
  stagger_start
    Whether to stagger when the workers poll for work the first time.
    It spreads out the queue polling over the max_poll_seconds so
    that not all workers poll at the same time.
    The worker also responds faster than max_poll_seconds when a new job is created.


28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/rocket_job/worker_pool.rb', line 28

def rebalance(max_workers, stagger_start = false)
  count = max_workers.to_i - living_count
  return 0 unless count.positive?

  logger.info("#{'Stagger ' if stagger_start}Starting #{count} workers")

  add_one
  count -= 1
  delay = Config.max_poll_seconds.to_f / max_workers

  count.times.each do
    sleep(delay) if stagger_start
    return -1 if Supervisor.shutdown?

    add_one
  end
end

#stopObject

Tell all workers to stop working.



57
58
59
# File 'lib/rocket_job/worker_pool.rb', line 57

def stop
  workers.each(&:shutdown!)
end