Class: Dynflow::Executors::Parallel::Pool

Inherits:
Actor
  • Object
show all
Defined in:
lib/dynflow/executors/parallel/pool.rb

Defined Under Namespace

Classes: JobStorage

Instance Method Summary collapse

Methods inherited from Actor

#behaviour_definition, #finish_termination, #terminating?

Methods included from MethodicActor

#on_message

Methods included from Actor::LogWithFullBacktrace

#log

Constructor Details

#initialize(world, core, name, pool_size, transaction_adapter) ⇒ Pool

Returns a new instance of Pool.



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/dynflow/executors/parallel/pool.rb', line 38

def initialize(world, core, name, pool_size, transaction_adapter)
  @world = world
  @name = name
  @executor_core = core
  @pool_size     = pool_size
  @jobs          = JobStorage.new
  @free_workers  = Array.new(pool_size) do |i|
    name = "worker-#{i}"
    Worker.spawn(name, reference, transaction_adapter, telemetry_options.merge(:worker => name))
  end
end

Instance Method Details

#execution_status(execution_plan_id = nil) ⇒ Object



75
76
77
78
79
# File 'lib/dynflow/executors/parallel/pool.rb', line 75

def execution_status(execution_plan_id = nil)
  { :pool_size => @pool_size,
    :free_workers => @free_workers.count,
    :queue_size => @jobs.queue_size(execution_plan_id) }
end

#handle_persistence_error(worker, error, work = nil) ⇒ Object



64
65
66
67
68
# File 'lib/dynflow/executors/parallel/pool.rb', line 64

def handle_persistence_error(worker, error, work = nil)
  @executor_core.tell([:handle_persistence_error, error, work])
  @free_workers << worker
  distribute_jobs
end

#schedule_work(work) ⇒ Object



50
51
52
53
54
# File 'lib/dynflow/executors/parallel/pool.rb', line 50

def schedule_work(work)
  @jobs.add work
  distribute_jobs
  update_telemetry
end

#start_termination(*args) ⇒ Object



70
71
72
73
# File 'lib/dynflow/executors/parallel/pool.rb', line 70

def start_termination(*args)
  super
  try_to_terminate
end

#worker_done(worker, work) ⇒ Object



56
57
58
59
60
61
62
# File 'lib/dynflow/executors/parallel/pool.rb', line 56

def worker_done(worker, work)
  step = work.step if work.is_a?(Director::StepWorkItem)
  @executor_core.tell([:work_finished, work, step && step.delayed_events])
  @free_workers << worker
  Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_active_workers, -1, telemetry_options) }
  distribute_jobs
end