Class: Dynflow::ExecutionPlan

Inherits:
Serializable show all
Includes:
Algebrick::TypeCheck, Stateful
Defined in:
lib/dynflow/execution_plan.rb

Overview

TODO: extract the planning logic into a separate class, ExecutionPlanner.

Defined Under Namespace

Modules: Steps Classes: DependencyGraph, OutputReference

Instance Attribute Summary collapse

Attributes included from Stateful

#state

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Stateful

included, #set_state, #state_transitions, #states

Methods inherited from Serializable

constantize, from_hash

Constructor Details

#initialize(world, id = SecureRandom.uuid, state = :pending, root_plan_step = nil, run_flow = Flows::Concurrence.new([]), finalize_flow = Flows::Sequence.new([]), steps = {}, started_at = nil, ended_at = nil, execution_time = nil, real_time = 0.0) ⇒ ExecutionPlan

All parameters with default values are part of the private API.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/dynflow/execution_plan.rb', line 32

# Creates an execution plan bound to the given world.
#
# Only +world+ is meant to be supplied by outside callers: every parameter
# with a default value is part of the private API (they are the values
# .new_from_hash passes when rebuilding a persisted plan).
#
# Arguments are validated with Type!; NOTE(review): Type! comes from
# Algebrick::TypeCheck and is assumed to raise when a value matches none of
# the listed types - confirm against the Algebrick version in use.
#
# @param world [World] world the plan belongs to
# @param id [String] plan identifier, a fresh UUID by default
# @param state [Symbol] initial state, assigned through the state= writer
# @param root_plan_step [Object, nil] root plan step (not type-checked here)
# @param run_flow [Flows::Abstract] flow of run steps
# @param finalize_flow [Flows::Abstract] flow of finalize steps
# @param steps [Hash{Integer => Steps::Abstract}] steps keyed by Integer id
# @param started_at [Time, nil]
# @param ended_at [Time, nil]
# @param execution_time [Numeric, nil]
# @param real_time [Numeric]
def initialize(world,
    id = SecureRandom.uuid,
    state = :pending,
    root_plan_step = nil,
    run_flow = Flows::Concurrence.new([]),
    finalize_flow = Flows::Sequence.new([]),
    steps = {},
    started_at = nil,
    ended_at = nil,
    execution_time = nil,
    real_time = 0.0)

  @id             = Type! id, String
  @world          = Type! world, World
  self.state      = state
  @run_flow       = Type! run_flow, Flows::Abstract
  @finalize_flow  = Type! finalize_flow, Flows::Abstract
  @root_plan_step = root_plan_step
  @started_at     = Type! started_at, Time, NilClass
  @ended_at       = Type! ended_at, Time, NilClass
  @execution_time = Type! execution_time, Numeric, NilClass
  @real_time      = Type! real_time, Numeric

  # Validate every entry. `each` (rather than `all?`) is used because the
  # check works by Type! raising, not by the block's return value, so the
  # iteration must never stop early on a falsy result.
  steps.each do |step_id, step|
    Type! step_id, Integer
    Type! step, Steps::Abstract
  end
  @steps = steps
end

Instance Attribute Details

#ended_atObject (readonly)

Returns the value of attribute ended_at.



15
16
17
# File 'lib/dynflow/execution_plan.rb', line 15

# Time at which the plan entered the :stopped state (set by #update_state),
# or nil if no end time has been recorded.
#
# @return [Time, nil]
def ended_at
  @ended_at
end

#execution_timeObject (readonly)

Returns the value of attribute execution_time.



15
16
17
# File 'lib/dynflow/execution_plan.rb', line 15

# Sum of the steps' execution times as computed by #compute_execution_time
# when the plan enters the :stopped state (see #update_state).
#
# @return [Numeric, nil] nil when no value has been recorded
def execution_time
  @execution_time
end

#finalize_flowObject (readonly)

Returns the value of attribute finalize_flow.



15
16
17
# File 'lib/dynflow/execution_plan.rb', line 15

# Flow holding the finalize steps; #add_finalize_step appends to it.
#
# @return [Flows::Abstract]
def finalize_flow
  @finalize_flow
end

#idObject (readonly)

Returns the value of attribute id.



15
16
17
# File 'lib/dynflow/execution_plan.rb', line 15

# Identifier of the execution plan (a SecureRandom.uuid unless another id
# was passed to the constructor).
#
# @return [String]
def id
  @id
end

#real_timeObject (readonly)

Returns the value of attribute real_time.



15
16
17
# File 'lib/dynflow/execution_plan.rb', line 15

# Wall-clock duration of the plan: ended_at - started_at, stored by
# #update_state when the plan enters :stopped. Defaults to 0.0.
#
# @return [Numeric]
def real_time
  @real_time
end

#root_plan_stepObject (readonly)

Returns the value of attribute root_plan_step.



15
16
17
# File 'lib/dynflow/execution_plan.rb', line 15

# Root plan step of the plan: created by #prepare, or passed directly to the
# constructor (which does not type-check it).
#
# @return [Object, nil] presumably a Steps::PlanStep - TODO confirm; nil
#   until a root step has been set
def root_plan_step
  @root_plan_step
end

#run_flowObject (readonly)

Returns the value of attribute run_flow.



15
16
17
# File 'lib/dynflow/execution_plan.rb', line 15

# Flow holding the run steps. #plan replaces it with its only sub-flow when
# its size is 1.
#
# @return [Flows::Abstract]
def run_flow
  @run_flow
end

#started_atObject (readonly)

Returns the value of attribute started_at.



15
16
17
# File 'lib/dynflow/execution_plan.rb', line 15

# Time at which the plan entered the :planning state (set by #update_state),
# or nil if no start time has been recorded.
#
# @return [Time, nil]
def started_at
  @started_at
end

#stepsObject (readonly)

Returns the value of attribute steps.



15
16
17
# File 'lib/dynflow/execution_plan.rb', line 15

# All steps of the plan, keyed by step id.
#
# @return [Hash{Integer => Steps::Abstract}]
def steps
  @steps
end

#worldObject (readonly)

Returns the value of attribute world.



15
16
17
# File 'lib/dynflow/execution_plan.rb', line 15

# World this execution plan belongs to.
#
# @return [World]
def world
  @world
end

Class Method Details

.new_from_hash(hash, world) ⇒ Object



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/dynflow/execution_plan.rb', line 269

# Rebuilds an execution plan from its serialized hash form.
#
# The steps are loaded first (via steps_from_hash) so that the root plan
# step can be looked up among them by :root_plan_step_id.
#
# @param hash [Hash] serialized plan; read keys are :id, :step_ids, :state,
#   :root_plan_step_id, :run_flow, :finalize_flow, :started_at, :ended_at,
#   :execution_time and :real_time
# @param world [World] world the rebuilt plan is bound to
# @return [ExecutionPlan]
def self.new_from_hash(hash, world)
  check_class_matching hash

  plan_id      = hash[:id]
  loaded_steps = steps_from_hash(hash[:step_ids], plan_id, world)
  state        = hash[:state]
  root_step    = loaded_steps[hash[:root_plan_step_id]]
  run          = Flows::Abstract.from_hash(hash[:run_flow])
  finalize     = Flows::Abstract.from_hash(hash[:finalize_flow])
  started      = string_to_time(hash[:started_at])
  ended        = string_to_time(hash[:ended_at])

  # to_f turns a missing (nil) value into 0.0 for both timings.
  new(world, plan_id, state, root_step, run, finalize, loaded_steps,
      started, ended, hash[:execution_time].to_f, hash[:real_time].to_f)
end

.state_transitionsObject



22
23
24
25
26
27
28
29
# File 'lib/dynflow/execution_plan.rb', line 22

# Allowed state transitions of an execution plan.
#
# @return [Hash{Symbol => Array<Symbol>}] maps each state to the states it
#   may move to; built on first call and memoized
def self.state_transitions
  @state_transitions ||= {
    pending:  [:planning],
    planning: [:planned, :stopped],
    planned:  [:running],
    running:  [:paused, :stopped],
    paused:   [:running],
    stopped:  []
  }
end

.statesObject



18
19
20
# File 'lib/dynflow/execution_plan.rb', line 18

# All states an execution plan can be in.
#
# @return [Array<Symbol>] built on first call and memoized
def self.states
  @states ||= %w[pending planning planned running paused stopped].map(&:to_sym)
end

Instance Method Details

#actionsArray<Action>

Returns actions in Present phase.

Returns:

  • (Array<Action>)

    actions in Present phase



309
310
311
312
313
# File 'lib/dynflow/execution_plan.rb', line 309

# The entry action followed by all actions it planned. The list is built on
# first call and memoized.
#
# @return [Array<Action>] actions in Present phase
def actions
  @actions ||= [entry_action].concat(entry_action.all_planned_actions)
end

#add_finalize_step(action) ⇒ Object



243
244
245
246
247
248
# File 'lib/dynflow/execution_plan.rb', line 243

# Adds a finalize step for the action and appends it to the finalize flow.
#
# @param action [Action] its class and id are passed to add_step, and its
#   finalize_progress_weight becomes the step's progress weight
# @return the step returned by add_step
def add_finalize_step(action)
  step = add_step(Steps::FinalizeStep, action.class, action.id)
  step.progress_weight = action.finalize_progress_weight
  finalize_flow << Flows::Atom.new(step.id)
  step
end

#add_plan_step(action_class, planned_by = nil) ⇒ Object



229
230
231
232
233
# File 'lib/dynflow/execution_plan.rb', line 229

# Adds a plan step for the action class under a freshly generated action id
# and initializes the step's action.
#
# @param action_class [Class] class of the action to plan
# @param planned_by [#plan_step_id, nil] when given, its plan_step_id is
#   passed on to add_step as the last argument
# @return the step returned by add_step
def add_plan_step(action_class, planned_by = nil)
  action_id      = generate_action_id
  parent_step_id = planned_by && planned_by.plan_step_id
  step = add_step(Steps::PlanStep, action_class, action_id, parent_step_id)
  step.initialize_action
  step
end

#add_run_step(action) ⇒ Object



235
236
237
238
239
240
241
# File 'lib/dynflow/execution_plan.rb', line 235

# Adds a run step for the action, registers its dependencies in the
# dependency graph and places it into the current run flow.
#
# Uses @dependency_graph and #current_run_flow, which #with_planning_scope
# sets up for the duration of its block and clears afterwards.
#
# @param action [Action] its run_progress_weight becomes the step's weight
# @return the step returned by add_step
def add_run_step(action)
  step = add_step(Steps::RunStep, action.class, action.id)
  step.progress_weight = action.run_progress_weight
  @dependency_graph.add_dependencies(step, action)
  current_run_flow.add_and_resolve(@dependency_graph, Flows::Atom.new(step.id))
  step
end

#compute_execution_timeObject



286
287
288
289
290
# File 'lib/dynflow/execution_plan.rb', line 286

# Adds up the execution time of all steps; a step whose execution_time is
# nil counts as 0.
#
# @return [Numeric] 0 when the plan has no steps
def compute_execution_time
  total = 0
  steps.each_value { |step| total += step.execution_time || 0 }
  total
end

#current_run_flowObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



205
206
207
# File 'lib/dynflow/execution_plan.rb', line 205

# Flow on top of the planning flow stack, i.e. the flow that #add_run_step
# and #switch_flow currently add to.
#
# Only usable inside #with_planning_scope: outside of it @run_flow_stack is
# nil, so calling this raises NoMethodError.
#
# @return the last flow pushed by #switch_flow, or nil when the stack is empty
def current_run_flow
  @run_flow_stack.last
end

#entry_actionObject



304
305
306
# File 'lib/dynflow/execution_plan.rb', line 304

# Action of the root plan step, obtained once through
# root_plan_step.action(self) and memoized.
#
# @return the root step's action (presumably an Action - TODO confirm)
def entry_action
  @entry_action ||= root_plan_step.action(self)
end

#error?Boolean

Returns:

  • (Boolean)


96
97
98
# File 'lib/dynflow/execution_plan.rb', line 96

# Whether #result is :error, i.e. at least one step is in the :error state.
#
# @return [Boolean]
def error?
  result == :error
end

#errorsObject



100
101
102
# File 'lib/dynflow/execution_plan.rb', line 100

# Errors recorded on the steps of the plan.
#
# @return [Array] the non-nil error values of all steps, in step order
def errors
  step_errors = steps.values.map { |step| step.error }
  step_errors.compact
end

#failed_stepsObject



118
119
120
# File 'lib/dynflow/execution_plan.rb', line 118

# Steps that are currently in the :error state.
#
# @return [Array] see #steps_in_state
def failed_steps
  steps_in_state(:error)
end

#generate_action_idObject



134
135
136
137
# File 'lib/dynflow/execution_plan.rb', line 134

# Produces the next action id from a per-plan counter; the first call
# returns 1.
#
# @return [Integer]
def generate_action_id
  @last_action_id = (@last_action_id || 0) + 1
end

#generate_step_idObject



139
140
141
142
# File 'lib/dynflow/execution_plan.rb', line 139

# Produces the next step id from a per-plan counter; the first call
# returns 1.
#
# @return [Integer]
def generate_step_id
  @last_step_id = (@last_step_id || 0) + 1
end

#loggerObject



62
63
64
# File 'lib/dynflow/execution_plan.rb', line 62

# Logger of the world this plan belongs to.
#
# @return the object returned by @world.logger
def logger
  @world.logger
end

#plan(*args) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/dynflow/execution_plan.rb', line 150

# Runs the planning phase of the execution plan.
#
# Moves the plan to :planning, then - inside a transaction of the world's
# transaction adapter and wrapped in the :plan_phase middleware - executes
# the root plan step within a planning scope. Afterwards a run flow of
# size 1 is replaced by its only sub-flow, and the transaction is rolled
# back when planning produced an error. Finally all steps are saved and the
# plan moves to :stopped (on error) or :planned.
#
# @param args arguments forwarded to the root plan step's execute
# @raise [RuntimeError] when the dependency graph still has unresolved
#   dependencies after the root step was executed
# @return the result of the final #update_state call
def plan(*args)
  update_state(:planning)

  world.transaction_adapter.transaction do
    world.middleware.execute(:plan_phase, root_plan_step.action_class) do
      with_planning_scope do
        root_plan_step.execute(self, nil, false, *args)

        unresolved = @dependency_graph.unresolved?
        raise "Some dependencies were not resolved: #{@dependency_graph.inspect}" if unresolved
      end
    end

    # A wrapper flow around a single sub-flow adds nothing; unwrap it.
    @run_flow = @run_flow.sub_flows.first if @run_flow.size == 1

    world.transaction_adapter.rollback if error?
  end

  steps.each_value(&:save)
  final_state = error? ? :stopped : :planned
  update_state(final_state)
end

#prepare(action_class) ⇒ Object



144
145
146
147
148
# File 'lib/dynflow/execution_plan.rb', line 144

# Persists the plan, then creates its root plan step for the given action
# class and persists that step as well.
#
# @param action_class [Class] class of the action the plan is built around
# @return the result of saving the root plan step
def prepare(action_class)
  # The plan is saved before the step is created and saved.
  save
  @root_plan_step = add_plan_step(action_class)
  @root_plan_step.save
end

#progress0..1

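Returns the progress of the plan as a number between 0 and 1. See Action::Progress for more info.

Returns:

  • (0..1)

    the progress of the plan as a number between 0 and 1. See Action::Progress for more info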



294
295
296
297
298
299
300
301
302
# File 'lib/dynflow/execution_plan.rb', line 294

# Weighted progress of the plan over all run and finalize steps.
#
# Each step contributes progress_done * progress_weight; the sum is divided
# by the total weight.
#
# @return [Numeric] the weighted progress; 1 when the total weight is not
#   positive (e.g. there are no run or finalize steps)
def progress
  weighted_done = 0.0
  total_weight  = 0

  (run_flow.all_step_ids + finalize_flow.all_step_ids).each do |step_id|
    step = steps[step_id]
    weighted_done += step.progress_done * step.progress_weight
    total_weight  += step.progress_weight
  end

  total_weight > 0 ? weighted_done / total_weight : 1
end

#rescue_from_errorObject



126
127
128
129
130
131
132
# File 'lib/dynflow/execution_plan.rb', line 126

# Tries to recover the plan from an error by executing the plan chosen by
# #rescue_plan_id in the world.
#
# @raise [Errors::RescueError] when #rescue_plan_id yields no plan id
# @return the result of @world.execute
def rescue_from_error
  plan_id = rescue_plan_id
  raise Errors::RescueError, 'Unable to rescue from the error' unless plan_id

  @world.execute(plan_id)
end

#rescue_plan_idObject



108
109
110
111
112
113
114
115
116
# File 'lib/dynflow/execution_plan.rb', line 108

# Picks the plan to execute in order to rescue from an error, based on
# #rescue_strategy.
#
# NOTE: this is not a pure query - with the Skip strategy every failed step
# is skipped (see #skip) before the id is returned.
#
# @return [String, nil] this plan's id for the Skip strategy; nil for the
#   Pause strategy and for any other strategy
def rescue_plan_id
  case rescue_strategy
  when Action::Rescue::Pause then nil
  when Action::Rescue::Skip
    failed_steps.each do |failed_step|
      skip(failed_step)
    end
    id
  end
end

#rescue_strategyObject



104
105
106
# File 'lib/dynflow/execution_plan.rb', line 104

# Rescue strategy declared by the entry action, checked with Type! to be an
# Action::Rescue::Strategy.
#
# @return the entry action's rescue strategy (#rescue_plan_id matches it
#   against Action::Rescue::Pause and Action::Rescue::Skip)
def rescue_strategy
  Type! entry_action.rescue_strategy, Action::Rescue::Strategy
end

#resultObject



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/dynflow/execution_plan.rb', line 83

# Overall result of the plan, derived from the states of its steps.
#
# Checked in this order:
# * :error   - at least one step is in the :error state
# * :warning - at least one step is :skipping or :skipped
# * :success - every step is in the :success state (also when there are
#   no steps at all)
# * :pending - anything else
#
# @return [Symbol] one of :error, :warning, :success, :pending
def result
  step_states = steps.values.map(&:state)

  if step_states.include?(:error)
    :error
  elsif step_states.any? { |step_state| [:skipping, :skipped].include?(step_state) }
    :warning
  elsif step_states.all? { |step_state| step_state == :success }
    :success
  else
    :pending
  end
end

#saveObject



265
266
267
# File 'lib/dynflow/execution_plan.rb', line 265

# Persists the execution plan by handing it to
# persistence.save_execution_plan (`persistence` is defined outside this
# excerpt).
#
# @return the result of persistence.save_execution_plan
def save
  persistence.save_execution_plan(self)
end

#skip(step) ⇒ Object



173
174
175
176
177
# File 'lib/dynflow/execution_plan.rb', line 173

# Marks the step, and every step #steps_to_skip returns for it, to be
# skipped, then saves the plan.
#
# @param step [Steps::Abstract] the step to skip
# @return [Array<Steps::Abstract>] the steps that were marked
def skip(step)
  marked_steps = steps_to_skip(step)
  marked_steps.each(&:mark_to_skip)
  save
  marked_steps
end

#steps_in_state(*states) ⇒ Object



122
123
124
# File 'lib/dynflow/execution_plan.rb', line 122

# Steps whose state is one of the given states.
#
# @param states [Array<Symbol>] states to look for
# @return [Array<Steps::Abstract>] matching steps, in the order of #steps
def steps_in_state(*states)
  steps.values.select { |step| states.include?(step.state) }
end

#steps_to_skip(step) ⇒ Array<Steps::Abstract>

All the steps that need to be skipped in order to skip the given step: the step itself, all steps that depend on it (even transitively) and, for a run step, its finalize step. FIXME: maybe move this to persistence so that the adapter can do it efficiently?

Returns:



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/dynflow/execution_plan.rb', line 183

# Collects every step that has to be skipped in order to skip +step+:
# the steps depending on it (found recursively, so transitive dependents
# are included), the step itself and - when it is a run step whose action
# has a finalize step - that finalize step.
#
# Dependents are found by loading the action of every non-plan step and
# checking whether its required_step_ids include the id of +step+.
#
# @param step [Steps::Abstract] the step to be skipped
# @return [Array<Steps::Abstract>] the steps to skip, without duplicates
def steps_to_skip(step)
  dependents = steps.values.select do |candidate|
    !candidate.is_a?(Steps::PlanStep) &&
      persistence.load_action(candidate).required_step_ids.include?(step.id)
  end

  # Dependents (and whatever depends on them) come first, then the step.
  to_skip = dependents.flat_map { |dependent| steps_to_skip(dependent) }
  to_skip << step

  if step.is_a?(Steps::RunStep)
    finalize_step_id = persistence.load_action(step).finalize_step_id
    to_skip << steps[finalize_step_id] if finalize_step_id
  end

  to_skip.uniq
end

#switch_flow(new_flow, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Switches the flow type (Sequence, Concurrence) to be used within the block.



221
222
223
224
225
226
227
# File 'lib/dynflow/execution_plan.rb', line 221

# Switches the flow type (Sequence, Concurrence) to be used within the block.
#
# +new_flow+ is pushed on the flow stack while the block runs. Afterwards -
# also when the block raises - it is popped again and, if an enclosing flow
# remains on the stack, added to that flow.
#
# @api private
# @param new_flow the flow that is current while the block runs
# @return the value of the block
def switch_flow(new_flow, &block)
  @run_flow_stack << new_flow
  block.call
ensure
  @run_flow_stack.pop
  enclosing_flow = current_run_flow
  enclosing_flow.add_and_resolve(@dependency_graph, new_flow) if enclosing_flow
end

#to_hashObject



250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/dynflow/execution_plan.rb', line 250

# Serializes the plan into a hash by passing its attributes through
# recursive_to_hash.
#
# Steps are stored only by id (:step_ids) and the root plan step by its id;
# the timestamps go through time_to_str. This is the shape
# .new_from_hash reads back.
#
# @return the result of recursive_to_hash (presumably a Hash - TODO confirm)
def to_hash
  recursive_to_hash id:                id,
                    class:             self.class.to_s,
                    state:             state,
                    result:            result,
                    root_plan_step_id: root_plan_step && root_plan_step.id,
                    run_flow:          run_flow,
                    finalize_flow:     finalize_flow,
                    step_ids:          steps.keys,
                    started_at:        time_to_str(started_at),
                    ended_at:          time_to_str(ended_at),
                    execution_time:    execution_time,
                    real_time:         real_time
end

#update_state(state) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/dynflow/execution_plan.rb', line 66

# Moves the plan to the given state, records timing information, logs the
# transition at debug level and saves the plan.
#
# * entering :planning stores the start time
# * entering :stopped stores the end time, the real (wall-clock) time
#   between start and end, and the summed execution time of the steps
#
# @param state [Symbol] the state to move to
# @return the result of #save
def update_state(state)
  original   = self.state
  self.state = state

  case state
  when :planning
    @started_at = Time.now
  when :stopped
    @ended_at       = Time.now
    @real_time      = @ended_at - @started_at
    @execution_time = compute_execution_time
  end

  message = format('%13s %s    %9s >> %9s', 'ExecutionPlan', id, original, state)
  logger.debug(message)
  save
end

#with_planning_scope(&block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



210
211
212
213
214
215
216
217
# File 'lib/dynflow/execution_plan.rb', line 210

# Runs the block with a fresh planning context: an empty flow stack and a
# new dependency graph, with #run_flow switched in as the current flow.
# Both are reset to nil afterwards, also when the block raises.
#
# @api private
# @return the value returned by #switch_flow
def with_planning_scope(&block)
  @run_flow_stack   = []
  @dependency_graph = DependencyGraph.new
  switch_flow(run_flow, &block)
ensure
  # The planning context must not outlive the block.
  @run_flow_stack = @dependency_graph = nil
end