Class: Dynflow::Director
- Inherits:
- Object
- Object
- Dynflow::Director
- Includes:
- Algebrick::TypeCheck
- Defined in:
- lib/dynflow/director.rb,
lib/dynflow/director/queue_hash.rb,
lib/dynflow/director/flow_manager.rb,
lib/dynflow/director/sequence_cursor.rb,
lib/dynflow/director/sequential_manager.rb,
lib/dynflow/director/running_steps_manager.rb,
lib/dynflow/director/execution_plan_manager.rb
Overview
The Director is responsible for deciding what to do next when:
* a new execution starts
* an event occurs
* some work is finished
Its public methods (except terminate) return work items that the executor should understand.
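The contract described above can be pictured as a loop in which the executor runs a work item, reports it back to the director, and receives the follow-up work items. The following is only a minimal sketch under stated assumptions, not Dynflow's executor: `ToyDirector`, `ToyWorkItem`, and `drain` are hypothetical stand-ins, and `start_execution` is simplified to a single argument.

```ruby
# Hypothetical stand-ins; none of these classes exist in Dynflow.
ToyWorkItem = Struct.new(:execution_plan_id, :name)

class ToyDirector
  def initialize(steps_by_plan)
    # plan id => names of steps still to be handed out, in order
    @pending = steps_by_plan.transform_values(&:dup)
  end

  # Mimics the shape of Director#start_execution: returns an array of work items.
  def start_execution(execution_plan_id)
    next_items(execution_plan_id)
  end

  # Mimics the shape of Director#work_finished: returns follow-up work items.
  def work_finished(work)
    next_items(work.execution_plan_id)
  end

  private

  def next_items(execution_plan_id)
    steps = @pending[execution_plan_id]
    return [] if steps.nil? || steps.empty?
    [ToyWorkItem.new(execution_plan_id, steps.shift)]
  end
end

# A toy executor loop: "run" each item, then ask the director what comes next.
def drain(director, execution_plan_id)
  executed = []
  queue = director.start_execution(execution_plan_id)
  until queue.empty?
    work = queue.shift
    executed << work.name # stands in for actually executing the work item
    queue.concat(director.work_finished(work))
  end
  executed
end

director = ToyDirector.new("plan-1" => %w[run finalize])
drain(director, "plan-1")
```

The point of the design is that the director never executes anything itself: it only answers "what next?", so the executor can stay unaware of how execution plans are structured.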
Defined Under Namespace
Classes: EventWorkItem, ExecutionPlanManager, FinalizeWorkItem, FlowManager, PlanningWorkItem, QueueHash, RunningStepsManager, SequenceCursor, SequentialManager, StepWorkItem, WorkItem
Constant Summary
- Event =
Algebrick.type do
  fields! request_id: String,
          execution_plan_id: String,
          step_id: Integer,
          event: Object,
          result: Concurrent::Promises::ResolvableFuture,
          optional: Algebrick::Types::Boolean
end
- UnprocessableEvent =
Class.new(Dynflow::Error)
Instance Attribute Summary
- #logger ⇒ Object (readonly)
  Returns the value of attribute logger.
Instance Method Summary
- #current_execution_plan_ids ⇒ Object
- #handle_event(event) ⇒ Object
- #handle_planning(execution_plan_uuid) ⇒ Object
- #initialize(world) ⇒ Director (constructor)
  A new instance of Director.
- #start_execution(execution_plan_id, finished) ⇒ Object
- #terminate ⇒ Object
- #work_failed(work) ⇒ Object
  Called when an unhandled exception (such as a persistence issue) occurs during execution of the work. In this case the director cleans the execution plan out of the runtime and lets it go. A common cause is the execution plan having been removed from the database by an external user.
- #work_finished(work) ⇒ Object
Constructor Details
#initialize(world) ⇒ Director
Returns a new instance of Director.
# File 'lib/dynflow/director.rb', line 165
def initialize(world)
  @world = world
  @logger = world.logger
  @execution_plan_managers = {}
  @rescued_steps = {}
  @planning_plans = Set.new
end
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
# File 'lib/dynflow/director.rb', line 163
def logger
  @logger
end
Instance Method Details
#current_execution_plan_ids ⇒ Object
# File 'lib/dynflow/director.rb', line 173
def current_execution_plan_ids
  @execution_plan_managers.keys
end
#handle_event(event) ⇒ Object
# File 'lib/dynflow/director.rb', line 190
def handle_event(event)
  Type! event, Event
  execution_plan_manager = @execution_plan_managers[event.execution_plan_id]
  if execution_plan_manager
    execution_plan_manager.event(event)
  elsif event.optional
    event.result.reject "no manager for #{event.inspect}"
    []
  else
    raise Dynflow::Error, "no manager for #{event.inspect}"
  end
rescue Dynflow::Error => e
  event.result.reject e.message
  raise e
end
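The branching in `handle_event` distinguishes three cases: a manager exists, the event is optional, or the event is required. A minimal, self-contained sketch of that pattern follows. It is illustrative only: `ToyEvent`, `FakeFuture`, `ToyError`, and `route_event` are hypothetical names, and `FakeFuture` merely imitates the `reject` call that the real code makes on a `Concurrent::Promises::ResolvableFuture`.

```ruby
# Hypothetical stand-ins for illustration; not part of Dynflow.
ToyEvent = Struct.new(:execution_plan_id, :result, :optional)

class FakeFuture
  attr_reader :reason

  def reject(reason)
    @reason = reason
    self
  end

  def rejected?
    !@reason.nil?
  end
end

class ToyError < StandardError; end

# managers: hash of execution plan id => object responding to #event
def route_event(managers, event)
  manager = managers[event.execution_plan_id]
  if manager
    manager.event(event) # delegate; the manager returns work items
  elsif event.optional
    event.result.reject "no manager for #{event.execution_plan_id}"
    [] # optional events are rejected without raising
  else
    raise ToyError, "no manager for #{event.execution_plan_id}"
  end
rescue ToyError => e
  event.result.reject e.message # reject the future, then re-raise
  raise e
end

optional_event = ToyEvent.new("missing-plan", FakeFuture.new, true)
route_event({}, optional_event) # no work items; the future is rejected
```

In both failure branches the caller waiting on `result` is notified. The only difference is whether the error also propagates to the code that delivered the event.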
#handle_planning(execution_plan_uuid) ⇒ Object
# File 'lib/dynflow/director.rb', line 177
def handle_planning(execution_plan_uuid)
  return [] if @planning_plans.include? execution_plan_uuid

  @planning_plans << execution_plan_uuid
  [PlanningWorkItem.new(execution_plan_uuid, :default, @world.id)]
end
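`handle_planning` uses a set of in-flight plan ids so that a repeated request for the same plan yields no second work item. The sketch below isolates that de-duplication. `ToyPlanner` and `planning_finished` are hypothetical names, and the work item is reduced to a plain array instead of a `PlanningWorkItem`.

```ruby
require 'set'

# Hypothetical stand-in that keeps only the de-duplication logic.
class ToyPlanner
  def initialize
    @planning_plans = Set.new
  end

  # Returns one work item the first time a plan is seen, and an empty
  # array while planning of the same plan is still in flight.
  def handle_planning(execution_plan_uuid)
    return [] if @planning_plans.include?(execution_plan_uuid)

    @planning_plans << execution_plan_uuid
    [[:planning, execution_plan_uuid]]
  end

  # Imitates the PlanningWorkItem branch of work_finished: forget the plan
  # so that it could be planned again later.
  def planning_finished(execution_plan_uuid)
    @planning_plans.delete(execution_plan_uuid)
    []
  end
end

planner = ToyPlanner.new
planner.handle_planning("abc") # one work item
planner.handle_planning("abc") # empty: already being planned
```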
#start_execution(execution_plan_id, finished) ⇒ Object
# File 'lib/dynflow/director.rb', line 184
def start_execution(execution_plan_id, finished)
  manager = track_execution_plan(execution_plan_id, finished)
  return [] unless manager

  unless_done(manager, manager.start)
end
#terminate ⇒ Object
# File 'lib/dynflow/director.rb', line 232
def terminate
  unless @execution_plan_managers.empty?
    logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..."
    begin
      @execution_plan_managers.values.each do |manager|
        manager.terminate
      end
    rescue Errors::PersistenceError
      logger.error "could not to clean the data properly"
    end
    @execution_plan_managers.values.each do |manager|
      finish_manager(manager)
    end
  end
end
#work_failed(work) ⇒ Object
Called when an unhandled exception (such as a persistence issue) occurs during execution of the work. In this case the director cleans the execution plan out of the runtime and lets it go. A common cause is the execution plan having been removed from the database by an external user.
# File 'lib/dynflow/director.rb', line 223
def work_failed(work)
  if (manager = @execution_plan_managers[work.execution_plan_id])
    manager.terminate
    # Don't try to store when the execution plan went missing
    plan_missing = @world.persistence.find_execution_plans(:filters => { uuid: work.execution_plan_id }).empty?
    finish_manager(manager, store: !plan_missing)
  end
end
#work_finished(work) ⇒ Object
# File 'lib/dynflow/director.rb', line 206
def work_finished(work)
  case work
  when PlanningWorkItem
    @planning_plans.delete(work.execution_plan_id)
    @world.persistence.delete_delayed_plans(:execution_plan_uuid => work.execution_plan_id)
    []
  else
    manager = @execution_plan_managers[work.execution_plan_id]
    return [] unless manager # skip case when getting event from execution plan that is not running anymore

    unless_done(manager, manager.what_is_next(work))
  end
end