Class: ProcessBalancer::Manager

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/process_balancer/manager.rb

Overview

:nodoc:

Instance Method Summary collapse

Methods included from Util

#hostname, #identity, #logger, #redis, #start_thread, #watchdog

Constructor Details

#initialize(options) ⇒ Manager

Returns a new instance of Manager.



16
17
18
19
20
21
22
23
24
# File 'lib/process_balancer/manager.rb', line 16

# Builds a manager from the given options hash.
#
# Sets up the stop flag, the two atomic counters exposed through
# #process_index and #process_count, and the worker thread pool, then
# delegates to setup_job_watchers (defined elsewhere in this class).
#
# @param options [Hash] configuration; only +:max_threads+ is read here
def initialize(options)
  @options = options
  @done    = false

  # -1 is the "no index yet" sentinel that #process_index maps to nil.
  @process_index = Concurrent::AtomicFixnum.new(-1)
  @process_count = Concurrent::AtomicFixnum.new(0)

  # NOTE(review): with the :discard fallback policy, work the pool cannot
  # accept is presumably dropped silently rather than raising - confirm
  # against the concurrent-ruby documentation.
  pool_size = @options[:max_threads]
  @pool     = Concurrent::ThreadPoolExecutor.new(max_threads: pool_size, fallback_policy: :discard)

  setup_job_watchers
end

Instance Method Details

#process_count ⇒ Object



26
27
28
# File 'lib/process_balancer/manager.rb', line 26

# Current value held by the @process_count atomic counter.
#
# @return [Object] whatever @process_count.value yields (the counter is
#   created as a Concurrent::AtomicFixnum starting at 0 in #initialize)
def process_count
  @process_count.value
end

#process_index ⇒ Object



30
31
32
33
# File 'lib/process_balancer/manager.rb', line 30

# Current value of the @process_index counter, with the -1 sentinel
# hidden from callers.
#
# @return [Object, nil] the counter value, or nil while it is still -1
#   (the value #initialize seeds the counter with)
def process_index
  current = @process_index.value
  # A trailing `unless` evaluates to nil when the condition holds.
  current unless current == -1
end

#quiet ⇒ Object



43
44
45
46
47
48
49
# File 'lib/process_balancer/manager.rb', line 43

# Flags the manager as done and triggers update_jobs.
#
# Only the first call has any effect: once @done is set, later calls do
# nothing and return nil.
#
# @return [Object, nil] the result of update_jobs on the first call,
#   nil on every subsequent call
def quiet
  unless @done
    @done = true

    # The flag is set before update_jobs runs, so anything update_jobs
    # consults through #stopping? already sees the manager as stopping.
    update_jobs
  end
end

#run ⇒ Object



39
40
41
# File 'lib/process_balancer/manager.rb', line 39

# Starts the heartbeat by handing run_heartbeat to start_thread (from
# Util) under the name 'heartbeat', and keeps the result in @thread.
#
# @return [Object] whatever start_thread returns (presumably a Thread -
#   confirm against Util#start_thread)
def run
  heartbeat = method(:run_heartbeat)
  @thread = start_thread('heartbeat', &heartbeat)
end

#stop ⇒ Object



51
52
53
54
55
56
57
58
59
# File 'lib/process_balancer/manager.rb', line 51

# Shuts the manager down.
#
# Steps, in order: mark the manager as done via #quiet, ask the thread
# pool to shut down, wait up to ProcessBalancer.options[:shutdown_timeout]
# for it to terminate, kill the pool regardless of how the wait ended,
# and finally call clear_heartbeat.
#
# @return [Object] the result of clear_heartbeat
def stop
  quiet

  @pool.shutdown
  # Looked up after shutdown has been requested, and read from the
  # global ProcessBalancer options rather than this instance's @options.
  grace_period = ProcessBalancer.options[:shutdown_timeout]
  @pool.wait_for_termination(grace_period)
  # Unconditional: runs whether or not the wait above succeeded.
  @pool.kill

  clear_heartbeat
end

#stopping? ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/process_balancer/manager.rb', line 61

# Whether the manager has been told to wind down.
#
# Reads the @done flag, which starts out false in #initialize and is set
# to true by #quiet.
#
# @return [Boolean]
def stopping?
  @done
end

#workers_for_job(job_id) ⇒ Object



35
36
37
# File 'lib/process_balancer/manager.rb', line 35

# Number of workers this manager should run for a job.
#
# While the manager is stopping the answer is always 0; otherwise the
# question is forwarded to ProcessBalancer.scheduled_workers.
#
# @param job_id [Object] identifier passed through unchanged to
#   ProcessBalancer.scheduled_workers
# @return [Object] 0 when stopping, else whatever
#   ProcessBalancer.scheduled_workers(job_id) returns
def workers_for_job(job_id)
  return 0 if stopping?

  ProcessBalancer.scheduled_workers(job_id)
end