Class: ConcurrentWorker::WorkerPool
- Inherits:
- Array
- Inheritance chain: Object → Array → ConcurrentWorker::WorkerPool
- Defined in:
- lib/concurrent_worker.rb
Instance Method Summary collapse
- #add_callback(&callback) ⇒ Object
- #add_finished_callback(&callback) ⇒ Object
- #deploy_worker ⇒ Object
-
#initialize(*args, **options, &work_block) ⇒ WorkerPool
constructor
A new instance of WorkerPool.
- #join ⇒ Object
- #need_new_worker? ⇒ Boolean
- #req(*args, &work_block) ⇒ Object
- #set_block(symbol, &block) ⇒ Object
- #set_rcv_thread ⇒ Object
- #set_snd_thread ⇒ Object
Constructor Details
#initialize(*args, **options, &work_block) ⇒ WorkerPool
Returns a new instance of WorkerPool.
405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 |
# File 'lib/concurrent_worker.rb', line 405
#
# Sets up the pool's state and starts its sender and receiver threads.
#
# @param args [Array] positional arguments stored in @args; deploy_worker
#   forwards them to each Worker it creates
# @param options [Hash] keyword options stored in @options; this method
#   reads :pool_max (default 8) and :snd_queue_max (default 2)
# @param work_block [Proc, nil] optional block; when given it is recorded
#   as the [:work_block, block] entry of @set_blocks
# @return [WorkerPool] a new instance of WorkerPool
def initialize(*args, **options, &work_block)
  @args = args
  @options = options
  @max_num = @options[:pool_max] || 8

  # Blocks recorded here are handed to every worker by deploy_worker.
  @set_blocks = []
  @set_blocks.push([:work_block, work_block]) if work_block

  @ready_queue = Queue.new
  @result_callbacks = []
  # add_finished_callback pushes onto this list, so it must exist up front.
  @finished_callbacks = []

  @snd_queue_max = @options[:snd_queue_max] || 2
  @req_counter = RequestCounter.new

  @snd_queue = Queue.new
  @snd_thread = set_snd_thread

  @rcv_queue = Queue.new
  @rcv_thread = set_rcv_thread
end
Instance Method Details
#add_callback(&callback) ⇒ Object
429 430 431 432 |
# File 'lib/concurrent_worker.rb', line 429
#
# Registers a block that the receiver thread calls with each result.
#
# @yield [*result] the values popped from the receive queue
# @raise [RuntimeError] "block is nil" when called without a block
# @return [Array<Proc>] the list of registered result callbacks
def add_callback(&callback)
  raise "block is nil" unless callback

  @result_callbacks.push(callback)
end
#add_finished_callback(&callback) ⇒ Object
434 435 436 437 |
# File 'lib/concurrent_worker.rb', line 434
#
# Registers a block in the pool's list of finished callbacks.
#
# @raise [RuntimeError] "block is nil" when called without a block
# @return [Array<Proc>] the list of registered finished callbacks
def add_finished_callback(&callback)
  raise "block is nil" unless callback

  # The list may not have been created yet; build it on first use so the
  # push below never hits nil.
  @finished_callbacks ||= []
  @finished_callbacks.push(callback)
end
#deploy_worker ⇒ Object
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 |
# File 'lib/concurrent_worker.rb', line 440
#
# Creates a Worker, wires it to the pool's queues, starts it and appends it
# to the pool.
#
# @return [Worker] the newly started worker
def deploy_worker
  @work_block = nil unless defined?(@work_block)

  worker_options = {
    type: @options[:type],
    snd_queue_max: @snd_queue_max,
    result_callback_interrupt: :never,
    retired_callback_interrupt: :never
  }

  # Double splat so the hash is passed as keywords on Ruby 3 as well.
  w = Worker.new(*@args, **worker_options, &@work_block)

  # Each result goes to the receiver thread, and the worker is offered to
  # the sender thread again.
  w.add_callback do |*arg|
    @rcv_queue.push(arg)
    @ready_queue.push(w)
  end

  # On retirement, everything in w.req_counter.rest is pushed back onto the
  # send queue (presumably the requests the worker did not finish — confirm
  # against Worker/RequestCounter).
  w.add_retired_callback do
    w.req_counter.rest.each do |req|
      @snd_queue.push(req)
    end
    @ready_queue.push(w)
  end

  @set_blocks.each do |symbol, block|
    w.set_block(symbol, &block)
  end

  w.run
  push(w)
  w
end
#join ⇒ Object
480 481 482 483 484 485 486 487 |
# File 'lib/concurrent_worker.rb', line 480
#
# Waits for outstanding requests, then shuts down the workers and both
# service threads.
#
# @return [Thread] the joined sender thread
def join
  # Block until the request counter drops below 1.
  @req_counter.wait_until_less_than(1)

  # Remove and join the workers one by one; the pool is empty afterwards.
  shift.join until empty?

  # An empty array is the stop signal for each service thread.
  @rcv_queue.push([])
  @rcv_thread.join
  @snd_queue.push([])
  @snd_thread.join
end
#need_new_worker? ⇒ Boolean
371 372 373 |
# File 'lib/concurrent_worker.rb', line 371
#
# Tells whether another worker should be deployed: the pool is below its
# limit (@max_num) and no current worker reports queue_empty?.
#
# @return [Boolean]
def need_new_worker?
  size < @max_num && none?(&:queue_empty?)
end
#req(*args, &work_block) ⇒ Object
474 475 476 477 478 |
# File 'lib/concurrent_worker.rb', line 474
#
# Queues one request for the sender thread.
#
# Waits first until the request counter is below
# @max_num * @snd_queue_max, then counts the request and enqueues it.
#
# @param args [Array] arguments of the request
# @param work_block [Proc, nil] optional block sent along with the request
# @return [Queue] the send queue
def req(*args, &work_block)
  limit = @max_num * @snd_queue_max
  @req_counter.wait_until_less_than(limit)
  @req_counter.push(true)
  @snd_queue.push([args, work_block])
end
#set_block(symbol, &block) ⇒ Object
470 471 472 |
# File 'lib/concurrent_worker.rb', line 470
#
# Records a named block; deploy_worker passes every recorded pair to
# Worker#set_block on each worker it creates.
#
# @param symbol [Symbol] name under which the block is registered
# @return [Array] the list of recorded [symbol, block] pairs
def set_block(symbol, &block)
  @set_blocks.push([symbol, block])
end
#set_rcv_thread ⇒ Object
393 394 395 396 397 398 399 400 401 402 403 |
# File 'lib/concurrent_worker.rb', line 393
#
# Starts the receiver thread: it pops results from @rcv_queue, passes each
# to every registered result callback, then pops the request counter once.
#
# @return [Thread] the receiver thread
def set_rcv_thread
  Thread.new do
    loop do
      result = @rcv_queue.pop
      # An empty array is the stop signal.
      break if result.empty?

      @result_callbacks.each do |callback|
        callback.call(*result)
      end
      @req_counter.pop
    end
  end
end
#set_snd_thread ⇒ Object
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 |
# File 'lib/concurrent_worker.rb', line 375
#
# Starts the sender thread: it pops requests from @snd_queue and hands each
# to a worker taken from @ready_queue, deploying new workers when
# need_new_worker? says so.
#
# @return [Thread] the sender thread
def set_snd_thread
  Thread.new do
    loop do
      req = @snd_queue.pop
      # An empty array is the stop signal.
      break if req.empty?

      # Keep trying ready workers until one accepts the request.
      loop do
        # Drop workers whose queue is closed before judging capacity.
        delete_if(&:queue_closed?)

        if need_new_worker?
          w = deploy_worker
          # A new worker gets one ready-queue entry per send-queue slot.
          w.snd_queue_max.times { @ready_queue.push(w) }
        end

        accepted = @ready_queue.pop.req(*req[0], &req[1])
        break if accepted
      end
    end
  end
end