Class: Dynflow::Director
- Inherits:
- Object
  - Object
  - Dynflow::Director
- Includes:
- Algebrick::TypeCheck
- Defined in:
- lib/dynflow/director.rb
- lib/dynflow/director/queue_hash.rb
- lib/dynflow/director/flow_manager.rb
- lib/dynflow/director/sequence_cursor.rb
- lib/dynflow/director/sequential_manager.rb
- lib/dynflow/director/running_steps_manager.rb
- lib/dynflow/director/execution_plan_manager.rb
Overview
Director is responsible for telling what to do next when:
* new execution starts
* an event occurs
* some work is finished
Its public methods (except terminate) return work items that the executor should understand.
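The contract described above can be illustrated with a minimal, self-contained sketch. It does not use Dynflow: ToyWorkItem, ToyDirector and run_plan are hypothetical stand-ins, and the real Director tracks far more state. The only point shown is that each public call returns an array of work items, and that the executor feeds finished work back in to learn what comes next.

```ruby
# Hypothetical stand-ins; none of these names come from Dynflow itself.
ToyWorkItem = Struct.new(:execution_plan_id, :description)

class ToyDirector
  def initialize
    @running = {}
  end

  # A new execution starts: hand back the first piece of work.
  def start_execution(plan_id, steps)
    @running[plan_id] = steps.dup
    next_work(plan_id)
  end

  # Some work is finished: hand back whatever should run next.
  def work_finished(work)
    next_work(work.execution_plan_id)
  end

  private

  # Always returns an array (possibly empty) of work items.
  def next_work(plan_id)
    steps = @running[plan_id]
    return [] if steps.nil?
    if steps.empty?
      @running.delete(plan_id) # nothing left, stop tracking the plan
      return []
    end
    [ToyWorkItem.new(plan_id, steps.shift)]
  end
end

# A toy executor loop: run each item, then report it back to the director.
def run_plan(director, plan_id, steps)
  executed = []
  queue = director.start_execution(plan_id, steps)
  until queue.empty?
    work = queue.shift
    executed << work.description
    queue.concat(director.work_finished(work))
  end
  executed
end
```

In this sketch the executor never decides ordering itself; it only runs what it is handed and reports back, which mirrors the division of responsibility stated in the overview.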
Defined Under Namespace
Classes: EventWorkItem, ExecutionPlanManager, FinalizeWorkItem, FlowManager, PlanningWorkItem, QueueHash, RunningStepsManager, SequenceCursor, SequentialManager, StepWorkItem, WorkItem
Constant Summary
- Event =
Algebrick.type do
  fields! request_id: String,
          execution_plan_id: String,
          step_id: Integer,
          event: Object,
          result: Concurrent::Promises::ResolvableFuture,
          optional: Algebrick::Types::Boolean
end
- UnprocessableEvent =
Class.new(Dynflow::Error)
Instance Attribute Summary
- #logger ⇒ Object (readonly)
  Returns the value of attribute logger.
Instance Method Summary
- #current_execution_plan_ids ⇒ Object
- #handle_event(event) ⇒ Object
- #handle_planning(execution_plan_uuid) ⇒ Object
- #initialize(world) ⇒ Director (constructor)
  A new instance of Director.
- #start_execution(execution_plan_id, finished) ⇒ Object
- #terminate ⇒ Object
- #work_failed(work) ⇒ Object
  Called when an unhandled exception (such as a persistence issue) occurred during the execution of the work. In this case the runtime is simply cleaned up for the execution plan and the plan is let go. A common cause is the execution plan having been removed from the database by an external user.
- #work_finished(work) ⇒ Object
Constructor Details
#initialize(world) ⇒ Director
Returns a new instance of Director.
# File 'lib/dynflow/director.rb', line 166
def initialize(world)
  @world = world
  @logger = world.logger
  @execution_plan_managers = {}
  @rescued_steps = {}
  @planning_plans = Set.new
end
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
# File 'lib/dynflow/director.rb', line 164
def logger
  @logger
end
Instance Method Details
#current_execution_plan_ids ⇒ Object
# File 'lib/dynflow/director.rb', line 174
def current_execution_plan_ids
  @execution_plan_managers.keys
end
#handle_event(event) ⇒ Object
# File 'lib/dynflow/director.rb', line 191
def handle_event(event)
  Type! event, Event
  execution_plan_manager = @execution_plan_managers[event.execution_plan_id]
  if execution_plan_manager
    execution_plan_manager.event(event)
  elsif event.optional
    event.result.reject "no manager for #{event.inspect}"
    []
  else
    raise Dynflow::Error, "no manager for #{event.inspect}"
  end
rescue Dynflow::Error => e
  event.result.reject e.message
  raise e
end
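The three branches of handle_event (a manager exists, the event is optional, the event is required) can be exercised in isolation with a rough sketch. Everything here is a hypothetical stand-in: ToyFuture only mimics the reject call used on the event's result, ToyError plays the role of Dynflow::Error, and managers are plain lambdas. It assumes the rescue clause rejects the future with the error message before re-raising.

```ruby
# Hypothetical stand-in for the resolvable future carried by an event.
class ToyFuture
  attr_reader :reason

  def reject(reason)
    @reason = reason
    self
  end

  def rejected?
    !@reason.nil?
  end
end

ToyEvent = Struct.new(:execution_plan_id, :result, :optional)
class ToyError < StandardError; end

# managers: Hash of plan id => callable that returns work items.
def toy_handle_event(managers, event)
  manager = managers[event.execution_plan_id]
  if manager
    manager.call(event)
  elsif event.optional
    # Optional events are dropped quietly: reject the future, return no work.
    event.result.reject "no manager for #{event.execution_plan_id}"
    []
  else
    raise ToyError, "no manager for #{event.execution_plan_id}"
  end
rescue ToyError => e
  # Required events: make sure the caller's future is rejected, then re-raise.
  event.result.reject e.message
  raise e
end
```

The design point the sketch tries to show is that the sender's future is rejected in both failure paths, so whoever is waiting on it is not left hanging; only the required path also propagates the exception.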
#handle_planning(execution_plan_uuid) ⇒ Object
# File 'lib/dynflow/director.rb', line 178
def handle_planning(execution_plan_uuid)
  return [] if @planning_plans.include? execution_plan_uuid
  @planning_plans << execution_plan_uuid
  [PlanningWorkItem.new(execution_plan_uuid, :default, @world.id)]
end
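The Set guard in handle_planning makes repeated planning requests for the same plan a no-op until the plan is removed from the set again (which work_finished does for a PlanningWorkItem). A small stand-alone sketch of that pattern follows; ToyPlanningTracker, ToyPlanningWorkItem and planning_finished are hypothetical names used only for illustration.

```ruby
require 'set'

# Hypothetical stand-in for a planning work item.
ToyPlanningWorkItem = Struct.new(:execution_plan_id, :queue, :world_id)

class ToyPlanningTracker
  def initialize(world_id)
    @world_id = world_id
    @planning_plans = Set.new
  end

  # Returns one work item the first time, nothing while planning is in flight.
  def handle_planning(uuid)
    return [] if @planning_plans.include?(uuid)
    @planning_plans << uuid
    [ToyPlanningWorkItem.new(uuid, :default, @world_id)]
  end

  # Forget the plan once its planning work has finished.
  def planning_finished(uuid)
    @planning_plans.delete(uuid)
    []
  end
end
```

Usage, under the same assumptions: calling handle_planning twice in a row with the same UUID yields one work item and then an empty array; after planning_finished the UUID can be planned again.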
#start_execution(execution_plan_id, finished) ⇒ Object
# File 'lib/dynflow/director.rb', line 185
def start_execution(execution_plan_id, finished)
  manager = track_execution_plan(execution_plan_id, finished)
  return [] unless manager
  unless_done(manager, manager.start)
end
#terminate ⇒ Object
# File 'lib/dynflow/director.rb', line 233
def terminate
  unless @execution_plan_managers.empty?
    logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..."
    begin
      @execution_plan_managers.values.each do |manager|
        manager.terminate
      end
    rescue Errors::PersistenceError
      logger.error "could not to clean the data properly"
    end
    @execution_plan_managers.values.each do |manager|
      finish_manager(manager)
    end
  end
end
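One detail of terminate that is easy to miss: the begin/rescue wraps the whole termination loop, so a persistence error stops terminating the remaining managers, yet every manager is still finished afterwards. The sketch below reproduces only that control flow. ToyManager, ToyPersistenceError, finish! and the log array are hypothetical; what the real finish_manager does is not shown on this page.

```ruby
class ToyPersistenceError < StandardError; end

# Hypothetical manager that can be told to fail while terminating.
class ToyManager
  attr_reader :terminated, :finished

  def initialize(fail_on_terminate: false)
    @fail_on_terminate = fail_on_terminate
    @terminated = false
    @finished = false
  end

  def terminate
    raise ToyPersistenceError, "storage unavailable" if @fail_on_terminate
    @terminated = true
  end

  def finish!
    @finished = true
  end
end

# managers: Hash of plan id => ToyManager; log: Array collecting messages.
def toy_terminate(managers, log)
  return if managers.empty?
  log << "cleaning #{managers.size} execution plans"
  begin
    managers.each_value(&:terminate)
  rescue ToyPersistenceError
    # The first failure aborts the loop; later managers are not terminated.
    log << "could not clean the data properly"
  end
  # Finishing runs for every manager regardless of the error above.
  managers.each_value(&:finish!)
end
```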
#work_failed(work) ⇒ Object
Called when an unhandled exception (such as a persistence issue) occurred during the execution of the work. In this case the runtime is simply cleaned up for the execution plan and the plan is let go. A common cause is the execution plan having been removed from the database by an external user.
# File 'lib/dynflow/director.rb', line 224
def work_failed(work)
  if (manager = @execution_plan_managers[work.execution_plan_id])
    manager.terminate
    # Don't try to store when the execution plan went missing
    plan_missing = @world.persistence.find_execution_plans(:filters => { uuid: work.execution_plan_id }).empty?
    finish_manager(manager, store: !plan_missing)
  end
end
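The store: !plan_missing decision can be sketched on its own. This is only an illustration under stated assumptions: managers are plain hashes, stored_plan_ids stands in for the persistence lookup, and removing the manager from the hash is a guess at what finishing involves, since finish_manager is private and not shown here.

```ruby
# Hypothetical sketch of the "skip storing when the plan is gone" decision.
# managers: Hash of plan id => Hash (toy manager state)
# stored_plan_ids: Array of plan ids that still exist in the toy "database"
def toy_work_failed(managers, stored_plan_ids, plan_id)
  manager = managers[plan_id]
  return nil unless manager # nothing tracked for this plan, nothing to clean

  manager[:terminated] = true
  plan_missing = !stored_plan_ids.include?(plan_id)

  # Assumption: finishing drops the manager from the runtime.
  managers.delete(plan_id)
  { plan_id: plan_id, store: !plan_missing }
end
```

With these stand-ins, a failed work item for a plan that still exists yields store: true, while one for a deleted plan yields store: false, so no write is attempted against a record that is no longer there.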
#work_finished(work) ⇒ Object
# File 'lib/dynflow/director.rb', line 207
def work_finished(work)
  case work
  when PlanningWorkItem
    @planning_plans.delete(work.execution_plan_id)
    @world.persistence.delete_delayed_plans(:execution_plan_uuid => work.execution_plan_id)
    []
  else
    manager = @execution_plan_managers[work.execution_plan_id]
    return [] unless manager # skip case when getting event from execution plan that is not running anymore
    unless_done(manager, manager.what_is_next(work))
  end
end