Class: Shoryuken::Manager

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Util
Defined in:
lib/shoryuken/manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#elapsed, #logger, #unparse_queues, #watchdog, #worker_name

Constructor Details

#initialize(condvar) ⇒ Manager

Returns a new instance of Manager.



13
14
15
16
17
18
19
20
21
22
23
# File 'lib/shoryuken/manager.rb', line 13

def initialize(condvar)
  @count  = Shoryuken.options[:concurrency] || 25
  @queues = Shoryuken.queues.dup.uniq
  @finished = condvar

  @done = false

  @busy  = []
  @ready = @count.times.map { build_processor }
  @threads = {}
end

Instance Attribute Details

#fetcherObject

Returns the value of attribute fetcher.



9
10
11
# File 'lib/shoryuken/manager.rb', line 9

def fetcher
  @fetcher
end

Instance Method Details

#assign(queue, sqs_msg) ⇒ Object



86
87
88
89
90
91
92
93
94
95
# File 'lib/shoryuken/manager.rb', line 86

def assign(queue, sqs_msg)
  watchdog("Manager#assign died") do
    logger.info "Assigning #{sqs_msg.id}"

    processor = @ready.pop
    @busy << processor

    processor.async.process(queue, sqs_msg)
  end
end

#dispatchObject



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/shoryuken/manager.rb', line 118

def dispatch
  return if stopped?

  logger.debug { "Ready: #{@ready.size}, Busy: #{@busy.size}, Active Queues: #{unparse_queues(@queues)}" }

  if @ready.empty?
    logger.debug { 'Pausing fetcher, because all processors are busy' }

    after(1) { dispatch }

    return
  end

  if queue = next_queue
    @fetcher.async.fetch(queue, @ready.size)
  else
    logger.debug { 'Pausing fetcher, because all queues are paused' }

    @fetcher_paused = true
  end
end

#pause_queue!(queue) ⇒ Object



107
108
109
110
111
112
113
114
115
# File 'lib/shoryuken/manager.rb', line 107

def pause_queue!(queue)
  return if !@queues.include?(queue) || Shoryuken.options[:delay].to_f <= 0

  logger.debug "Pausing '#{queue}' for #{Shoryuken.options[:delay].to_f} seconds, because it's empty"

  @queues.delete(queue)

  after(Shoryuken.options[:delay].to_f) { async.restart_queue!(queue) }
end

#processor_died(processor, reason) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/shoryuken/manager.rb', line 69

def processor_died(processor, reason)
  watchdog("Manager#processor_died died") do
    logger.error "Process died, reason: #{reason}" unless reason.to_s.empty?

    @threads.delete(processor.object_id)
    @busy.delete processor

    unless stopped?
      @ready << build_processor
    end
  end
end

#processor_done(queue, processor) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/shoryuken/manager.rb', line 54

def processor_done(queue, processor)
  watchdog('Manager#processor_done died') do
    logger.info "Process done for '#{queue}'"

    @threads.delete(processor.object_id)
    @busy.delete processor

    if stopped?
      processor.terminate if processor.alive?
    else
      @ready << processor
    end
  end
end

#real_thread(proxy_id, thr) ⇒ Object



140
141
142
# File 'lib/shoryuken/manager.rb', line 140

def real_thread(proxy_id, thr)
  @threads[proxy_id] = thr
end

#rebalance_queue_weight!(queue) ⇒ Object



97
98
99
100
101
102
103
104
105
# File 'lib/shoryuken/manager.rb', line 97

def rebalance_queue_weight!(queue)
  watchdog('Manager#rebalance_queue_weight! died') do
    if (original = original_queue_weight(queue)) > (current = current_queue_weight(queue))
      logger.info "Increasing '#{queue}' weight to #{current + 1}, max: #{original}"

      @queues << queue
    end
  end
end

#startObject



25
26
27
28
29
# File 'lib/shoryuken/manager.rb', line 25

def start
  logger.info 'Starting'

  dispatch
end

#stop(options = {}) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/shoryuken/manager.rb', line 31

def stop(options = {})
  watchdog('Manager#stop died') do
    @done = true

    @fetcher.terminate if @fetcher.alive?

    logger.info { "Shutting down #{@ready.size} quiet workers" }

    @ready.each do |processor|
      processor.terminate if processor.alive?
    end
    @ready.clear

    return after(0) { @finished.signal } if @busy.empty?

    if options[:shutdown]
      hard_shutdown_in(options[:timeout])
    else
      soft_shutdown(options[:timeout])
    end
  end
end

#stopped?Boolean

Returns:

  • (Boolean)


82
83
84
# File 'lib/shoryuken/manager.rb', line 82

def stopped?
  @done
end