Class: Shoryuken::Manager
- Inherits:
-
Object
- Object
- Shoryuken::Manager
show all
- Includes:
- Util
- Defined in:
- lib/shoryuken/manager.rb
Constant Summary
collapse
- BATCH_LIMIT =
10
- MIN_DISPATCH_INTERVAL =
0.1
Instance Method Summary
collapse
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
#initialize(fetcher, polling_strategy) ⇒ Manager
Returns a new instance of Manager.
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# File 'lib/shoryuken/manager.rb', line 9
def initialize(fetcher, polling_strategy)
@count = Shoryuken.options.fetch(:concurrency, 25)
raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0
@queues = Shoryuken.queues.dup.uniq
@done = Concurrent::AtomicBoolean.new(false)
@fetcher = fetcher
@polling_strategy = polling_strategy
@pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count)
@dispatcher_executor = Concurrent::SingleThreadExecutor.new
end
|
Instance Method Details
#processor_done(queue) ⇒ Object
57
58
59
|
# File 'lib/shoryuken/manager.rb', line 57
def processor_done(queue)
logger.debug { "Process done for '#{queue}'" }
end
|
#processor_failed(ex) ⇒ Object
52
53
54
55
|
# File 'lib/shoryuken/manager.rb', line 52
def processor_failed(ex)
logger.error ex
logger.error ex.backtrace.join("\n") unless ex.backtrace.nil?
end
|
#start ⇒ Object
25
26
27
28
29
|
# File 'lib/shoryuken/manager.rb', line 25
def start
logger.info { 'Starting' }
dispatch_async
end
|
#stop(options = {}) ⇒ Object
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# File 'lib/shoryuken/manager.rb', line 31
def stop(options = {})
@done.make_true
if (callback = Shoryuken.stop_callback)
logger.info { 'Calling Shoryuken.on_stop block' }
callback.call
end
fire_event(:shutdown, true)
logger.info { 'Shutting down workers' }
@dispatcher_executor.kill
if options[:shutdown]
hard_shutdown_in(options[:timeout])
else
soft_shutdown
end
end
|