Class: Dynflow::Action
- Inherits:
-
Serializable
show all
- Extended by:
- Format
- Includes:
- Algebrick::Matching, Algebrick::TypeCheck, Progress, Rescue
- Defined in:
- lib/dynflow/action.rb,
lib/dynflow/action/v2.rb,
lib/dynflow/action/singleton.rb
Overview
rubocop:disable Metrics/ClassLength
Defined Under Namespace
Modules: Cancellable, Executable, Format, Phase, Polling, Progress, Rescue, Singleton, Timeouts, V2, WithBulkSubPlans, WithPollingSubPlans, WithSubPlans
Classes: Missing, Suspended
Constant Summary
collapse
- OutputReference =
ExecutionPlan::OutputReference
- ERROR =
Object.new
- SUSPEND =
Object.new
- Skip =
Algebrick.atom
- DelayedEvent =
Algebrick.type do
fields! execution_plan_id: String,
step_id: Integer,
event: Object,
time: type { variants Time, NilClass },
optional: Algebrick::Types::Boolean
end
Constants included
from Rescue
Rescue::Strategy, Rescue::SuggestedStrategy
Constants inherited
from Serializable
Serializable::LEGACY_TIME_FORMAT, Serializable::TIME_FORMAT
Instance Attribute Summary collapse
Attributes included from Progress
#calculated_progress
Class Method Summary
collapse
Instance Method Summary
collapse
-
#action_logger ⇒ Object
-
#all_planned_actions(filter_class = Action) ⇒ Array<Action>
returned actions are in Present phase.
-
#caller_action ⇒ Object
-
#delayed_events ⇒ Object
-
#drop_output_chunks! ⇒ Object
-
#error ⇒ Object
-
#execute(*args) ⇒ Object
-
#execute_delay(delay_options, *args) ⇒ Object
-
#execution_plan ⇒ Object
-
#finalize_step ⇒ Object
-
#from_subscription? ⇒ Boolean
-
#holds_singleton_lock? ⇒ Boolean
-
#humanized_state ⇒ Object
action: used in Dynflow console.
-
#initialize(attributes, world) ⇒ Action
constructor
A new instance of Action.
-
#label ⇒ Object
-
#output ⇒ Object
-
#output=(hash) ⇒ Object
-
#output_chunk(chunk, kind: nil, timestamp: Time.now) ⇒ Object
-
#phase!(*phases) ⇒ Object
-
#phase?(*phases) ⇒ Boolean
-
#plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false) ⇒ Object
Plan an event to be sent to the action identified by execution_plan_id and step_id, which default to this action itself.
-
#plan_step ⇒ Object
-
#planned_actions(filter = Action) ⇒ Array<Action>
returned actions are in Present phase.
-
#queue ⇒ Object
queue defined here will also be used as the default queue for all the steps planned under this action, unless overridden by a sub-action.
-
#required_step_ids(input = self.input) ⇒ Array<Integer>
private
-
#run_step ⇒ Object
-
#serializer ⇒ Object
-
#set_plan_context(execution_plan, triggering_action, from_subscription) ⇒ Object
-
#state ⇒ Object
-
#steps ⇒ Object
-
#stored_output_chunks ⇒ Object
-
#to_hash ⇒ Object
-
#triggering_action ⇒ Object
action that caused this action to be planned.
Methods included from Format
input_format, output_format
Methods included from Rescue
#combine_suggested_strategies, #rescue_strategy, #rescue_strategy_for_planned_action, #rescue_strategy_for_self
Methods included from Progress
#finalize_progress, #finalize_progress_weight, #run_progress, #run_progress_weight
from_hash
Constructor Details
#initialize(attributes, world) ⇒ Action
Returns a new instance of Action.
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/dynflow/action.rb', line 114
# Builds an action from its attribute hash and the owning world.
# Every value is passed through Type! together with the classes it may have.
#
# @param attributes [Hash] :phase, :execution_plan_id, :id, :plan_step_id,
#   :run_step_id and :finalize_step_id are fetched without a default (a missing
#   key raises KeyError); :step, :caller_execution_plan_id and
#   :caller_action_id default to nil
# @param world [World]
# @raise [ArgumentError] when the phase is Executable but no :step was given
def initialize(attributes, world)
Type! attributes, Hash
@phase = Type! attributes.fetch(:phase), Phase
@world = Type! world, World
@step = Type! attributes.fetch(:step, nil), ExecutionPlan::Steps::Abstract, NilClass
raise ArgumentError, 'Step reference missing' if phase?(Executable) && @step.nil?
@execution_plan_id = Type! attributes.fetch(:execution_plan_id), String
@id = Type! attributes.fetch(:id), Integer
@plan_step_id = Type! attributes.fetch(:plan_step_id), Integer
@run_step_id = Type! attributes.fetch(:run_step_id), Integer, NilClass
@finalize_step_id = Type! attributes.fetch(:finalize_step_id), Integer, NilClass
# :execution_plan is only read (and is then mandatory) in the Present phase
@execution_plan = Type!(attributes.fetch(:execution_plan), ExecutionPlan) if phase? Present
@caller_execution_plan_id = Type!(attributes.fetch(:caller_execution_plan_id, nil), String, NilClass)
@caller_action_id = Type!(attributes.fetch(:caller_action_id, nil), Integer, NilClass)
# strict fetch when the key is required, otherwise fall back to an empty hash
getter = ->key, required do
required ? attributes.fetch(key) : attributes.fetch(key, {})
end
# :input is mandatory in Run/Finalize/Present; in any other phase it defaults to {}
@input = OutputReference.deserialize getter.(:input, phase?(Run, Finalize, Present))
# output is only set in Run/Finalize/Present; a missing :output becomes {}
@output = OutputReference.deserialize getter.(:output, false) if phase? Run, Finalize, Present
# the buffer used by #output_chunk exists only in the Run and Finalize phases
@pending_output_chunks = [] if phase? Run, Finalize
end
|
Instance Attribute Details
#caller_action_id ⇒ Object
Returns the value of attribute caller_action_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
# Reader for @caller_action_id, which #initialize takes from
# attributes[:caller_action_id] (Integer, or nil when absent).
def caller_action_id
@caller_action_id
end
|
#caller_execution_plan_id ⇒ Object
Returns the value of attribute caller_execution_plan_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
# Reader for @caller_execution_plan_id, which #initialize takes from
# attributes[:caller_execution_plan_id] (String, or nil when absent).
def caller_execution_plan_id
@caller_execution_plan_id
end
|
#execution_plan_id ⇒ Object
Returns the value of attribute execution_plan_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
# Reader for @execution_plan_id, the String that #initialize takes from
# attributes[:execution_plan_id].
def execution_plan_id
@execution_plan_id
end
|
#finalize_step_id ⇒ Object
Returns the value of attribute finalize_step_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
# Reader for @finalize_step_id, which #initialize takes from
# attributes[:finalize_step_id] (Integer or nil).
def finalize_step_id
@finalize_step_id
end
|
#id ⇒ Object
Returns the value of attribute id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
# Reader for @id, the Integer that #initialize takes from attributes[:id].
def id
@id
end
|
#input ⇒ Object
Returns the value of attribute input.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
# Reader for @input, which #initialize sets to the result of
# OutputReference.deserialize on attributes[:input] (or on {} when the key is
# optional and missing).
def input
@input
end
|
#pending_output_chunks ⇒ Object
Returns the value of attribute pending_output_chunks.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
# Reader for @pending_output_chunks. #initialize assigns an empty Array only in
# the Run and Finalize phases, so this is nil in other phases. #output_chunk
# appends to it and #drop_output_chunks! resets it.
def pending_output_chunks
@pending_output_chunks
end
|
#phase ⇒ Object
Returns the value of attribute phase.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
# Reader for @phase, the Phase value that #initialize takes from
# attributes[:phase]; #phase? and #phase! match against it.
def phase
@phase
end
|
#plan_step_id ⇒ Object
Returns the value of attribute plan_step_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
# Reader for @plan_step_id, the Integer that #initialize takes from
# attributes[:plan_step_id].
def plan_step_id
@plan_step_id
end
|
#run_step_id ⇒ Object
Returns the value of attribute run_step_id.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
# Reader for @run_step_id, which #initialize takes from
# attributes[:run_step_id] (Integer or nil).
def run_step_id
@run_step_id
end
|
#world ⇒ Object
Returns the value of attribute world.
107
108
109
|
# File 'lib/dynflow/action.rb', line 107
# Reader for @world, the World instance passed to #initialize.
def world
@world
end
|
Class Method Details
.all_children ⇒ Object
31
32
33
34
35
|
# File 'lib/dynflow/action.rb', line 31
# Collects every descendant registered below this class: the direct children
# first, followed by each child's own descendants in registration order.
#
# @return [Array] direct children followed by all transitive children
def self.all_children
  direct = children.values
  direct + direct.flat_map(&:all_children)
end
|
.children ⇒ Object
43
44
45
|
# File 'lib/dynflow/action.rb', line 43
# Registry of direct subclasses: a Hash created lazily and memoized in
# @children. .inherited stores each new subclass in it under the class name.
def self.children
@children ||= {}
end
|
.constantize(action_name) ⇒ Object
101
102
103
104
105
|
# File 'lib/dynflow/action.rb', line 101
# Resolves action_name through the inherited implementation. When that raises
# NameError, the error is swallowed and Action::Missing.generate(action_name)
# is returned instead.
def self.constantize(action_name)
super action_name
rescue NameError
Action::Missing.generate(action_name)
end
|
.execution_plan_hooks ⇒ Object
51
52
53
|
# File 'lib/dynflow/action.rb', line 51
# Hook register of this class: an ExecutionPlan::Hooks::Register created on
# first access and memoized in @execution_plan_hooks, unless one was already
# installed through .inherit_execution_plan_hooks.
def self.execution_plan_hooks
@execution_plan_hooks ||= ExecutionPlan::Hooks::Register.new
end
|
.inherit_execution_plan_hooks(hooks) ⇒ Object
55
56
57
|
# File 'lib/dynflow/action.rb', line 55
# Installs the given hook register on this class, replacing any existing one.
# .inherited calls it on each new subclass with a dup of the parent's register.
def self.inherit_execution_plan_hooks(hooks)
@execution_plan_hooks = hooks
end
|
.inherited(child) ⇒ Object
37
38
39
40
41
|
# File 'lib/dynflow/action.rb', line 37
# Ruby subclass hook. Records the child in .children under its class name,
# gives it a dup of this class's hook register (so the child gets its own
# register object), then continues up the hook chain via super.
# NOTE(review): an anonymous class has a nil name and would be stored under the
# nil key - confirm whether anonymous subclasses are expected.
def self.inherited(child)
children[child.name] = child
child.inherit_execution_plan_hooks(execution_plan_hooks.dup)
super child
end
|
.middleware ⇒ Object
47
48
49
|
# File 'lib/dynflow/action.rb', line 47
# Middleware register of this class: a Middleware::Register created on first
# access and memoized in @middleware.
def self.middleware
@middleware ||= Middleware::Register.new
end
|
.subscribe ⇒ nil, Class
FIND define subscriptions in world independent on action’s classes,
limited only by in/output formats
62
63
64
|
# File 'lib/dynflow/action.rb', line 62
# Base implementation: always returns nil.
# The documented return type is nil or a Class, so subclasses presumably
# override this to name the action class they subscribe to - confirm.
def self.subscribe
nil
end
|
Instance Method Details
#action_logger ⇒ Object
225
226
227
|
# File 'lib/dynflow/action.rb', line 225
# Delegates to the world's action_logger.
def action_logger
world.action_logger
end
|
#all_planned_actions(filter_class = Action) ⇒ Array<Action>
returned actions are in Present phase
248
249
250
251
252
253
|
# File 'lib/dynflow/action.rb', line 248
# Lists every action planned by this one, directly or indirectly, keeping only
# instances of filter_class. Directly planned actions come first, followed by
# the descendants of each of them. Requires the Present phase.
#
# @param filter_class [Class] only actions that are a kind of this class are returned
# @return [Array<Action>]
def all_planned_actions(filter_class = Action)
  phase! Present
  direct = planned_actions
  nested = direct.flat_map(&:all_planned_actions)
  (direct + nested).select { |candidate| candidate.is_a?(filter_class) }
end
|
#caller_action ⇒ Object
189
190
191
192
193
194
195
196
197
198
199
200
|
# File 'lib/dynflow/action.rb', line 189
# Loads the action that planned this one, for presentation, and memoizes it in
# @caller_action. Requires the Present phase.
#
# The caller is looked up in this action's own execution plan unless
# @caller_execution_plan_id names a different plan, in which case that plan is
# loaded from persistence first.
#
# @return [Object, nil] the result of persistence.load_action_for_presentation,
#   or nil when this action has no recorded caller
# @raise [TypeError] when not in the Present phase (raised by phase!)
def caller_action
  phase! Present
  # No recorded caller means there is nothing to load. The guard has to test
  # for nil: a plain truthiness check would return nil exactly when a caller
  # exists and would try to load an action with a nil id when none does.
  return nil if @caller_action_id.nil?
  return @caller_action if @caller_action

  caller_execution_plan = if @caller_execution_plan_id.nil?
                            execution_plan
                          else
                            world.persistence.load_execution_plan(@caller_execution_plan_id)
                          end
  @caller_action = world.persistence.load_action_for_presentation(caller_execution_plan, @caller_action_id)
end
|
#delayed_events ⇒ Object
359
360
361
|
# File 'lib/dynflow/action.rb', line 359
# List of events planned through #plan_event: created lazily as an empty Array
# and memoized in @delayed_events.
def delayed_events
@delayed_events ||= []
end
|
#drop_output_chunks! ⇒ Object
183
184
185
186
187
|
# File 'lib/dynflow/action.rb', line 183
# Discards all output chunks of this action: empties the in-memory buffer and
# the memoized stored chunks, then asks persistence to delete the chunks saved
# for this execution plan and action id.
#
# @return whatever persistence.delete_output_chunks returns
def drop_output_chunks!
  @pending_output_chunks, @output_chunks = [], []
  world.persistence.delete_output_chunks(@execution_plan_id, @id)
end
|
#error ⇒ Object
297
298
299
300
|
# File 'lib/dynflow/action.rb', line 297
# Error recorded on the step this action was built with.
#
# @return the step's error
# @raise [RuntimeError] when the action has no step reference
def error
  return @step.error unless @step.nil?

  raise "error data not available"
end
|
#execute(*args) ⇒ Object
302
303
304
305
|
# File 'lib/dynflow/action.rb', line 302
# Runs the method that the current phase names as its execute method,
# forwarding all arguments. Only allowed in an Executable phase.
#
# @param args arguments passed through unchanged
# @return the result of the dispatched method
def execute(*args)
  phase! Executable
  handler = phase.execute_method_name
  send(handler, *args)
end
|
#execute_delay(delay_options, *args) ⇒ Object
327
328
329
330
331
332
333
334
335
|
# File 'lib/dynflow/action.rb', line 327
# Runs the :delay middleware stack for this action inside
# with_error_handling(true). The innermost step calls #delay with the
# arguments handed over by the middleware, performs the serialization on the
# returned serializer and only then stores it in @serializer.
#
# @param delay_options passed to the middleware ahead of args
# @param args further arguments passed to the middleware
def execute_delay(delay_options, *args)
  with_error_handling(true) do
    world.middleware.execute(:delay, self, delay_options, *args) do |*delay_args|
      @serializer = delay(*delay_args).tap(&:perform_serialization!)
    end
  end
end
|
#execution_plan ⇒ Object
220
221
222
223
|
# File 'lib/dynflow/action.rb', line 220
# Returns @execution_plan. Only allowed in the Plan or Present phase; phase!
# raises TypeError in any other phase. The value is set by #set_plan_context
# (Plan) or by #initialize (Present).
def execution_plan
phase! Plan, Present
@execution_plan
end
|
#finalize_step ⇒ Object
260
261
262
263
|
# File 'lib/dynflow/action.rb', line 260
# Finalize step of this action, looked up in the execution plan's steps by
# finalize_step_id. Requires the Present phase.
#
# @return the step, or nil when the action has no finalize step id
def finalize_step
  phase! Present
  return unless finalize_step_id

  execution_plan.steps.fetch(finalize_step_id)
end
|
#from_subscription? ⇒ Boolean
215
216
217
218
|
# File 'lib/dynflow/action.rb', line 215
# Returns the @from_subscription flag stored by #set_plan_context.
# Only allowed in the Plan phase; phase! raises TypeError otherwise.
def from_subscription?
phase! Plan
@from_subscription
end
|
#holds_singleton_lock? ⇒ Boolean
342
343
344
|
# File 'lib/dynflow/action.rb', line 342
# Base implementation: always false.
# NOTE(review): presumably overridden by the Singleton module - confirm.
def holds_singleton_lock?
false
end
|
#humanized_state ⇒ Object
action: used in Dynflow console
293
294
295
|
# File 'lib/dynflow/action.rb', line 293
# String form of #state; raises like #state when no step is available.
def humanized_state
state.to_s
end
|
#label ⇒ Object
150
151
152
|
# File 'lib/dynflow/action.rb', line 150
# Label of the action: the name of its class.
def label
self.class.name
end
|
#output ⇒ Object
166
167
168
169
170
171
172
173
|
# File 'lib/dynflow/action.rb', line 166
# Output of the action. Outside the Plan phase this is the stored @output.
# In the Plan phase it is the output reference, which must already be set.
#
# @raise [RuntimeError] in the Plan phase when no output reference exists yet
def output
  return @output unless phase? Plan

  @output_reference || raise('plan_self has to be invoked before being able to reference the output')
end
|
#output=(hash) ⇒ Object
160
161
162
163
164
|
# File 'lib/dynflow/action.rb', line 160
# Replaces the output with Utils.indifferent_hash(hash).
# The argument is type-checked first, then the phase: only the Run phase is
# allowed (phase! raises TypeError otherwise).
#
# @param hash [Hash]
def output=(hash)
Type! hash, Hash
phase! Run
@output = Utils.indifferent_hash(hash)
end
|
#output_chunk(chunk, kind: nil, timestamp: Time.now) ⇒ Object
175
176
177
|
# File 'lib/dynflow/action.rb', line 175
# Appends one output chunk to the pending buffer.
#
# @param chunk the chunk payload
# @param kind optional kind of the chunk, nil by default
# @param timestamp when the chunk was produced, the current time by default
# @return [Array] the pending buffer with the new entry appended
def output_chunk(chunk, kind: nil, timestamp: Time.now)
  entry = { chunk: chunk, kind: kind, timestamp: timestamp }
  @pending_output_chunks << entry
end
|
#phase!(*phases) ⇒ Object
145
146
147
148
|
# File 'lib/dynflow/action.rb', line 145
# Asserts that the action is in one of the given phases.
#
# @param phases the accepted phases
# @return the truthy result of phase? when the phase matches
# @raise [TypeError] when the current phase matches none of them
def phase!(*phases)
  matched = phase?(*phases)
  return matched if matched

  raise TypeError, "Wrong phase #{phase}, required #{phases}"
end
|
#phase?(*phases) ⇒ Boolean
141
142
143
|
# File 'lib/dynflow/action.rb', line 141
# Checks the current phase against the given phases by delegating to Match?.
# Presumably true when the phase matches at least one of them - confirm
# against Algebrick's Match? semantics.
def phase?(*phases)
Match? phase, *phases
end
|
#plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false) ⇒ Object
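Plan an event to be sent to the action identified by execution_plan_id and step_id, which default to this action itself. If time is not passed, the event is sent as soon as possible; a numeric time is added to the world clock's current time.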
354
355
356
357
|
# File 'lib/dynflow/action.rb', line 354
# Queues an event for the step identified by execution_plan_id and step_id,
# which default to this action's own plan and run step.
#
# @param event the event to deliver
# @param time nil, a Numeric offset that is added to the world clock's current
#   time, or any other value, which is stored unchanged
# @param optional stored on the event record as given
# @return [Array] the delayed events list with the new DelayedEvent appended
def plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false)
  due_at = time.is_a?(Numeric) ? @world.clock.current_time + time : time
  delayed_events << DelayedEvent[execution_plan_id, step_id, event, due_at, optional]
end
|
#plan_step ⇒ Object
229
230
231
232
|
# File 'lib/dynflow/action.rb', line 229
# Plan step of this action, fetched from the execution plan's steps by
# plan_step_id. Requires the Present phase (phase! raises TypeError otherwise).
# Unlike #run_step and #finalize_step there is no nil guard: the id is always
# looked up with fetch.
def plan_step
phase! Present
execution_plan.steps.fetch(plan_step_id)
end
|
#planned_actions(filter = Action) ⇒ Array<Action>
returned actions are in Present phase
237
238
239
240
241
242
243
|
# File 'lib/dynflow/action.rb', line 237
# Lists the actions planned directly by this action, keeping only instances of
# filter. Each planned step is turned into its action through the execution
# plan. Requires the Present phase.
#
# @param filter [Class] only actions that are a kind of this class are returned
# @return [Array<Action>]
def planned_actions(filter = Action)
  phase! Present
  child_steps = plan_step.planned_steps(execution_plan)
  child_actions = child_steps.map { |step| step.action(execution_plan) }
  child_actions.select { |action| action.is_a?(filter) }
end
|
#queue ⇒ Object
queue defined here will also be used as the default queue for all the steps planned under this action, unless overridden by a sub-action
349
350
|
# File 'lib/dynflow/action.rb', line 349
# Base implementation has an empty body and therefore returns nil.
def queue
end
|
#required_step_ids(input = self.input) ⇒ Array<Integer>
This method is part of a private API.
You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns - ids of steps referenced from action.
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
|
# File 'lib/dynflow/action.rb', line 309
# Collects the step ids of every ExecutionPlan::OutputReference found in the
# given structure. Hash values and Array elements are searched recursively,
# depth first and in order; any other value is ignored.
#
# @api private
# @param input the structure to scan, this action's input by default
# @return [Array<Integer>] ids of steps referenced from the action
def required_step_ids(input = self.input)
  step_ids = []
  walk = lambda do |node|
    if node.is_a?(Hash)
      node.each_value { |child| walk.(child) }
    elsif node.is_a?(Array)
      node.each { |child| walk.(child) }
    elsif node.is_a?(ExecutionPlan::OutputReference)
      step_ids << node.step_id
    end
  end
  walk.(input)
  step_ids
end
|
#run_step ⇒ Object
255
256
257
258
|
# File 'lib/dynflow/action.rb', line 255
# Run step of this action, looked up in the execution plan's steps by
# run_step_id. Requires the Present phase.
#
# @return the step, or nil when the action has no run step id
def run_step
  phase! Present
  return unless run_step_id

  execution_plan.steps.fetch(run_step_id)
end
|
#serializer ⇒ Object
337
338
339
340
|
# File 'lib/dynflow/action.rb', line 337
# Serializer stored by #execute_delay.
#
# @raise [RuntimeError] when no serializer has been stored yet
def serializer
  return @serializer unless @serializer.nil?

  raise "The action must be delayed in order to access the serializer"
end
|
#set_plan_context(execution_plan, triggering_action, from_subscription) ⇒ Object
202
203
204
205
206
207
|
# File 'lib/dynflow/action.rb', line 202
# Stores the planning context on the action. Only allowed in the Plan phase;
# phase! raises TypeError before any argument is checked.
#
# @param execution_plan [ExecutionPlan] read back through #execution_plan
# @param triggering_action [Action, nil] read back through #triggering_action
# @param from_subscription [Boolean] read back through #from_subscription?
def set_plan_context(execution_plan, triggering_action, from_subscription)
phase! Plan
@execution_plan = Type! execution_plan, ExecutionPlan
@triggering_action = Type! triggering_action, Action, NilClass
@from_subscription = Type! from_subscription, TrueClass, FalseClass
end
|
#state ⇒ Object
286
287
288
289
|
# File 'lib/dynflow/action.rb', line 286
# State of the step this action was built with.
#
# @return the step's state
# @raise [RuntimeError] when the action has no step reference
def state
  return @step.state unless @step.nil?

  raise "state data not available"
end
|
#steps ⇒ Object
265
266
267
|
# File 'lib/dynflow/action.rb', line 265
# The action's steps as a three-element array in the order plan, run,
# finalize. The run and finalize entries are nil when the action has no such
# step id (see #run_step and #finalize_step).
def steps
[plan_step, run_step, finalize_step]
end
|
#stored_output_chunks ⇒ Object
179
180
181
|
# File 'lib/dynflow/action.rb', line 179
# Output chunks loaded from persistence for this execution plan and action id.
# The result is memoized in @output_chunks, so persistence is only asked again
# while the memo is nil or false; #drop_output_chunks! resets it to [].
def stored_output_chunks
@output_chunks ||= world.persistence.load_output_chunks(@execution_plan_id, @id)
end
|
#to_hash ⇒ Object
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
|
# File 'lib/dynflow/action.rb', line 269
# Serializes the action through recursive_to_hash. The first part carries the
# class name, the ids and the input. The second part carries the output and is
# only present in the Run, Finalize and Present phases (nil otherwise).
def to_hash
  attributes = {
    class: self.class.name,
    execution_plan_id: execution_plan_id,
    id: id,
    plan_step_id: plan_step_id,
    run_step_id: run_step_id,
    finalize_step_id: finalize_step_id,
    caller_execution_plan_id: caller_execution_plan_id,
    caller_action_id: caller_action_id,
    input: input
  }
  output_part = phase?(Run, Finalize, Present) ? { output: output } : nil
  recursive_to_hash(attributes, output_part)
end
|
#triggering_action ⇒ Object
action that caused this action to be planned. Available only in planning phase
210
211
212
213
|
# File 'lib/dynflow/action.rb', line 210
# Returns the @triggering_action stored by #set_plan_context (an Action or
# nil). Only allowed in the Plan phase; phase! raises TypeError otherwise.
def triggering_action
phase! Plan
@triggering_action
end
|