Module: Dynflow::Action::WithSubPlans
- Includes:
- Cancellable
- Defined in:
- lib/dynflow/action/with_sub_plans.rb
Defined Under Namespace
Classes: SubtaskFailedException
Constant Summary
collapse
- SubPlanFinished =
Algebrick.type do
fields! :execution_plan_id => String,
:success => type { variants TrueClass, FalseClass }
end
Constants included
from Cancellable
Cancellable::Abort, Cancellable::Cancel
Instance Method Summary
collapse
Instance Method Details
#abort! ⇒ Object
81
82
83
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 81
def abort!
cancel! true
end
|
#calculate_time_distribution ⇒ Object
106
107
108
109
110
111
112
113
114
115
116
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 106
def calculate_time_distribution
time, count = input[:concurrency_control][:time]
unless time.nil? || time.is_a?(Hash)
level = input[:concurrency_control].fetch(:level, {}).fetch(:free, 1)
semaphore = ::Dynflow::Semaphores::Stateful.new(nil, level,
:interval => time.to_f / (count * level),
:time_span => time)
input[:concurrency_control][:time] = semaphore.to_hash
end
end
|
#cancel!(force = false) ⇒ Object
75
76
77
78
79
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 75
def cancel!(force = false)
@world.throttle_limiter.cancel!(execution_plan_id)
sub_plans('state' => 'running').each { |sub_plan| sub_plan.cancel(force) }
suspend
end
|
#check_for_errors! ⇒ Object
236
237
238
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 236
def check_for_errors!
raise SubtaskFailedException.new("A sub task failed") if output[:failed_count] > 0
end
|
#counts_set? ⇒ Boolean
232
233
234
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 232
def counts_set?
output[:total_count] && output[:success_count] && output[:failed_count] && output[:pending_count]
end
|
#create_sub_plans ⇒ Object
This method is abstract.
when the logic for the initiation of the subtasks is different from the default one.
67
68
69
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 67
def create_sub_plans
raise NotImplementedError
end
|
#distribute_over_time(time_span, count) ⇒ Object
118
119
120
121
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 118
def distribute_over_time(time_span, count)
input[:concurrency_control] ||= {}
input[:concurrency_control][:time] = [time_span, count]
end
|
#done? ⇒ Boolean
197
198
199
200
201
202
203
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 197
def done?
if counts_set?
output[:total_count] - output[:success_count] - output[:failed_count] <= 0
else
false
end
end
|
#increase_counts(planned, failed, track_total = true) ⇒ Object
133
134
135
136
137
138
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 133
def increase_counts(planned, failed, track_total = true)
output[:total_count] = output.fetch(:total_count, 0) + planned + failed if track_total
output[:failed_count] = output.fetch(:failed_count, 0) + failed
output[:pending_count] = output.fetch(:pending_count, 0) + planned
output[:success_count] ||= 0
end
|
#initiate ⇒ Object
38
39
40
41
42
43
44
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 38
def initiate
if uses_concurrency_control
calculate_time_distribution
world.throttle_limiter.initialize_plan(execution_plan_id, input[:concurrency_control])
end
spawn_plans
end
|
#limit_concurrency_level(level) ⇒ Object
101
102
103
104
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 101
def limit_concurrency_level(level)
input[:concurrency_control] ||= {}
input[:concurrency_control][:level] = ::Dynflow::Semaphores::Stateful.new(level).to_hash
end
|
#mark_as_done(plan_id, success) ⇒ Object
188
189
190
191
192
193
194
195
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 188
def mark_as_done(plan_id, success)
if success
output[:success_count] += 1
else
output[:failed_count] += 1
end
output[:pending_count] -= 1
end
|
#notify_on_finish(plans) ⇒ Object
178
179
180
181
182
183
184
185
186
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 178
def notify_on_finish(plans)
suspend do |suspended_action|
plans.each do |plan|
plan.finished.on_resolution! do |success, value|
suspended_action << SubPlanFinished[plan.id, success && (value.result == :success)]
end
end
end
end
|
#on_finish ⇒ Object
72
73
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 72
def on_finish
end
|
#recalculate_counts ⇒ Object
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 213
def recalculate_counts
output.update(total_count: 0,
failed_count: 0,
success_count: 0,
pending_count: 0)
sub_plans.each do |sub_plan|
output[:total_count] += 1
if sub_plan.state == :stopped
if sub_plan.error?
output[:failed_count] += 1
else
output[:success_count] += 1
end
else
output[:pending_count] += 1
end
end
end
|
#resume ⇒ Object
151
152
153
154
155
156
157
158
159
160
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 151
def resume
if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? }
%w(total failed pending success).each { |key| output.delete("#{key}_count".to_sym) }
initiate
else
recalculate_counts
try_to_finish or fail "Some sub plans are still not finished"
end
end
|
#run(event = nil) ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 17
def run(event = nil)
match event,
(on nil do
if output[:total_count]
resume
else
initiate
end
end),
(on SubPlanFinished do
mark_as_done(event.execution_plan_id, event.success)
try_to_finish or suspend
end),
(on Action::Cancellable::Cancel do
cancel!
end),
(on Action::Cancellable::Abort do
abort!
end)
end
|
#run_progress ⇒ Object
205
206
207
208
209
210
211
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 205
def run_progress
if counts_set? && output[:total_count] > 0
(output[:success_count] + output[:failed_count]).to_f / output[:total_count]
else
0.1
end
end
|
#spawn_plans ⇒ Object
46
47
48
49
50
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 46
def spawn_plans
sub_plans = create_sub_plans
sub_plans = Array[sub_plans] unless sub_plans.is_a? Array
wait_for_sub_plans sub_plans
end
|
#sub_plans(filter = {}) ⇒ Object
162
163
164
165
166
167
168
169
170
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 162
def sub_plans(filter = {})
filters = { 'caller_execution_plan_id' => execution_plan_id,
'caller_action_id' => self.id }
if filter.empty?
@sub_plans ||= world.persistence.find_execution_plans(filters: filters)
else
world.persistence.find_execution_plans(filters: filters.merge(filter))
end
end
|
#sub_plans_count(filter = {}) ⇒ Object
172
173
174
175
176
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 172
def sub_plans_count(filter = {})
filters = { 'caller_execution_plan_id' => execution_plan_id,
'caller_action_id' => self.id }
world.persistence.find_execution_plan_counts(filters: filters.merge(filter))
end
|
#trigger(action_class, *args) ⇒ Object
Helper for creating sub plans
86
87
88
89
90
91
92
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 86
def trigger(action_class, *args)
if uses_concurrency_control
trigger_with_concurrency_control(action_class, *args)
else
world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) }
end
end
|
#trigger_with_concurrency_control(action_class, *args) ⇒ Object
94
95
96
97
98
99
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 94
def trigger_with_concurrency_control(action_class, *args)
record = world.plan_with_options(action_class: action_class, args: args, caller_action: self)
records = [[record.id], []]
records.reverse! unless record.state == :planned
@world.throttle_limiter.handle_plans!(execution_plan_id, *records).first
end
|
#try_to_finish ⇒ Object
140
141
142
143
144
145
146
147
148
149
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 140
def try_to_finish
if done?
world.throttle_limiter.finish(execution_plan_id)
check_for_errors!
on_finish
return true
else
return false
end
end
|
#uses_concurrency_control ⇒ Object
240
241
242
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 240
def uses_concurrency_control
@uses_concurrency_control = input.key? :concurrency_control
end
|
#wait_for_sub_plans(sub_plans) ⇒ Object
123
124
125
126
127
128
129
130
131
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 123
def wait_for_sub_plans(sub_plans)
planned, failed = sub_plans.partition(&:planned?)
increase_counts(planned.count, failed.count)
if planned.any?
notify_on_finish(planned)
else
check_for_errors!
end
end
|