Class: Shoryuken::Manager

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Util
Defined in:
lib/shoryuken/manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #watchdog, #worker_name

Constructor Details

#initialize(condvar) ⇒ Manager

Returns a new instance of Manager


13
14
15
16
17
18
19
20
21
22
23
# File 'lib/shoryuken/manager.rb', line 13

# Sets up the manager's bookkeeping and pre-builds the processor pool.
#
# @param condvar [Object] kept as @finished; #stop calls +signal+ on it
#   when no processor is busy at stop time
def initialize(condvar)
  # Lifecycle state: @done is flipped by #stop and read through #stopped?.
  @finished = condvar
  @done     = false

  # Pool size falls back to 25 when :concurrency is not configured.
  @count  = Shoryuken.options[:concurrency] || 25
  # Work on a de-duplicated copy so the global queue list is never mutated.
  @queues = Shoryuken.queues.dup.uniq

  # Processors move between these two lists; @threads is filled by #real_thread.
  @busy    = []
  @ready   = @count.times.map { build_processor }
  @threads = {}
end

Instance Attribute Details

#fetcher ⇒ Object

Returns the value of attribute fetcher


9
10
11
# File 'lib/shoryuken/manager.rb', line 9

# Reader for the fetcher that #dispatch asks for messages and #stop terminates.
# NOTE(review): nothing in this view assigns @fetcher — presumably set through
# a matching writer; confirm against the class body.
#
# @return [Object, nil] the current value of @fetcher
def fetcher
  @fetcher
end

Instance Method Details

#assign(queue, sqs_msg) ⇒ Object


93
94
95
96
97
98
99
100
101
102
# File 'lib/shoryuken/manager.rb', line 93

# Hands a received message to an idle processor and marks that processor busy.
#
# If the idle pool is empty the message is not assigned: an error is logged and
# the pools are left untouched, so a nil placeholder can never end up in @busy
# (where nothing would remove it and #stop's +@busy.empty?+ check would never pass).
#
# @param queue [Object] queue the message came from; passed through to the processor
# @param sqs_msg [#message_id] the received message
# @return [Object, nil] whatever +watchdog+ returns for the block
def assign(queue, sqs_msg)
  watchdog('Manager#assign died') do
    logger.debug { "Assigning #{sqs_msg.message_id}" }

    processor = @ready.pop

    # Array#pop returns nil on an empty array; bail out before touching @busy.
    unless processor
      logger.error { "No ready processor available for #{sqs_msg.message_id}, not assigning" }
      next
    end

    @busy << processor

    processor.async.process(queue, sqs_msg)
  end
end

#dispatch ⇒ Object


125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/shoryuken/manager.rb', line 125

# Asks the fetcher for more messages, unless the manager has been stopped.
#
# * No idle processor: schedules another #dispatch through +after(1)+ and returns nil.
# * A queue is available: requests up to +@ready.size+ messages for it from the fetcher.
# * +next_queue+ yields nothing: records the pause in @fetcher_paused.
#
# @return [Object, nil]
def dispatch
  return if stopped?

  logger.debug do
    "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{unparse_queues(@queues)}"
  end

  if @ready.empty?
    logger.debug { 'Pausing fetcher, because all processors are busy' }
    after(1) { dispatch }
    return
  end

  queue = next_queue
  return @fetcher.async.fetch(queue, @ready.size) if queue

  logger.debug { 'Pausing fetcher, because all queues are paused' }
  @fetcher_paused = true
end

#pause_queue!(queue) ⇒ Object


114
115
116
117
118
119
120
121
122
# File 'lib/shoryuken/manager.rb', line 114

# Temporarily drops a queue from the active list.
#
# Does nothing when the queue is not currently active or when the configured
# :delay is not positive. Otherwise every entry for the queue is removed from
# @queues and +restart_queue!+ is scheduled after the delay.
#
# @param queue [Object] queue to pause
# @return [Object, nil] the result of +after+, or nil when nothing was paused
def pause_queue!(queue)
  return unless @queues.include?(queue)

  delay = Shoryuken.options[:delay].to_f
  return if delay <= 0

  logger.debug { "Pausing '#{queue}' for #{delay} seconds, because it's empty" }

  @queues.delete(queue)

  after(delay) { async.restart_queue!(queue) }
end

#processor_died(processor, reason) ⇒ Object


76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/shoryuken/manager.rb', line 76

# Cleans up after a processor that exited and, unless the manager is stopped,
# adds a freshly built replacement to the idle pool.
#
# The death is logged only when a reason is present. The condition sits outside
# the log call on purpose: inside the block, an empty reason would make the block
# return nil and still emit an error-level entry with no message.
#
# @param processor [Object] the processor that exited
# @param reason [Object, nil] why it exited; nil or empty means nothing to report
# @return [Object, nil] whatever +watchdog+ returns for the block
def processor_died(processor, reason)
  watchdog("Manager#processor_died died") do
    logger.error { "Process died, reason: #{reason}" } unless reason.to_s.empty?

    @threads.delete(processor.object_id)
    @busy.delete processor

    @ready << build_processor unless stopped?
  end
end

#processor_done(queue, processor) ⇒ Object


61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/shoryuken/manager.rb', line 61

# Bookkeeping for a processor that finished its message.
#
# The processor leaves the busy list and its entry in @threads is dropped.
# While the manager is running it goes back to the idle pool; once the manager
# is stopped it is terminated instead (if still alive).
#
# @param queue [Object] queue the message came from; only used in the debug log
# @param processor [Object] the processor that finished
# @return [Object, nil] whatever +watchdog+ returns for the block
def processor_done(queue, processor)
  watchdog('Manager#processor_done died') do
    logger.debug { "Process done for '#{queue}'" }

    @threads.delete(processor.object_id)
    @busy.delete(processor)

    # Normal operation: recycle the processor.
    next @ready << processor unless stopped?

    # Shutting down: nothing more to run, so let the processor go.
    processor.terminate if processor.alive?
  end
end

#real_thread(proxy_id, thr) ⇒ Object


147
148
149
# File 'lib/shoryuken/manager.rb', line 147

# Records the thread associated with a processor proxy id.
# #processor_done and #processor_died later delete the entry by the processor's object_id.
#
# @param proxy_id [Object] key to register the thread under
# @param thr [Object] the thread to remember
# @return [Object] +thr+
def real_thread(proxy_id, thr)
  @threads.store(proxy_id, thr)
end

#rebalance_queue_weight!(queue) ⇒ Object


104
105
106
107
108
109
110
111
112
# File 'lib/shoryuken/manager.rb', line 104

# Raises a queue's weight by one step while it is below its original weight.
#
# The step is taken by appending one more entry for the queue to @queues.
#
# @param queue [Object] queue whose weight may be raised
# @return [Object, nil] whatever +watchdog+ returns for the block
def rebalance_queue_weight!(queue)
  watchdog('Manager#rebalance_queue_weight! died') do
    original = original_queue_weight(queue)
    current  = current_queue_weight(queue)

    # Already at (or above) the original weight: nothing to restore.
    next unless original > current

    logger.info { "Increasing '#{queue}' weight to #{current + 1}, max: #{original}" }

    @queues << queue
  end
end

#start ⇒ Object


25
26
27
28
29
# File 'lib/shoryuken/manager.rb', line 25

# Logs that the manager is starting and kicks off the first #dispatch.
#
# @return [Object, nil] whatever #dispatch returns
def start
  logger.info { 'Starting' }

  dispatch
end

#stop(options = {}) ⇒ Object


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/shoryuken/manager.rb', line 31

# Shuts the manager down.
#
# Steps, in order: mark the manager stopped, run the +Shoryuken.stop_callback+
# if one is set, fire the :shutdown event, terminate the fetcher and every idle
# processor. If nothing is busy, @finished is signalled through +after(0)+;
# otherwise the remaining work is handed to a hard or soft shutdown.
#
# @param options [Hash] shutdown settings
# @option options [Boolean] :shutdown truthy selects +hard_shutdown_in+, otherwise +soft_shutdown+
# @option options [Object] :timeout passed through to the chosen shutdown routine
# @return [Object, nil]
def stop(options = {})
  watchdog('Manager#stop died') do
    @done = true

    on_stop = Shoryuken.stop_callback
    if on_stop
      logger.info { 'Calling Shoryuken.on_stop block' }
      on_stop.call
    end

    fire_event(:shutdown, true)

    @fetcher.terminate if @fetcher.alive?

    logger.info { "Shutting down #{@ready.size} quiet workers" }

    @ready.each { |idle| idle.terminate if idle.alive? }
    @ready.clear

    # Nothing in flight: report completion right away and leave the method.
    return after(0) { @finished.signal } if @busy.empty?

    options[:shutdown] ? hard_shutdown_in(options[:timeout]) : soft_shutdown(options[:timeout])
  end
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)

89
90
91
# File 'lib/shoryuken/manager.rb', line 89

# Tells whether #stop has been called: @done starts out false in #initialize
# and is set to true at the beginning of #stop.
#
# @return [Boolean] the current value of @done
def stopped?
  @done
end