Class: Bunny::ConsumerWorkPool
- Inherits:
-
Object
- Object
- Bunny::ConsumerWorkPool
- Defined in:
- lib/bunny/consumer_work_pool.rb
Overview
Thread pool that dispatches consumer deliveries. Not supposed to be shared between channels or threads.
Every channel has its own consumer pool.
Instance Attribute Summary collapse
-
#abort_on_exception ⇒ Object
readonly
Returns the value of attribute abort_on_exception.
-
#size ⇒ Object
readonly
Returns the value of attribute size.
-
#threads ⇒ Object
readonly
Returns the value of attribute threads.
Instance Method Summary collapse
- #backlog ⇒ Object
- #busy? ⇒ Boolean
-
#initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60) ⇒ ConsumerWorkPool
constructor
A new instance of ConsumerWorkPool.
- #join(timeout = nil) ⇒ Object
- #kill ⇒ Object
- #pause ⇒ Object
- #resume ⇒ Object
- #running? ⇒ Boolean
- #shutdown(wait_for_workers = false) ⇒ Object
- #start ⇒ Object
- #submit(callable = nil, &block) ⇒ Object
Constructor Details
#initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60) ⇒ ConsumerWorkPool
Returns a new instance of ConsumerWorkPool.
22 23 24 25 26 27 28 29 30 31 |
# File 'lib/bunny/consumer_work_pool.rb', line 22
#
# Builds a pool in the stopped, unpaused state. No threads are created
# here; that happens in #start.
#
# @param size [Integer] how many worker threads #start spawns
# @param abort_on_exception [Boolean] when truthy, #start sets
#   abort_on_exception on every worker thread it creates
# @param shutdown_timeout [Numeric, nil] upper bound passed to the
#   condition-variable wait in #shutdown; a falsy value disables the wait
def initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60)
  # Caller-supplied configuration.
  @size, @abort_on_exception, @shutdown_timeout = size, abort_on_exception, shutdown_timeout

  # Job queue plus the primitives #shutdown waits on.
  @queue                = ::Queue.new
  @shutdown_mutex       = ::Mutex.new
  @shutdown_conditional = ::ConditionVariable.new

  # Lifecycle flags; the pool is neither paused nor running until #start.
  @paused  = false
  @running = false
end
Instance Attribute Details
#abort_on_exception ⇒ Object (readonly)
Returns the value of attribute abort_on_exception.
20 21 22 |
# File 'lib/bunny/consumer_work_pool.rb', line 20
#
# Reader for the abort-on-exception setting given to the constructor.
#
# @return [Object] the stored @abort_on_exception value
def abort_on_exception
  @abort_on_exception
end
#size ⇒ Object (readonly)
Returns the value of attribute size.
19 20 21 |
# File 'lib/bunny/consumer_work_pool.rb', line 19
#
# Reader for the configured number of worker threads.
#
# @return [Integer] the stored @size value
def size
  @size
end
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
18 19 20 |
# File 'lib/bunny/consumer_work_pool.rb', line 18
#
# Reader for the worker thread list.
#
# @return [Array<Thread>, nil] the threads created by #start, or nil when
#   #start has not been called yet (the constructor does not assign it)
def threads
  @threads
end
Instance Method Details
#backlog ⇒ Object
54 55 56 |
# File 'lib/bunny/consumer_work_pool.rb', line 54
#
# Reports how many submitted jobs are still sitting in the queue.
#
# @return [Integer] current number of queued items
def backlog
  @queue.size
end
#busy? ⇒ Boolean
58 59 60 |
# File 'lib/bunny/consumer_work_pool.rb', line 58
#
# Tells whether any submitted job is still waiting in the queue.
# Jobs already popped by a worker are not counted.
#
# @return [Boolean] true when the queue is non-empty
def busy?
  !@queue.empty?
end
#join(timeout = nil) ⇒ Object
79 80 81 |
# File 'lib/bunny/consumer_work_pool.rb', line 79
#
# Joins every worker thread in turn. Safe to call before #start, in which
# case there is nothing to join.
#
# @param timeout [Numeric, nil] passed to each Thread#join call
#   individually, so it limits the wait per thread, not the total wait
# @return [Array] the list of threads that was iterated
def join(timeout = nil)
  workers = @threads || []
  workers.each { |worker| worker.join(timeout) }
end
#kill ⇒ Object
95 96 97 98 99 |
# File 'lib/bunny/consumer_work_pool.rb', line 95
#
# Marks the pool as not running and kills every worker thread
# immediately. Safe to call before #start.
#
# @return [Array] the list of threads that was iterated
def kill
  @running = false

  workers = @threads || []
  workers.each(&:kill)
end
#pause ⇒ Object
83 84 85 86 |
# File 'lib/bunny/consumer_work_pool.rb', line 83
#
# Flags the pool as paused and no longer running. Only the two flags are
# changed here; worker threads are not touched by this method.
#
# @return [true] the value assigned to the paused flag
def pause
  @running = false
  @paused  = true
end
#resume ⇒ Object
88 89 90 91 92 93 |
# File 'lib/bunny/consumer_work_pool.rb', line 88
#
# Flags the pool as running and unpaused, then wakes every worker thread
# with Thread#run.
#
# The thread list is nil until #start has been called, so it is guarded
# the same way #join and #kill guard it. Without the guard, calling
# #resume before #start raised NoMethodError after the flags had already
# been flipped.
#
# @return [Array] the list of threads that was iterated (empty when the
#   pool has not been started)
def resume
  @running = true
  @paused = false

  (@threads || []).each { |t| t.run }
end
#running? ⇒ Boolean
50 51 52 |
# File 'lib/bunny/consumer_work_pool.rb', line 50
#
# Reports the running flag, which #start and #resume set and
# #pause, #kill and #shutdown clear.
#
# @return [Boolean] current value of the running flag
def running?
  @running
end
#shutdown(wait_for_workers = false) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/bunny/consumer_work_pool.rb', line 62
#
# Stops the pool by clearing the running flag and queueing one
# terminator job per worker. Each terminator does `throw :terminate`;
# the matching `catch` is not visible in this listing (presumably in the
# worker loop).
#
# @param wait_for_workers [Boolean] when truthy, and the pool was running
#   and a shutdown timeout is configured, wait on the shutdown condition
#   variable for up to that timeout if jobs are still queued
# @return [Object, nil] nil when no wait was requested or needed,
#   otherwise whatever the condition-variable wait returns
def shutdown(wait_for_workers = false)
  was_running = running?
  @running = false

  # One terminator per worker, queued behind any pending jobs.
  @size.times do
    submit { |*_args| throw :terminate }
  end

  return unless wait_for_workers && @shutdown_timeout && was_running

  @shutdown_mutex.synchronize do
    # NOTE(review): whatever signals this condition variable is outside this listing.
    @shutdown_conditional.wait(@shutdown_mutex, @shutdown_timeout) if busy?
  end
end
#start ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/bunny/consumer_work_pool.rb', line 38
#
# Spawns the worker threads and flags the pool as running. Every thread
# executes #run_loop, which is defined outside this listing. Any thread
# list from an earlier call is replaced.
#
# @return [true] the value assigned to the running flag
def start
  @threads = []
  worker_body = method(:run_loop)

  @size.times do
    worker = Thread.new(&worker_body)
    # Propagate the pool-level setting to each worker when it is enabled.
    worker.abort_on_exception = true if abort_on_exception
    @threads << worker
  end

  @running = true
end
#submit(callable = nil, &block) ⇒ Object
34 35 36 |
# File 'lib/bunny/consumer_work_pool.rb', line 34
#
# Queues a job for the workers. The explicit callable is used when it is
# given; otherwise the block is queued.
#
# @param callable [#call, nil] job to enqueue
# @param block [Proc] job to enqueue when no callable is given
# @return [Queue] the underlying queue
def submit(callable = nil, &block)
  @queue << (callable || block)
end