Class: ConcurrentWorker::WorkerPool
- Inherits:
-
Array
- Object
- Array
- ConcurrentWorker::WorkerPool
- Defined in:
- lib/concurrent_worker/workerpool.rb
Instance Method Summary collapse
- #add_callback(&callback) ⇒ Object
- #add_finished_callback(&callback) ⇒ Object
- #available_worker ⇒ Object
- #deploy_worker ⇒ Object
-
#initialize(*args, **options, &work_block) ⇒ WorkerPool
constructor
A new instance of WorkerPool.
- #join ⇒ Object
- #need_new_worker? ⇒ Boolean
- #req(*args, &work_block) ⇒ Object
- #set_block(symbol, &block) ⇒ Object
- #set_recv_thread ⇒ Object
- #set_snd_thread ⇒ Object
Constructor Details
#initialize(*args, **options, &work_block) ⇒ WorkerPool
Returns a new instance of WorkerPool.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/concurrent_worker/workerpool.rb', line 40 def initialize(*args, **, &work_block) @args = args = @max_num = [:pool_max] || 8 @set_blocks = [] if work_block @set_blocks.push([:work_block, work_block]) end @ready_queue = Queue.new @result_callbacks = [] @snd_queue_max = [:snd_queue_max]||2 @req_counter = RequestCounter.new @snd_queue = Queue.new @snd_thread = set_snd_thread @recv_queue = Queue.new @recv_thread = set_recv_thread end |
Instance Method Details
#add_callback(&callback) ⇒ Object
64 65 66 67 |
# File 'lib/concurrent_worker/workerpool.rb', line 64 def add_callback(&callback) raise "block is nil" unless callback @result_callbacks.push(callback) end |
#add_finished_callback(&callback) ⇒ Object
69 70 71 72 |
# File 'lib/concurrent_worker/workerpool.rb', line 69 def add_finished_callback(&callback) raise "block is nil" unless callback @finished_callbacks.push(callback) end |
#available_worker ⇒ Object
8 9 10 11 12 13 14 15 16 17 |
# File 'lib/concurrent_worker/workerpool.rb', line 8 def available_worker delete_if{ |w| w.queue_closed? } if need_new_worker? w = deploy_worker w.snd_queue_max.times do @ready_queue.push(w) end end @ready_queue.pop end |
#deploy_worker ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/concurrent_worker/workerpool.rb', line 75 def deploy_worker defined?(@work_block) || @work_block = nil = { type: [:type], snd_queue_max: @snd_queue_max, result_callback_interrupt: :never, retired_callback_interrupt: :never } w = Worker.new(*@args, , &@work_block) w.add_callback do |*args| @recv_queue.push([args]) @ready_queue.push(w) end w.add_retired_callback do w.req_counter.rest.each do |req| @snd_queue.push(req) end @ready_queue.push(w) end @set_blocks.each do |symbol, block| w.set_block(symbol, &block) end w.run self.push(w) w end |
#join ⇒ Object
115 116 117 118 119 120 121 122 |
# File 'lib/concurrent_worker/workerpool.rb', line 115 def join @req_counter.wait_until_less_than(1) self.shift.join until self.empty? @recv_queue.push(nil) @recv_thread.join @snd_queue.push(nil) @snd_thread.join end |
#need_new_worker? ⇒ Boolean
4 5 6 |
# File 'lib/concurrent_worker/workerpool.rb', line 4 def need_new_worker? self.size < @max_num && self.select{ |w| w.queue_empty? }.empty? end |
#req(*args, &work_block) ⇒ Object
109 110 111 112 113 |
# File 'lib/concurrent_worker/workerpool.rb', line 109 def req(*args, &work_block) @req_counter.wait_until_less_than(@max_num * @snd_queue_max) @req_counter.push(true) @snd_queue.push([args, work_block]) end |
#set_block(symbol, &block) ⇒ Object
105 106 107 |
# File 'lib/concurrent_worker/workerpool.rb', line 105 def set_block(symbol, &block) @set_blocks.push([symbol, block]) end |
#set_recv_thread ⇒ Object
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/concurrent_worker/workerpool.rb', line 29 def set_recv_thread Thread.new do while result = @recv_queue.pop @result_callbacks.each do |callback| callback.call(*result[0]) end @req_counter.pop end end end |
#set_snd_thread ⇒ Object
19 20 21 22 23 24 25 26 27 |
# File 'lib/concurrent_worker/workerpool.rb', line 19 def set_snd_thread Thread.new do while req = @snd_queue.pop (args, work_block) = req until available_worker.req(*args, &work_block) end end end end |