Class: Sidekiq::Manager

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/sidekiq/manager.rb

Overview

The Manager is the central coordination point in Sidekiq, controlling the lifecycle of the Processors.

Tasks:

  1. start: Spin up Processors.

  2. processor_died: Handle job failure, throw away Processor, create new one.

  3. quiet: shutdown idle Processors.

  4. stop: hard stop the Processors by deadline.

Note that only the last task requires its own Thread since it has to monitor the shutdown process. The other tasks are performed by other threads.

Constant Summary collapse

PAUSE_TIME =

hack for quicker development / testing environment #2774

$stdout.tty? ? 0.1 : 0.5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#fire_event, #hostname, #identity, #logger, #process_nonce, #redis, #safe_thread, #tid, #watchdog

Methods included from ExceptionHandler

#handle_exception

Constructor Details

#initialize(options = {}) ⇒ Manager

Returns a new instance of Manager.

Raises:

  • (ArgumentError)


29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/sidekiq/manager.rb', line 29

def initialize(options = {})
  logger.debug { options.inspect }
  @options = options
  @count = options[:concurrency] || 10
  raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

  @done = false
  @workers = Set.new
  @count.times do
    @workers << Processor.new(self, options)
  end
  @plock = Mutex.new
end

Instance Attribute Details

#optionsObject (readonly)

Returns the value of attribute options.



27
28
29
# File 'lib/sidekiq/manager.rb', line 27

def options
  @options
end

#workersObject (readonly)

Returns the value of attribute workers.



26
27
28
# File 'lib/sidekiq/manager.rb', line 26

def workers
  @workers
end

Instance Method Details

#processor_died(processor, reason) ⇒ Object



89
90
91
92
93
94
95
96
97
98
# File 'lib/sidekiq/manager.rb', line 89

def processor_died(processor, reason)
  @plock.synchronize do
    @workers.delete(processor)
    unless @done
      p = Processor.new(self, options)
      @workers << p
      p.start
    end
  end
end

#processor_stopped(processor) ⇒ Object



83
84
85
86
87
# File 'lib/sidekiq/manager.rb', line 83

def processor_stopped(processor)
  @plock.synchronize do
    @workers.delete(processor)
  end
end

#quietObject



49
50
51
52
53
54
55
56
# File 'lib/sidekiq/manager.rb', line 49

def quiet
  return if @done
  @done = true

  logger.info { "Terminating quiet workers" }
  @workers.each { |x| x.terminate }
  fire_event(:quiet, reverse: true)
end

#startObject



43
44
45
46
47
# File 'lib/sidekiq/manager.rb', line 43

def start
  @workers.each do |x|
    x.start
  end
end

#stop(deadline) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/sidekiq/manager.rb', line 61

def stop(deadline)
  quiet
  fire_event(:shutdown, reverse: true)

  # some of the shutdown events can be async,
  # we don't have any way to know when they're done but
  # give them a little time to take effect
  sleep PAUSE_TIME
  return if @workers.empty?

  logger.info { "Pausing to allow workers to finish..." }
  remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  while remaining > PAUSE_TIME
    return if @workers.empty?
    sleep PAUSE_TIME
    remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  end
  return if @workers.empty?

  hard_shutdown
end

#stopped?Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/sidekiq/manager.rb', line 100

def stopped?
  @done
end