Class: Dynflow::Action

Inherits:
Serializable show all
Extended by:
Format
Includes:
Algebrick::Matching, Algebrick::TypeCheck, Progress, Rescue
Defined in:
lib/dynflow/action.rb,
lib/dynflow/action/v2.rb,
lib/dynflow/action/singleton.rb

Overview

rubocop:disable Metrics/ClassLength

Defined Under Namespace

Modules: Cancellable, Executable, Format, Phase, Polling, Progress, Rescue, Singleton, Timeouts, V2, WithBulkSubPlans, WithPollingSubPlans, WithSubPlans Classes: Missing, Suspended

Constant Summary collapse

# Short alias for ExecutionPlan::OutputReference.
OutputReference =
ExecutionPlan::OutputReference
# Unique marker object (a bare Object instance).
# NOTE(review): presumably signals a failed step; its users are outside this view - confirm.
ERROR =
Object.new
# Unique marker object (a bare Object instance).
# NOTE(review): presumably signals that an action suspended itself - confirm against usage.
SUSPEND =
Object.new
# Algebrick atom; where it is matched is outside this view.
Skip =
Algebrick.atom
# Record appended to #delayed_events by #plan_event: the target execution
# plan id and step id, the event payload, the delivery time (nil is allowed)
# and an `optional` flag.
DelayedEvent =
Algebrick.type do
  fields! execution_plan_id: String,
          step_id:           Integer,
          event:             Object,
          time:              type { variants Time, NilClass },
          optional:          Algebrick::Types::Boolean
end

Constants included from Rescue

Rescue::Strategy, Rescue::SuggestedStrategy

Constants inherited from Serializable

Serializable::LEGACY_TIME_FORMAT, Serializable::TIME_FORMAT

Instance Attribute Summary collapse

Attributes included from Progress

#calculated_progress

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Format

input_format, output_format

Methods included from Rescue

#combine_suggested_strategies, #rescue_strategy, #rescue_strategy_for_planned_action, #rescue_strategy_for_self

Methods included from Progress

#finalize_progress, #finalize_progress_weight, #run_progress, #run_progress_weight

Methods inherited from Serializable

from_hash

Constructor Details

#initialize(attributes, world) ⇒ Action

Returns a new instance of Action.

Raises:

  • (ArgumentError)


114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/dynflow/action.rb', line 114

# Builds an action from its attribute hash, type-checking every value with Type!.
#
# Required keys (read with Hash#fetch, so a missing key raises KeyError):
# :phase, :execution_plan_id, :id, :plan_step_id, :run_step_id,
# :finalize_step_id; additionally :execution_plan in the Present phase and
# :input in the Run, Finalize and Present phases.
# Optional keys: :step, :caller_execution_plan_id, :caller_action_id,
# :output, and :input in the remaining phases.
#
# @param attributes [Hash] attribute hash as described above
# @param world [World] the world this action belongs to
# @raise [ArgumentError] when the phase matches Executable but no :step was given
# NOTE(review): Type! presumably raises TypeError on a mismatch - confirm in Algebrick::TypeCheck.
def initialize(attributes, world)
  Type! attributes, Hash

  @phase             = Type! attributes.fetch(:phase), Phase
  @world             = Type! world, World
  @step              = Type! attributes.fetch(:step, nil), ExecutionPlan::Steps::Abstract, NilClass
  # phase? reads @phase, so this check has to come after the assignment above.
  raise ArgumentError, 'Step reference missing' if phase?(Executable) && @step.nil?
  @execution_plan_id = Type! attributes.fetch(:execution_plan_id), String
  @id                = Type! attributes.fetch(:id), Integer
  @plan_step_id      = Type! attributes.fetch(:plan_step_id), Integer
  # The two keys below must be present, but their values may be nil.
  @run_step_id       = Type! attributes.fetch(:run_step_id), Integer, NilClass
  @finalize_step_id  = Type! attributes.fetch(:finalize_step_id), Integer, NilClass

  # Only the Present phase receives its execution plan here; see #set_plan_context
  # for the Plan phase.
  @execution_plan    = Type!(attributes.fetch(:execution_plan), ExecutionPlan) if phase? Present

  @caller_execution_plan_id  = Type!(attributes.fetch(:caller_execution_plan_id, nil), String, NilClass)
  @caller_action_id          = Type!(attributes.fetch(:caller_action_id, nil), Integer, NilClass)

  # Reads a key either strictly (KeyError when missing) or with {} as the default.
  getter =-> key, required do
    required ? attributes.fetch(key) : attributes.fetch(key, {})
  end

  @input  = OutputReference.deserialize getter.(:input, phase?(Run, Finalize, Present))
  # @output and @pending_output_chunks stay unset (nil) in the phases not listed.
  @output = OutputReference.deserialize getter.(:output, false) if phase? Run, Finalize, Present
  @pending_output_chunks = [] if phase? Run, Finalize
end

Instance Attribute Details

#caller_action_idObject (readonly)

Returns the value of attribute caller_action_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Integer, nil] value of the optional :caller_action_id attribute
#   given to the constructor; #caller_action uses it to load the calling action
def caller_action_id
  @caller_action_id
end

#caller_execution_plan_idObject (readonly)

Returns the value of attribute caller_execution_plan_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [String, nil] value of the optional :caller_execution_plan_id
#   attribute given to the constructor; when nil, #caller_action looks the
#   caller up in this action's own execution plan
def caller_execution_plan_id
  @caller_execution_plan_id
end

#execution_plan_idObject (readonly)

Returns the value of attribute execution_plan_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [String] value of the required :execution_plan_id attribute given
#   to the constructor
def execution_plan_id
  @execution_plan_id
end

#finalize_step_idObject (readonly)

Returns the value of attribute finalize_step_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Integer, nil] value of the :finalize_step_id attribute given to
#   the constructor; #finalize_step returns nil when this is nil
def finalize_step_id
  @finalize_step_id
end

#idObject (readonly)

Returns the value of attribute id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Integer] value of the required :id attribute given to the constructor
def id
  @id
end

#inputObject

Returns the value of attribute input.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Object] the input the constructor produced by passing the :input
#   attribute (or {} when it is absent and not required) through
#   OutputReference.deserialize
def input
  @input
end

#pending_output_chunksObject (readonly)

Returns the value of attribute pending_output_chunks.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Array<Hash>, nil] chunks recorded by #output_chunk; the
#   constructor initialises this to [] only in the Run and Finalize phases,
#   so it is nil elsewhere until #drop_output_chunks! resets it
def pending_output_chunks
  @pending_output_chunks
end

#phaseObject (readonly)

Returns the value of attribute phase.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Phase] value of the required :phase attribute given to the
#   constructor; #phase? and #phase! match against it
def phase
  @phase
end

#plan_step_idObject (readonly)

Returns the value of attribute plan_step_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Integer] value of the required :plan_step_id attribute given to
#   the constructor; #plan_step looks the step up by this id
def plan_step_id
  @plan_step_id
end

#run_step_idObject (readonly)

Returns the value of attribute run_step_id.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [Integer, nil] value of the :run_step_id attribute given to the
#   constructor; #run_step returns nil when this is nil
def run_step_id
  @run_step_id
end

#worldObject (readonly)

Returns the value of attribute world.



107
108
109
# File 'lib/dynflow/action.rb', line 107

# @return [World] the world passed to the constructor
def world
  @world
end

Class Method Details

.all_childrenObject



31
32
33
34
35
# File 'lib/dynflow/action.rb', line 31

# Collects every registered descendant of this action class.
#
# @return [Array] the directly registered children, followed by the
#   descendants of each child in the same order
def self.all_children
  direct = children.values
  direct + direct.flat_map { |child| child.all_children }
end

.childrenObject



43
44
45
# File 'lib/dynflow/action.rb', line 43

# Lazily created registry of direct subclasses; .inherited stores each
# child under its class name.
#
# @return [Hash] the same hash instance on every call
def self.children
  @children = {} unless @children
  @children
end

.constantize(action_name) ⇒ Object



101
102
103
104
105
# File 'lib/dynflow/action.rb', line 101

# Resolves an action class name through the inherited implementation and
# falls back to a generated placeholder when the lookup raises NameError.
#
# @param action_name [String] class name to resolve
# @return [Object] the result of super, or of Action::Missing.generate
def self.constantize(action_name)
  begin
    super(action_name)
  rescue NameError
    Action::Missing.generate(action_name)
  end
end

.execution_plan_hooksObject



51
52
53
# File 'lib/dynflow/action.rb', line 51

# Lazily created hook register of this action class.
#
# @return [ExecutionPlan::Hooks::Register] the same instance on every call,
#   unless .inherit_execution_plan_hooks replaced it
def self.execution_plan_hooks
  return @execution_plan_hooks if @execution_plan_hooks

  @execution_plan_hooks = ExecutionPlan::Hooks::Register.new
end

.inherit_execution_plan_hooks(hooks) ⇒ Object



55
56
57
# File 'lib/dynflow/action.rb', line 55

# Replaces this class's hook register; .inherited calls it on a new subclass
# with a copy (dup) of the parent's register.
#
# @param hooks [Object] the register to use from now on
# @return [Object] the register that was passed in
def self.inherit_execution_plan_hooks(hooks)
  @execution_plan_hooks = hooks
end

.inherited(child) ⇒ Object



37
38
39
40
41
# File 'lib/dynflow/action.rb', line 37

# Subclass hook: registers the child under its name, hands it a copy of this
# class's execution plan hooks, then continues the inherited chain.
#
# @param child [Class] the newly defined subclass
def self.inherited(child)
  children.store(child.name, child)
  hooks_copy = execution_plan_hooks.dup
  child.inherit_execution_plan_hooks(hooks_copy)
  super(child)
end

.middlewareObject



47
48
49
# File 'lib/dynflow/action.rb', line 47

# Lazily created middleware register of this action class.
#
# @return [Middleware::Register] the same instance on every call
def self.middleware
  @middleware = Middleware::Register.new unless @middleware
  @middleware
end

.subscribenil, Class

FIND: define subscriptions in the world independently of the action's classes,

limited only by input/output formats


62
63
64
# File 'lib/dynflow/action.rb', line 62

# Default subscription of an action class: none.
# NOTE(review): the heading documents the return type as nil or Class, so
# subclasses presumably override this to name the action they subscribe to - confirm.
#
# @return [nil]
def self.subscribe
  nil
end

Instance Method Details

#action_loggerObject



225
226
227
# File 'lib/dynflow/action.rb', line 225

# @return [Object] the action logger provided by the world
def action_logger
  world.action_logger
end

#all_planned_actions(filter_class = Action) ⇒ Array<Action>

returned actions are in Present phase



248
249
250
251
252
253
# File 'lib/dynflow/action.rb', line 248

# Returns the actions planned by this action together with everything those
# actions planned themselves, recursively. Only valid in the Present phase.
#
# @param filter_class [Class] only actions that are instances of this class are kept
# @return [Array<Action>] direct sub-actions first, then the nested ones
def all_planned_actions(filter_class = Action)
  phase!(Present)
  direct = planned_actions
  nested = direct.flat_map { |action| action.all_planned_actions }
  (direct + nested).select { |candidate| candidate.is_a?(filter_class) }
end

#caller_actionObject



189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/dynflow/action.rb', line 189

# Loads the action that called this one. Only valid in the Present phase.
# The loaded action is memoized in @caller_action.
#
# @return [Object, nil] nil when no caller action id was recorded, otherwise
#   whatever world.persistence.load_action_for_presentation returns
def caller_action
  phase! Present
  # Without a recorded caller id there is nothing to load. This guard used to
  # be inverted (`if @caller_action_id`), which returned nil exactly when a
  # caller existed and attempted a lookup with a nil id otherwise.
  return nil if @caller_action_id.nil?
  return @caller_action if @caller_action

  # A nil caller plan id means the caller lives in this action's own plan;
  # otherwise the caller's plan has to be loaded first.
  caller_execution_plan = if @caller_execution_plan_id.nil?
                            execution_plan
                          else
                            world.persistence.load_execution_plan(@caller_execution_plan_id)
                          end
  @caller_action = world.persistence.load_action_for_presentation(caller_execution_plan, @caller_action_id)
end

#delayed_eventsObject



358
359
360
# File 'lib/dynflow/action.rb', line 358

# Lazily created list of the events queued by #plan_event.
#
# @return [Array] the same array instance on every call
def delayed_events
  return @delayed_events if @delayed_events

  @delayed_events = []
end

#drop_output_chunks!Object



183
184
185
186
187
# File 'lib/dynflow/action.rb', line 183

# Forgets all output chunks of this action: resets the pending list and the
# cached stored list, then deletes the persisted chunks.
#
# @return [Object] whatever world.persistence.delete_output_chunks returns
def drop_output_chunks!
  @pending_output_chunks, @output_chunks = [], []
  world.persistence.delete_output_chunks(@execution_plan_id, @id)
end

#errorObject



296
297
298
299
# File 'lib/dynflow/action.rb', line 296

# Error recorded on the step this action was built for.
#
# @return [Object] the step's error
# @raise [RuntimeError] when the action has no step reference
def error
  if @step.nil?
    raise "error data not available"
  else
    @step.error
  end
end

#execute(*args) ⇒ Object



301
302
303
304
# File 'lib/dynflow/action.rb', line 301

# Runs the method the current phase names as its execute method.
# Only valid in an Executable phase.
#
# @param args [Array] forwarded unchanged to the phase's method
# @return [Object] the result of that method
def execute(*args)
  phase!(Executable)
  handler = phase.execute_method_name
  send(handler, *args)
end

#execute_delay(delay_options, *args) ⇒ Object



326
327
328
329
330
331
332
333
334
# File 'lib/dynflow/action.rb', line 326

# Runs the :delay middleware stack around #delay, performs the serialization
# on the serializer that #delay returns, and stores it in @serializer.
# Everything runs inside with_error_handling(true).
#
# @param delay_options [Object] passed to the middleware ahead of args
# @param args [Array] remaining arguments for the middleware stack
def execute_delay(delay_options, *args)
  with_error_handling(true) do
    world.middleware.execute(:delay, self, delay_options, *args) do |*new_args|
      prepared = delay(*new_args)
      prepared.perform_serialization!
      @serializer = prepared
    end
  end
end

#execution_planObject



220
221
222
223
# File 'lib/dynflow/action.rb', line 220

# Execution plan this action belongs to. Only valid in the Plan and Present
# phases (phase! raises otherwise): the constructor sets it in Present,
# #set_plan_context sets it in Plan.
#
# @return [ExecutionPlan, nil] nil in the Plan phase until #set_plan_context ran
def execution_plan
  phase! Plan, Present
  @execution_plan
end

#finalize_stepObject



260
261
262
263
# File 'lib/dynflow/action.rb', line 260

# Finalize step of this action, looked up in the execution plan.
# Only valid in the Present phase.
#
# @return [Object, nil] the step, or nil when the action has no finalize step id
# @raise [KeyError] when the id is set but the plan does not contain the step
def finalize_step
  phase!(Present)
  return unless finalize_step_id

  execution_plan.steps.fetch(finalize_step_id)
end

#from_subscription?Boolean



215
216
217
218
# File 'lib/dynflow/action.rb', line 215

# Flag stored by #set_plan_context. Only valid in the Plan phase.
#
# @return [Boolean, nil] nil until #set_plan_context was called
def from_subscription?
  phase! Plan
  @from_subscription
end

#holds_singleton_lock?Boolean



341
342
343
# File 'lib/dynflow/action.rb', line 341

# Default answer for a plain action.
# NOTE(review): presumably overridden by the Singleton module listed under
# this namespace - confirm.
#
# @return [false]
def holds_singleton_lock?
  false
end

#humanized_stateObject

action: used in Dynflow console



292
293
294
# File 'lib/dynflow/action.rb', line 292

# @return [String] #state converted with to_s
# @raise [RuntimeError] when the action has no step reference (raised by #state)
def humanized_state
  state.to_s
end

#labelObject



150
151
152
# File 'lib/dynflow/action.rb', line 150

# @return [String] the name of the action's class
def label
  self.class.name
end

#outputObject



166
167
168
169
170
171
172
173
# File 'lib/dynflow/action.rb', line 166

# Output of the action. In the Plan phase this is the output reference
# (available only once it has been set); in every other phase it is the
# stored output.
#
# @return [Object] @output_reference in the Plan phase, @output otherwise
# @raise [RuntimeError] in the Plan phase when no output reference exists yet
def output
  return @output unless phase?(Plan)

  @output_reference || raise('plan_self has to be invoked before being able to reference the output')
end

#output=(hash) ⇒ Object



160
161
162
163
164
# File 'lib/dynflow/action.rb', line 160

# Replaces the action's output. Only valid in the Run phase.
#
# @param hash [Hash] new output; stored after conversion with Utils.indifferent_hash
# NOTE(review): the type check runs before the phase check, so a non-Hash
# argument is rejected first even in the wrong phase.
def output=(hash)
  Type! hash, Hash
  phase! Run
  @output = Utils.indifferent_hash(hash)
end

#output_chunk(chunk, kind: nil, timestamp: Time.now) ⇒ Object



175
176
177
# File 'lib/dynflow/action.rb', line 175

# Queues one chunk of output in the pending list.
#
# @param chunk [Object] the chunk payload
# @param kind [Object, nil] optional classification stored with the chunk
# @param timestamp [Time] moment recorded for the chunk, now by default
# @return [Array<Hash>] the pending list including the new entry
def output_chunk(chunk, kind: nil, timestamp: Time.now)
  entry = { chunk: chunk, kind: kind, timestamp: timestamp }
  @pending_output_chunks << entry
end

#phase!(*phases) ⇒ Object



145
146
147
148
# File 'lib/dynflow/action.rb', line 145

# Asserts that the action is in one of the given phases.
#
# @param phases [Array] acceptable phases
# @return [Object] the truthy result of #phase?
# @raise [TypeError] when the current phase matches none of them
def phase!(*phases)
  matched = phase?(*phases)
  raise TypeError, "Wrong phase #{phase}, required #{phases}" unless matched

  matched
end

#phase?(*phases) ⇒ Boolean



141
142
143
# File 'lib/dynflow/action.rb', line 141

# Tests the current phase against the given phases using Match?.
# NOTE(review): Match? presumably comes from Algebrick::Matching and is
# truthy when any of the phases matches - confirm.
#
# @param phases [Array] phases to test against
# @return [Boolean]
def phase?(*phases)
  Match? phase, *phases
end

#plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false) ⇒ Object

Plan an event to be sent to the action identified by execution_plan_id and step_id, which default to this action's own execution plan and run step. If time is not passed, the event is sent as soon as possible.



353
354
355
356
# File 'lib/dynflow/action.rb', line 353

# Queues an event for later delivery by appending a DelayedEvent to
# #delayed_events.
#
# @param event [Object] the event payload
# @param time [Time, Numeric, nil] delivery time; a Numeric is added to the
#   world clock's current time, nil is stored as is
# @param execution_plan_id [String] target plan, this action's plan by default
# @param step_id [Integer] target step, this action's run step by default
# @param optional [Boolean] stored on the event unchanged
# @return [Array] the list of delayed events including the new one
def plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false)
  deliver_at = time.is_a?(Numeric) ? @world.clock.current_time + time : time
  delayed_events << DelayedEvent[execution_plan_id, step_id, event, deliver_at, optional]
end

#plan_stepObject



229
230
231
232
# File 'lib/dynflow/action.rb', line 229

# Plan step of this action, looked up in the execution plan.
# Only valid in the Present phase.
#
# @return [Object] the step stored under #plan_step_id
# @raise [KeyError] when the plan does not contain the step
def plan_step
  phase!(Present)
  known_steps = execution_plan.steps
  known_steps.fetch(plan_step_id)
end

#planned_actions(filter = Action) ⇒ Array<Action>

returned actions are in Present phase



237
238
239
240
241
242
243
# File 'lib/dynflow/action.rb', line 237

# Actions planned directly by this action. Only valid in the Present phase.
#
# @param filter [Class] only actions that are instances of this class are kept
# @return [Array<Action>] the actions of the steps planned under the plan step
def planned_actions(filter = Action)
  phase!(Present)
  sub_steps = plan_step.planned_steps(execution_plan)
  sub_actions = sub_steps.map { |step| step.action(execution_plan) }
  sub_actions.select { |action| action.is_a?(filter) }
end

#queueObject

The queue defined here will also be used as the default queue for all the steps planned under this action, unless overridden by a sub-action.



348
349
# File 'lib/dynflow/action.rb', line 348

# Queue of this action; the empty body means nil, i.e. no specific queue.
#
# @return [nil]
def queue
end

#required_step_ids(input = self.input) ⇒ Array<Integer>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns - ids of steps referenced from action.



308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/dynflow/action.rb', line 308

# Collects the step ids of all output references found in the input.
# Hashes (their values) and arrays are searched recursively; any other
# value is ignored.
#
# @param input [Object] structure to search, the action's input by default
# @return [Array<Integer>] step ids in the order they were encountered
# @api private
def required_step_ids(input = self.input)
  step_ids = []
  visit = lambda do |node|
    nested =
      case node
      when Hash then node.values
      when Array then node
      when ExecutionPlan::OutputReference
        step_ids << node.step_id
        nil
      end
    nested.each { |item| visit.call(item) } if nested
  end
  visit.call(input)
  step_ids
end

#run_stepObject



255
256
257
258
# File 'lib/dynflow/action.rb', line 255

# Run step of this action, looked up in the execution plan.
# Only valid in the Present phase.
#
# @return [Object, nil] the step, or nil when the action has no run step id
# @raise [KeyError] when the id is set but the plan does not contain the step
def run_step
  phase!(Present)
  return unless run_step_id

  execution_plan.steps.fetch(run_step_id)
end

#serializerObject



336
337
338
339
# File 'lib/dynflow/action.rb', line 336

# Serializer stored by #execute_delay.
#
# @return [Object] the serializer
# @raise [RuntimeError] when no serializer has been stored yet
def serializer
  if @serializer.nil?
    raise "The action must be delayed in order to access the serializer"
  else
    @serializer
  end
end

#set_plan_context(execution_plan, triggering_action, from_subscription) ⇒ Object



202
203
204
205
206
207
# File 'lib/dynflow/action.rb', line 202

# Stores the planning context of the action. Only valid in the Plan phase.
# Each argument is type-checked with Type! before it is stored.
#
# @param execution_plan [ExecutionPlan] plan the action is planned in
# @param triggering_action [Action, nil] returned later by #triggering_action
# @param from_subscription [Boolean] returned later by #from_subscription?
def set_plan_context(execution_plan, triggering_action, from_subscription)
  phase! Plan
  @execution_plan    = Type! execution_plan, ExecutionPlan
  @triggering_action = Type! triggering_action, Action, NilClass
  @from_subscription = Type! from_subscription, TrueClass, FalseClass
end

#stateObject



285
286
287
288
# File 'lib/dynflow/action.rb', line 285

# State of the step this action was built for.
#
# @return [Object] the step's state
# @raise [RuntimeError] when the action has no step reference
def state
  if @step.nil?
    raise "state data not available"
  else
    @step.state
  end
end

#stepsObject



265
266
267
# File 'lib/dynflow/action.rb', line 265

# All steps of the action. The three lookups each require the Present phase.
#
# @return [Array] plan, run and finalize step in that order; the run and
#   finalize entries are nil when the action has no such step id
def steps
  [plan_step, run_step, finalize_step]
end

#stored_output_chunksObject



179
180
181
# File 'lib/dynflow/action.rb', line 179

# Output chunks loaded from persistence for this action, cached in
# @output_chunks after the first load.
#
# @return [Object] whatever world.persistence.load_output_chunks returned
def stored_output_chunks
  return @output_chunks if @output_chunks

  @output_chunks = world.persistence.load_output_chunks(@execution_plan_id, @id)
end

#to_hashObject



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/dynflow/action.rb', line 269

# Serializes the action through recursive_to_hash. The output is added only
# in the Run, Finalize and Present phases; otherwise nil is passed as the
# second argument.
#
# @return [Object] the result of recursive_to_hash
def to_hash
  identity = {
    class:                    self.class.name,
    execution_plan_id:        execution_plan_id,
    id:                       id,
    plan_step_id:             plan_step_id,
    run_step_id:              run_step_id,
    finalize_step_id:         finalize_step_id,
    caller_execution_plan_id: caller_execution_plan_id,
    caller_action_id:         caller_action_id,
    input:                    input
  }
  produced = phase?(Run, Finalize, Present) ? { output: output } : nil
  recursive_to_hash(identity, produced)
end

#triggering_actionObject

action that caused this action to be planned. Available only in planning phase



210
211
212
213
# File 'lib/dynflow/action.rb', line 210

# Action stored by #set_plan_context. Only valid in the Plan phase.
#
# @return [Action, nil] nil when none was given or #set_plan_context has not run
def triggering_action
  phase! Plan
  @triggering_action
end