Class: Sidekiq::Manager

Inherits:
Object
Includes:
Component
Defined in:
lib/sidekiq/manager.rb

Overview

The Manager is the central coordination point in Sidekiq, controlling the lifecycle of the Processors.

Tasks:

  1. start: Spin up Processors.

  2. processor_result: Handle job failure, throw away Processor, create new one.

  3. quiet: shut down idle Processors.

  4. stop: hard stop the Processors by deadline.

Note that only the last task, stop, requires its own Thread, since it has to monitor the shutdown process. The other tasks are performed by other threads.
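
The start and quiet tasks above can be pictured with a minimal, self-contained sketch. `SketchProcessor` and `SketchManager` are hypothetical stand-ins invented for illustration, not Sidekiq classes; a real Processor runs jobs on its own thread, which this sketch leaves out.

```ruby
# Hypothetical stand-in for a Processor: it only tracks its state.
class SketchProcessor
  attr_reader :state

  def initialize
    @state = :new
  end

  def start
    @state = :running
  end

  def terminate
    @state = :terminating
  end
end

# Hypothetical stand-in for the Manager's start/quiet bookkeeping.
class SketchManager
  attr_reader :workers

  def initialize(count)
    @done = false
    @workers = Array.new(count) { SketchProcessor.new }
  end

  # Task 1: spin up every processor.
  def start
    @workers.each(&:start)
  end

  # Task 3: ask processors to wind down; repeated calls are no-ops.
  def quiet
    return if @done
    @done = true
    @workers.each(&:terminate)
  end

  def stopped?
    @done
  end
end

manager = SketchManager.new(3)
manager.start   # every processor is now :running
manager.quiet   # every processor is now :terminating
manager.quiet   # second call returns early because @done is already true
```

The `@done` flag serves two purposes in this sketch, as it appears to in the listing further down: it makes `quiet` idempotent and it backs `stopped?`.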

Instance Attribute Summary

Attributes included from Component

#config

Instance Method Summary

Methods included from Component

#fire_event, #handle_exception, #hostname, #identity, #logger, #process_nonce, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(capsule) ⇒ Manager

Returns a new instance of Manager.

Raises:

  • (ArgumentError)


# File 'lib/sidekiq/manager.rb', line 27

def initialize(capsule)
  @config = @capsule = capsule
  @count = capsule.concurrency
  raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

  @done = false
  @workers = Set.new
  @plock = Mutex.new
  @count.times do
    @workers << Processor.new(@config, &method(:processor_result))
  end
end
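
The constructor passes `&method(:processor_result)` to each Processor, which turns a bound method into the block the Processor later calls back. A minimal sketch of that idiom, together with the concurrency check, might look like the following. `CallbackWorker`, `CallbackOwner`, `worker_result`, and `finish` are hypothetical names used only for illustration.

```ruby
# Hypothetical worker that stores the block it was given as a callback.
class CallbackWorker
  def initialize(&callback)
    @callback = callback
  end

  # Report back to whoever created this worker.
  def finish(reason = nil)
    @callback.call(self, reason)
  end
end

# Hypothetical owner that hands its own method to each worker.
class CallbackOwner
  attr_reader :results, :workers

  def initialize(count)
    raise ArgumentError, "Concurrency of #{count} is not supported" if count < 1

    @results = []
    # method(:worker_result) returns a bound Method object; the & prefix
    # converts it into the block that CallbackWorker#initialize captures.
    @workers = Array.new(count) { CallbackWorker.new(&method(:worker_result)) }
  end

  def worker_result(worker, reason = nil)
    @results << [worker, reason]
  end
end

owner = CallbackOwner.new(2)
owner.workers.first.finish(:died)  # invokes owner.worker_result(worker, :died)

begin
  CallbackOwner.new(0)
rescue ArgumentError => e
  warn e.message
end
```

Because the callback is a bound method, the worker needs no reference to its owner beyond the block itself.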

Instance Attribute Details

#capsule ⇒ Object (readonly)

Returns the value of attribute capsule.



# File 'lib/sidekiq/manager.rb', line 25

def capsule
  @capsule
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



# File 'lib/sidekiq/manager.rb', line 24

def workers
  @workers
end

Instance Method Details

#processor_result(processor, reason = nil) ⇒ Object



# File 'lib/sidekiq/manager.rb', line 70

def processor_result(processor, reason = nil)
  @plock.synchronize do
    @workers.delete(processor)
    unless @done
      p = Processor.new(@config, &method(:processor_result))
      @workers << p
      p.start
    end
  end
end
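
The delete-then-replace logic in the listing above can be exercised in isolation. The following is a minimal sketch under stated assumptions: `PoolProcessor`, `ReplacingPool`, and `shut_down!` are hypothetical names, and the processors here do no work, so the sketch shows only the bookkeeping that the mutex protects.

```ruby
require "set"

# Hypothetical processor stand-in that only remembers whether it was started.
class PoolProcessor
  def initialize
    @started = false
  end

  def start
    @started = true
  end

  def started?
    @started
  end
end

# Hypothetical pool mirroring the delete-then-replace bookkeeping.
class ReplacingPool
  attr_reader :workers

  def initialize(count)
    @done = false
    @lock = Mutex.new
    @workers = Set.new
    count.times { @workers << PoolProcessor.new }
  end

  def shut_down!
    @done = true
  end

  # Remove the finished processor; add a started replacement unless the
  # pool is shutting down. The lock keeps concurrent callers from
  # interleaving their changes to the set.
  def processor_result(processor, _reason = nil)
    @lock.synchronize do
      @workers.delete(processor)
      unless @done
        replacement = PoolProcessor.new
        @workers << replacement
        replacement.start
      end
    end
  end
end

pool = ReplacingPool.new(2)
pool.processor_result(pool.workers.first, RuntimeError.new("boom"))
pool.workers.size  # still 2: the dead processor was swapped for a new one
```

Once the done flag is set, each reported processor is removed without replacement, so the set drains to empty. That is the condition the stop listing polls for with `@workers.empty?`.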

#quiet ⇒ Object



# File 'lib/sidekiq/manager.rb', line 44

def quiet
  return if @done
  @done = true

  logger.info { "Terminating quiet threads for #{capsule.name} capsule" }
  @workers.each(&:terminate)
end

#start ⇒ Object



# File 'lib/sidekiq/manager.rb', line 40

def start
  @workers.each(&:start)
end

#stop(deadline) ⇒ Object



# File 'lib/sidekiq/manager.rb', line 52

def stop(deadline)
  quiet

  # some of the shutdown events can be async,
  # we don't have any way to know when they're done but
  # give them a little time to take effect
  sleep PAUSE_TIME
  return if @workers.empty?

  logger.info { "Pausing to allow jobs to finish..." }
  wait_for(deadline) { @workers.empty? }
  return if @workers.empty?

  hard_shutdown
ensure
  capsule.stop
end
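
The listing above relies on a `wait_for` helper and a `hard_shutdown` step that are not shown in this section. As a rough illustration of the control flow only, the sketch below assumes the deadline is a monotonic-clock timestamp and uses hypothetical helpers named `wait_until` and `stop_sketch`. It is not Sidekiq's implementation.

```ruby
# Hypothetical helper: poll the block until it returns true or the
# monotonic-clock deadline passes. Returns true if the condition was met.
def wait_until(deadline, interval: 0.01)
  loop do
    return true if yield
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
end

# Hypothetical stop sequence: wait for the pool to drain, escalate if it
# does not, and always run the cleanup step. Steps are recorded in log.
def stop_sketch(workers, timeout:, log: [])
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  return log << :drained if wait_until(deadline) { workers.empty? }

  log << :hard_shutdown
ensure
  # Mirrors the ensure clause in the listing: cleanup runs on every path.
  log << :cleanup
end

stop_sketch([], timeout: 0.05)       # pool already empty, no escalation
stop_sketch([:busy], timeout: 0.05)  # pool never drains, so it escalates
```

A monotonic clock is used in the sketch so that wall-clock adjustments during shutdown cannot shorten or lengthen the wait.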

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/sidekiq/manager.rb', line 81

def stopped?
  @done
end