Class: Dynflow::ExecutionPlan

Inherits:
Serializable show all
Includes:
Algebrick::TypeCheck, Stateful
Defined in:
lib/dynflow/execution_plan.rb,
lib/dynflow/execution_plan/hooks.rb

Overview

rubocop:disable Metrics/ClassLength TODO extract planning logic to an extra class ExecutionPlanner

Defined Under Namespace

Modules: Hooks, Steps Classes: DependencyGraph, InvalidPlan, OutputReference

Constant Summary

Constants inherited from Serializable

Serializable::LEGACY_TIME_FORMAT, Serializable::TIME_FORMAT

Instance Attribute Summary collapse

Attributes included from Stateful

#state

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Stateful

included, #set_state, #state_transitions, #states

Methods inherited from Serializable

constantize, from_hash

Constructor Details

#initialize(world, id = nil, label = nil, state = :pending, root_plan_step = nil, run_flow = Flows::Concurrence.new([]), finalize_flow = Flows::Sequence.new([]), steps = {}, started_at = nil, ended_at = nil, execution_time = nil, real_time = 0.0, execution_history = ExecutionHistory.new) ⇒ ExecutionPlan

all params with default values are part of private api



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/dynflow/execution_plan.rb', line 72

def initialize(world,
               id                = nil,
               label             = nil,
               state             = :pending,
               root_plan_step    = nil,
               run_flow          = Flows::Concurrence.new([]),
               finalize_flow     = Flows::Sequence.new([]),
               steps             = {},
               started_at        = nil,
               ended_at          = nil,
               execution_time    = nil,
               real_time         = 0.0,
               execution_history = ExecutionHistory.new)
  id ||= SecureRandom.uuid
  @id                = Type! id, String
  @world             = Type! world, World
  @label             = Type! label, String, NilClass
  self.state         = state
  @run_flow          = Type! run_flow, Flows::Abstract
  @finalize_flow     = Type! finalize_flow, Flows::Abstract
  @root_plan_step    = root_plan_step
  @started_at        = Type! started_at, Time, NilClass
  @ended_at          = Type! ended_at, Time, NilClass
  @execution_time    = Type! execution_time, Numeric, NilClass
  @real_time         = Type! real_time, Numeric
  @execution_history = Type! execution_history, ExecutionHistory

  steps.all? do |k, v|
    Type! k, Integer
    Type! v, Steps::Abstract
  end
  @steps = steps
end

Instance Attribute Details

#ended_atObject (readonly)

Returns the value of attribute ended_at.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def ended_at
  @ended_at
end

#execution_historyObject (readonly)

Returns the value of attribute execution_history.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def execution_history
  @execution_history
end

#execution_timeObject (readonly)

Returns the value of attribute execution_time.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def execution_time
  @execution_time
end

#finalize_flowObject (readonly)

Returns the value of attribute finalize_flow.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def finalize_flow
  @finalize_flow
end

#idObject (readonly)

Returns the value of attribute id.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def id
  @id
end

#labelObject (readonly)

Returns the value of attribute label.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def label
  @label
end

#real_timeObject (readonly)

Returns the value of attribute real_time.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def real_time
  @real_time
end

#root_plan_stepObject (readonly)

Returns the value of attribute root_plan_step.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def root_plan_step
  @root_plan_step
end

#run_flowObject (readonly)

Returns the value of attribute run_flow.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def run_flow
  @run_flow
end

#started_atObject (readonly)

Returns the value of attribute started_at.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def started_at
  @started_at
end

#stepsObject (readonly)

Returns the value of attribute steps.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def steps
  @steps
end

#worldObject (readonly)

Returns the value of attribute world.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def world
  @world
end

Class Method Details

.load_flow(flow_hash) ⇒ Object



424
425
426
427
428
429
430
# File 'lib/dynflow/execution_plan.rb', line 424

def self.load_flow(flow_hash)
  if flow_hash.is_a? Hash
    Flows::Abstract.from_hash(flow_hash)
  else
    Flows::Abstract.decode(flow_hash)
  end
end

.new_from_hash(hash, world) ⇒ Object



453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
# File 'lib/dynflow/execution_plan.rb', line 453

def self.new_from_hash(hash, world)
  check_class_matching hash
  execution_plan_id = hash[:id]
  steps             = steps_from_hash(hash[:step_ids], execution_plan_id, world)
  self.new(world,
           execution_plan_id,
           hash[:label],
           hash[:state],
           steps[hash[:root_plan_step_id]],
           load_flow(hash[:run_flow]),
           load_flow(hash[:finalize_flow]),
           steps,
           string_to_time(hash[:started_at]),
           string_to_time(hash[:ended_at]),
           hash[:execution_time].to_f,
           hash[:real_time].to_f,
           ExecutionHistory.new_from_hash(hash[:execution_history]))
rescue => plan_exception
  begin
    world.logger.error("Could not load execution plan #{execution_plan_id}")
    world.logger.error(plan_exception)
    InvalidPlan.new(plan_exception, execution_plan_id,
                    hash[:label],
                    hash[:state],
                    string_to_time(hash[:started_at]),
                    string_to_time(hash[:ended_at]),
                    hash[:execution_time].to_f,
                    hash[:real_time].to_f,
                    ExecutionHistory.new_from_hash(hash[:execution_history]))
  rescue => invalid_plan_exception
    world.logger.error("Could not even load a fallback execution plan for #{execution_plan_id}")
    world.logger.error(invalid_plan_exception)
    InvalidPlan.new(invalid_plan_exception, execution_plan_id,
                    hash[:label],
                    hash[:state])
  end
end

.resultsObject



57
58
59
# File 'lib/dynflow/execution_plan.rb', line 57

def self.results
  @results ||= [:pending, :success, :warning, :error, :cancelled]
end

.state_transitionsObject



61
62
63
64
65
66
67
68
69
# File 'lib/dynflow/execution_plan.rb', line 61

def self.state_transitions
  @state_transitions ||= { pending:  [:stopped, :scheduled, :planning],
                           scheduled: [:planning, :stopped],
                           planning: [:planned, :stopped],
                           planned:  [:running, :stopped],
                           running:  [:paused, :stopped],
                           paused:   [:running, :stopped],
                           stopped:  [] }
end

.statesObject



51
52
53
# File 'lib/dynflow/execution_plan.rb', line 51

def self.states
  @states ||= [:pending, :scheduled, :planning, :planned, :running, :paused, :stopped]
end

Instance Method Details

#actionsArray<Action>

Returns actions in Present phase.

Returns:

  • (Array<Action>)

    actions in Present phase



515
516
517
518
519
# File 'lib/dynflow/execution_plan.rb', line 515

def actions
  @actions ||= begin
    [entry_action] + entry_action.all_planned_actions
  end
end

#add_finalize_step(action) ⇒ Object



417
418
419
420
421
422
# File 'lib/dynflow/execution_plan.rb', line 417

def add_finalize_step(action)
  add_step(Steps::FinalizeStep, action.class, action.id).tap do |step|
    step.update_from_action(action)
    finalize_flow << Flows::Atom.new(step.id)
  end
end

#add_plan_step(action_class, caller_action = nil) ⇒ Object



399
400
401
402
403
404
405
406
407
# File 'lib/dynflow/execution_plan.rb', line 399

def add_plan_step(action_class, caller_action = nil)
  add_step(Steps::PlanStep, action_class, generate_action_id).tap do |step|
    # TODO: to be removed and preferred by the caller_action
    if caller_action && caller_action.execution_plan_id == self.id
      @steps[caller_action.plan_step_id].children << step.id
    end
    step.initialize_action(caller_action)
  end
end

#add_run_step(action) ⇒ Object



409
410
411
412
413
414
415
# File 'lib/dynflow/execution_plan.rb', line 409

def add_run_step(action)
  add_step(Steps::RunStep, action.class, action.id).tap do |step|
    step.update_from_action(action)
    @dependency_graph.add_dependencies(step, action)
    current_run_flow.add_and_resolve(@dependency_graph, Flows::Atom.new(step.id))
  end
end

#add_scheduling_step(action_class, caller_action = nil) ⇒ Object



393
394
395
396
397
# File 'lib/dynflow/execution_plan.rb', line 393

def add_scheduling_step(action_class, caller_action = nil)
  add_step(Steps::PlanStep, action_class, generate_action_id, :scheduling).tap do |step|
    step.initialize_action(caller_action)
  end
end

#caller_execution_plan_idObject



521
522
523
# File 'lib/dynflow/execution_plan.rb', line 521

def caller_execution_plan_id
  entry_action.caller_execution_plan_id
end

#cancel(force = false) ⇒ Object

sends the cancel event to all currently running and cancellable steps. if the plan is just scheduled, it cancels it (and returns an one-item array with the future value of the cancel result)



309
310
311
312
313
314
315
316
317
318
# File 'lib/dynflow/execution_plan.rb', line 309

def cancel(force = false)
  if state == :scheduled
    [Concurrent::Promises.resolvable_future.tap { |f| f.fulfill delay_record.cancel }]
  else
    event = force ? ::Dynflow::Action::Cancellable::Abort : ::Dynflow::Action::Cancellable::Cancel
    steps_to_cancel.map do |step|
      world.event(id, step.id, event)
    end
  end
end

#cancellable?Boolean

Returns:

  • (Boolean)


320
321
322
323
324
# File 'lib/dynflow/execution_plan.rb', line 320

def cancellable?
  return true if state == :scheduled
  return false unless state == :running
  steps_to_cancel.any?
end

#compute_execution_timeObject



491
492
493
494
495
# File 'lib/dynflow/execution_plan.rb', line 491

def compute_execution_time
  self.steps.values.reduce(0) do |execution_time, step|
    execution_time + (step.execution_time || 0)
  end
end

#current_run_flowObject



369
370
371
# File 'lib/dynflow/execution_plan.rb', line 369

def current_run_flow
  @run_flow_stack.last
end

#delay(caller_action, action_class, delay_options, *args) ⇒ Object



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/dynflow/execution_plan.rb', line 254

def delay(caller_action, action_class, delay_options, *args)
  save
  @root_plan_step = add_scheduling_step(action_class, caller_action)
  run_hooks(:pending)
  serializer = root_plan_step.delay(delay_options, args)
  delayed_plan = DelayedPlan.new(@world,
                                 id,
                                 delay_options[:start_at],
                                 delay_options.fetch(:start_before, nil),
                                 serializer,
                                 delay_options[:frozen] || false)
  persistence.save_delayed_plan(delayed_plan)
ensure
  update_state(error? ? :stopped : :scheduled)
end

#delay_recordObject



270
271
272
# File 'lib/dynflow/execution_plan.rb', line 270

def delay_record
  @delay_record ||= persistence.load_delayed_plan(id)
end

#entry_actionObject



510
511
512
# File 'lib/dynflow/execution_plan.rb', line 510

def entry_action
  @entry_action ||= root_plan_step.action(self)
end

#error?Boolean

Returns:

  • (Boolean)


181
182
183
# File 'lib/dynflow/execution_plan.rb', line 181

def error?
  result == :error
end

#error_in_plan?Boolean

Returns:

  • (Boolean)


189
190
191
# File 'lib/dynflow/execution_plan.rb', line 189

def error_in_plan?
  steps_in_state(:error).any? { |step| step.is_a? Steps::PlanStep }
end

#errorsObject



193
194
195
# File 'lib/dynflow/execution_plan.rb', line 193

def errors
  steps.values.map(&:error).compact
end

#failed_stepsObject



236
237
238
# File 'lib/dynflow/execution_plan.rb', line 236

def failed_steps
  steps_in_state(:error)
end

#failure?Boolean

Returns:

  • (Boolean)


185
186
187
# File 'lib/dynflow/execution_plan.rb', line 185

def failure?
  [:error, :warning, :cancelled].include?(result)
end

#finalize_stepsObject



232
233
234
# File 'lib/dynflow/execution_plan.rb', line 232

def finalize_steps
  steps_of_type(Dynflow::ExecutionPlan::Steps::FinalizeStep)
end

#generate_action_idObject



244
245
246
247
# File 'lib/dynflow/execution_plan.rb', line 244

def generate_action_id
  @last_action_id ||= 0
  @last_action_id += 1
end

#generate_step_idObject



249
250
251
252
# File 'lib/dynflow/execution_plan.rb', line 249

def generate_step_id
  @last_step_id ||= 0
  @last_step_id += 1
end

#loggerObject



110
111
112
# File 'lib/dynflow/execution_plan.rb', line 110

def logger
  @world.logger
end

#plan(*args) ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/dynflow/execution_plan.rb', line 285

def plan(*args)
  update_state(:planning)
  world.middleware.execute(:plan_phase, root_plan_step.action_class, self) do
    with_planning_scope do
      root_action = root_plan_step.execute(self, nil, false, *args)
      @label = root_action.label

      if @dependency_graph.unresolved?
        raise "Some dependencies were not resolved: #{@dependency_graph.inspect}"
      end
    end
  end

  if @run_flow.size == 1
    @run_flow = @run_flow.sub_flows.first
  end

  steps.values.each(&:save)
  update_state(error? ? :stopped : :planned)
end

#plan_stepsObject



224
225
226
# File 'lib/dynflow/execution_plan.rb', line 224

def plan_steps
  steps_of_type(Dynflow::ExecutionPlan::Steps::PlanStep)
end

#prepare(action_class, options = {}) ⇒ Object



274
275
276
277
278
279
280
281
282
283
# File 'lib/dynflow/execution_plan.rb', line 274

def prepare(action_class, options = {})
  options = options.dup
  caller_action = Type! options.delete(:caller_action), Dynflow::Action, NilClass
  raise "Unexpected options #{options.keys.inspect}" unless options.empty?
  save
  @root_plan_step = add_plan_step(action_class, caller_action)
  step = @root_plan_step.save
  run_hooks(:pending)
  step
end

#prepare_for_rescueObject



210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/dynflow/execution_plan.rb', line 210

def prepare_for_rescue
  case rescue_strategy
  when Action::Rescue::Pause
    :paused
  when Action::Rescue::Fail
    :stopped
  when Action::Rescue::Skip
    failed_steps.each { |step| self.skip(step) }
    :running
  else
    :paused
  end
end

#progress0..1

info

Returns:

  • (0..1)

    the percentage of the progress. See Action::Progress for more



499
500
501
502
503
504
505
506
507
508
# File 'lib/dynflow/execution_plan.rb', line 499

def progress
  return 0 if [:pending, :planning, :scheduled].include?(state)
  flow_step_ids         = run_flow.all_step_ids + finalize_flow.all_step_ids
  plan_done, plan_total = flow_step_ids.reduce([0.0, 0]) do |(done, total), step_id|
    step = self.steps[step_id]
    [done + (step.progress_done * step.progress_weight),
     total + step.progress_weight]
  end
  plan_total > 0 ? (plan_done / plan_total) : 1
end

#rescue_strategyObject



197
198
199
200
# File 'lib/dynflow/execution_plan.rb', line 197

def rescue_strategy
  rescue_strategy = entry_action.rescue_strategy || Action::Rescue::Skip
  Type! rescue_strategy, Action::Rescue::Strategy
end

#resultObject



166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/dynflow/execution_plan.rb', line 166

def result
  all_steps = steps.values
  if all_steps.any? { |step| step.state == :cancelled }
    return :cancelled
  elsif all_steps.any? { |step| step.state == :error }
    return :error
  elsif all_steps.any? { |step| [:skipping, :skipped].include?(step.state) }
    return :warning
  elsif all_steps.all? { |step| step.state == :success }
    return :success
  else
    return :pending
  end
end

#run_hooks(state) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/dynflow/execution_plan.rb', line 151

def run_hooks(state)
  records = persistence.load_actions_attributes(@id, [:id, :class]).select do |action|
    Utils.constantize(action[:class])
         .execution_plan_hooks
         .on(state).any?
  end
  action_ids = records.compact.map { |record| record[:id] }
  return if action_ids.empty?
  persistence.load_actions(self, action_ids).each do |action|
    world.middleware.execute(:hook, action, self) do
      action.class.execution_plan_hooks.run(self, action, state)
    end
  end
end

#run_stepsObject



228
229
230
# File 'lib/dynflow/execution_plan.rb', line 228

def run_steps
  steps_of_type(Dynflow::ExecutionPlan::Steps::RunStep)
end

#saveObject



449
450
451
# File 'lib/dynflow/execution_plan.rb', line 449

def save
  persistence.save_execution_plan(self)
end

#skip(step) ⇒ Object



332
333
334
335
336
# File 'lib/dynflow/execution_plan.rb', line 332

def skip(step)
  steps_to_skip = steps_to_skip(step).each(&:mark_to_skip)
  self.save
  return steps_to_skip
end

#steps_in_state(*states) ⇒ Object



240
241
242
# File 'lib/dynflow/execution_plan.rb', line 240

def steps_in_state(*states)
  self.steps.values.find_all {|step| states.include?(step.state) }
end

#steps_of_type(type) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



365
366
367
# File 'lib/dynflow/execution_plan.rb', line 365

def steps_of_type(type)
  steps.values.find_all { |step| step.is_a?(type) }
end

#steps_to_cancelObject



326
327
328
329
330
# File 'lib/dynflow/execution_plan.rb', line 326

def steps_to_cancel
  steps_in_state(:running, :suspended).find_all do |step|
    step.action(self).is_a?(::Dynflow::Action::Cancellable)
  end
end

#steps_to_skip(step) ⇒ Array<Steps::Abstract>

All the steps that need to get skipped when wanting to skip the step includes the step itself, all steps dependent on it (even transitively) FIND maybe move to persistence to let adapter to do it effectively?

Returns:



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
# File 'lib/dynflow/execution_plan.rb', line 342

def steps_to_skip(step)
  dependent_steps = steps.values.find_all do |s|
    next if s.is_a? Steps::PlanStep
    action = persistence.load_action(s)
    action.required_step_ids.include?(step.id)
  end

  steps_to_skip = dependent_steps.map do |dependent_step|
    steps_to_skip(dependent_step)
  end.flatten

  steps_to_skip << step

  if step.is_a? Steps::RunStep
    finalize_step_id = persistence.load_action(step).finalize_step_id
    steps_to_skip << steps[finalize_step_id] if finalize_step_id
  end

  return steps_to_skip.uniq
end

#sub_plansObject



202
203
204
# File 'lib/dynflow/execution_plan.rb', line 202

def sub_plans
  persistence.find_execution_plans(filters: { 'caller_execution_plan_id' => self.id })
end

#sub_plans_countObject



206
207
208
# File 'lib/dynflow/execution_plan.rb', line 206

def sub_plans_count
  persistence.find_execution_plan_counts(filters: { 'caller_execution_plan_id' => self.id })
end

#switch_flow(new_flow, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Switches the flow type (Sequence, Concurrence) to be used within the block.



385
386
387
388
389
390
391
# File 'lib/dynflow/execution_plan.rb', line 385

def switch_flow(new_flow, &block)
  @run_flow_stack << new_flow
  return block.call
ensure
  @run_flow_stack.pop
  current_run_flow.add_and_resolve(@dependency_graph, new_flow) if current_run_flow
end

#to_hashObject



432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
# File 'lib/dynflow/execution_plan.rb', line 432

def to_hash
  recursive_to_hash id:                id,
                    class:             self.class.to_s,
                    label:             label,
                    state:             state,
                    result:            result,
                    root_plan_step_id: root_plan_step && root_plan_step.id,
                    run_flow:          run_flow.encode,
                    finalize_flow:     finalize_flow.encode,
                    step_ids:          steps.map { |id, _| id },
                    started_at:        time_to_str(started_at),
                    ended_at:          time_to_str(ended_at),
                    execution_time:    execution_time,
                    real_time:         real_time,
                    execution_history: execution_history.to_hash
end

#update_state(state, history_notice: :auto) ⇒ Object

Parameters:

  • state (Symbol)

    representing the new state

  • history_notice (Symbol|string|false) (defaults to: :auto)

    should a note to execution_history be added as well? Possible values:

    - :auto (default) - the history notice will be added based on the new state
    - string - custom history notice is added
    - false - don't add any notice
    


120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/dynflow/execution_plan.rb', line 120

def update_state(state, history_notice: :auto)
  hooks_to_run = [state]
  original = self.state
  case self.state = state
  when :planning
    @started_at = Time.now.utc
  when :stopped
    @ended_at       = Time.now.utc
    @real_time      = @ended_at - @started_at unless @started_at.nil?
    @execution_time = compute_execution_time
    key = failure? ? :failure : :success
    Dynflow::Telemetry.with_instance do |t|
      t.increment_counter(:dynflow_finished_execution_plans, 1,
                          telemetry_common_options.merge(:result => key.to_s))
    end
    hooks_to_run << key
    unlock_all_singleton_locks!
  when :paused
    unlock_all_singleton_locks!
  else
    # ignore
  end
  logger.debug format('%13s %s    %9s >> %9s',
                      'ExecutionPlan', id, original, state)
  add_history_notice(history_notice)
  self.save
  toggle_telemetry_state original == :pending ? nil : original.to_s,
                         self.state == :stopped ? nil : self.state.to_s
  hooks_to_run.each { |kind| run_hooks kind }
end

#valid?Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/dynflow/execution_plan.rb', line 106

def valid?
  true
end

#with_planning_scope(&block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



374
375
376
377
378
379
380
381
# File 'lib/dynflow/execution_plan.rb', line 374

def with_planning_scope(&block)
  @run_flow_stack   = []
  @dependency_graph = DependencyGraph.new
  switch_flow(run_flow, &block)
ensure
  @run_flow_stack   = nil
  @dependency_graph = nil
end