Class: ConcurrentWorker::WorkerPool

Inherits:
Array
  • Object
show all
Defined in:
lib/concurrent_worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(*args, **options, &work_block) ⇒ WorkerPool

Returns a new instance of WorkerPool.



405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
# File 'lib/concurrent_worker.rb', line 405

# Builds a pool and starts its sender and receiver threads.
#
# @param args [Array] positional arguments, stored and forwarded to every
#   Worker created by #deploy_worker
# @param options [Hash] pool settings: +:pool_max+ (default 8) bounds the pool
#   size checked by #need_new_worker?, +:snd_queue_max+ (default 2) is passed
#   to each worker, and +:type+ is read by #deploy_worker
# @param work_block [Proc, nil] optional block recorded as the +:work_block+
#   entry that #deploy_worker hands to each worker through set_block
def initialize(*args, **options, &work_block)
  @args = args
  @options = options
  @max_num = @options[:pool_max] || 8

  # [symbol, block] pairs replayed on every worker by #deploy_worker.
  @set_blocks = []
  @set_blocks.push([:work_block, work_block]) if work_block

  @ready_queue = Queue.new

  @result_callbacks = []
  # #add_finished_callback pushes onto this list, so it has to exist;
  # leaving it unset made that method fail with NoMethodError on nil.
  @finished_callbacks = []

  @snd_queue_max = @options[:snd_queue_max] || 2
  @req_counter = RequestCounter.new

  # Each queue is created before the thread that pops from it is started.
  @snd_queue = Queue.new
  @snd_thread = set_snd_thread

  @rcv_queue = Queue.new
  @rcv_thread = set_rcv_thread
end

Instance Method Details

#add_callback(&callback) ⇒ Object



429
430
431
432
# File 'lib/concurrent_worker.rb', line 429

# Registers a block that the receiver thread calls with every result it pops.
#
# @yield [*result] the splatted values of one result entry
# @raise [RuntimeError] with message "block is nil" when no block is given
# @return [Array<Proc>] the list of registered result callbacks
def add_callback(&callback)
  raise "block is nil" if callback.nil?

  @result_callbacks << callback
end

#add_finished_callback(&callback) ⇒ Object



434
435
436
437
# File 'lib/concurrent_worker.rb', line 434

# Registers a block in the pool's list of "finished" callbacks.
#
# The list is created on first use: the constructor does not set
# @finished_callbacks, so pushing onto it directly raised NoMethodError on nil.
#
# NOTE(review): nothing in the visible source invokes these callbacks -
# confirm where (or whether) they are meant to be fired.
#
# @raise [RuntimeError] with message "block is nil" when no block is given
# @return [Array<Proc>] the list of registered finished callbacks
def add_finished_callback(&callback)
  raise "block is nil" unless callback

  @finished_callbacks ||= []
  @finished_callbacks.push(callback)
end

#deploy_worker ⇒ Object



440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
# File 'lib/concurrent_worker.rb', line 440

# Creates a Worker, wires it to the pool's queues, starts it and appends it
# to the pool.
#
# @return [Worker] the worker that was just started
def deploy_worker
  # Make sure the ivar exists before it is passed as a block below.
  @work_block = nil unless defined?(@work_block)

  settings = {
    type:                       @options[:type],
    snd_queue_max:              @snd_queue_max,
    result_callback_interrupt:  :never,
    retired_callback_interrupt: :never
  }
  worker = Worker.new(*@args, settings, &@work_block)

  # Every result goes to the receiver thread, and the worker is marked as
  # ready to take another request.
  worker.add_callback do |*result|
    @rcv_queue.push(result)
    @ready_queue.push(worker)
  end

  # On retirement, whatever the worker's request counter still holds is put
  # back on the send queue; the worker is pushed to the ready queue as well.
  worker.add_retired_callback do
    worker.req_counter.rest.each { |pending| @snd_queue.push(pending) }
    @ready_queue.push(worker)
  end

  @set_blocks.each { |symbol, block| worker.set_block(symbol, &block) }

  worker.run
  push(worker)
  worker
end

#join ⇒ Object



480
481
482
483
484
485
486
487
# File 'lib/concurrent_worker.rb', line 480

# Waits for outstanding requests, then shuts the pool down.
#
# Order: wait until the request counter drops below 1, join (and remove)
# every worker, then stop the receiver thread and the sender thread by
# pushing an empty array - the value both thread loops break on.
#
# @return [Object] whatever the sender thread's #join returns
def join
  @req_counter.wait_until_less_than(1)

  until empty?
    shift.join
  end

  @rcv_queue << []
  @rcv_thread.join

  @snd_queue << []
  @snd_thread.join
end

#need_new_worker? ⇒ Boolean

Returns:

  • (Boolean)


371
372
373
# File 'lib/concurrent_worker.rb', line 371

# Tells whether another worker should be deployed: the pool is below its
# maximum size and no current worker reports an empty queue.
#
# @return [Boolean]
def need_new_worker?
  size < @max_num && none?(&:queue_empty?)
end

#req(*args, &work_block) ⇒ Object



474
475
476
477
478
# File 'lib/concurrent_worker.rb', line 474

# Queues one request for the sender thread.
#
# Waits until the request counter is below pool_max * snd_queue_max,
# counts the new request, then enqueues it as [args, work_block].
#
# @param args [Array] arguments forwarded to the worker's req
# @param work_block [Proc, nil] optional block forwarded with the request
# @return [Queue] the send queue
def req(*args, &work_block)
  capacity = @max_num * @snd_queue_max
  @req_counter.wait_until_less_than(capacity)
  @req_counter.push(true)
  @snd_queue << [args, work_block]
end

#set_block(symbol, &block) ⇒ Object



470
471
472
# File 'lib/concurrent_worker.rb', line 470

# Records a [symbol, block] pair; #deploy_worker replays every recorded pair
# on each worker it creates.
#
# @param symbol [Symbol] name the block is registered under
# @param block [Proc, nil] the block to register
# @return [Array] the list of recorded pairs
def set_block(symbol, &block)
  @set_blocks << [symbol, block]
end

#set_rcv_thread ⇒ Object



393
394
395
396
397
398
399
400
401
402
403
# File 'lib/concurrent_worker.rb', line 393

# Starts the receiver thread.
#
# The thread pops results from the receive queue, passes each one (splatted)
# to every registered result callback, and then pops the request counter once.
# An empty result ends the loop.
#
# @return [Thread] the receiver thread
def set_rcv_thread
  Thread.new do
    loop do
      result = @rcv_queue.pop
      break if result.empty?

      @result_callbacks.each { |callback| callback.call(*result) }
      @req_counter.pop
    end
  end
end

#set_snd_thread ⇒ Object



375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
# File 'lib/concurrent_worker.rb', line 375

# Starts the sender thread.
#
# The thread pops requests from the send queue and hands each one to a worker
# taken from the ready queue, retrying until a worker's req returns a truthy
# value. An empty request ends the loop.
#
# @return [Thread] the sender thread
def set_snd_thread
  Thread.new do
    loop do
      request = @snd_queue.pop
      break if request.empty?

      loop do
        # Forget workers whose queue has been closed.
        delete_if(&:queue_closed?)

        if need_new_worker?
          worker = deploy_worker
          # A fresh worker gets one ready-queue entry per request slot.
          worker.snd_queue_max.times { @ready_queue.push(worker) }
        end

        accepted = @ready_queue.pop.req(*request[0], &request[1])
        break if accepted
      end
    end
  end
end