Class: Dynflow::Executors::Parallel

Inherits:
Abstract
  • Object
show all
Defined in:
lib/dynflow/executors/parallel.rb,
lib/dynflow/executors/parallel/core.rb,
lib/dynflow/executors/parallel/pool.rb,
lib/dynflow/executors/parallel/worker.rb,
lib/dynflow/executors/parallel/work_queue.rb,
lib/dynflow/executors/parallel/flow_manager.rb,
lib/dynflow/executors/parallel/sequence_cursor.rb,
lib/dynflow/executors/parallel/running_steps_manager.rb,
lib/dynflow/executors/parallel/execution_plan_manager.rb

Defined Under Namespace

Classes: Core, ExecutionPlanManager, FlowManager, Pool, RunningStepsManager, SequenceCursor, SequentialManager, WorkQueue, Worker

Constant Summary collapse

# Error class for this executor; an anonymous subclass of Dynflow::Error.
UnprocessableEvent =
Class.new(Dynflow::Error)
# Algebrick type with a single `work` field declared as Work.
PoolDone =
Algebrick.type { fields! work: Work }
# Algebrick atom; it declares no fields.
PoolTerminated =
Algebrick.atom
# Algebrick type with two fields: `work` (Work) and `worker` (Worker).
WorkerDone =
Algebrick.type { fields! work: Work, worker: Worker }

Constants inherited from Abstract

Abstract::Event, Abstract::Execution

Instance Attribute Summary

Attributes inherited from Abstract

#logger, #world

Instance Method Summary collapse

Constructor Details

#initialize(world, pool_size = 10) ⇒ Parallel

Returns a new instance of Parallel.



43
44
45
46
# File 'lib/dynflow/executors/parallel.rb', line 43

# Builds the executor and the Core it delegates to.
#
# @param world passed to the superclass initializer and to Core
# @param pool_size [Integer] forwarded unchanged to Core.new (defaults to 10);
#   presumably the size of the worker pool -- confirm in Core
def initialize(world, pool_size = 10)
  super(world)
  # Every other method of this class talks to this Core instance.
  @core = Core.new(world, pool_size)
end

Instance Method Details

#event(execution_plan_id, step_id, event, future = Future.new) ⇒ Object



56
57
58
59
# File 'lib/dynflow/executors/parallel.rb', line 56

# Wraps the arguments in an Event message and pushes it onto the core.
#
# @param execution_plan_id identifier placed first in the Event message
# @param step_id identifier placed second in the Event message
# @param event payload placed third in the Event message
# @param future carried inside the message; a fresh Future when omitted
# @return the same future object that was put into the message
def event(execution_plan_id, step_id, event, future = Future.new)
  message = Event[execution_plan_id, step_id, event, future]
  @core << message
  # Hand the future back so the caller keeps a reference to it.
  future
end

#execute(execution_plan_id, finished = Future.new) ⇒ Object



48
49
50
51
52
53
54
# File 'lib/dynflow/executors/parallel.rb', line 48

# Sends an Execution message to the core and calls `value!` on the reply.
#
# @param execution_plan_id identifier placed first in the Execution message
# @param finished carried inside the message; a fresh Future when omitted
# @return the `finished` object when `ask(...).value!` does not raise
# @raise re-raises any StandardError raised while asking the core; before
#   re-raising, `finished` is failed with that error unless it is already ready
def execute(execution_plan_id, finished = Future.new)
  request = Execution[execution_plan_id, finished]
  @core.ask(request).value!
  finished
rescue => error
  # Do not touch a future that has already been resolved.
  finished.fail(error) unless finished.ready?
  raise error
end

#initialized ⇒ Object



65
66
67
# File 'lib/dynflow/executors/parallel.rb', line 65

# Delegates to the core.
#
# @return whatever the core's `initialized` method returns
def initialized
  core = @core
  core.initialized
end

#terminate(future = Future.new) ⇒ Object



61
62
63
# File 'lib/dynflow/executors/parallel.rb', line 61

# Asks the core with the MicroActor::Terminate message.
#
# @param future passed to the core as the second argument of `ask`;
#   a fresh Future when omitted
# @return whatever the core's `ask` returns
def terminate(future = Future.new)
  message = MicroActor::Terminate
  @core.ask(message, future)
end