Module: Dynflow::Action::WithBulkSubPlans

Includes:
Cancellable
Defined in:
lib/dynflow/action/with_bulk_sub_plans.rb

Constant Summary collapse

DEFAULT_BATCH_SIZE =
100
PlanNextBatch =
Algebrick.atom

Constants included from Cancellable

Cancellable::Abort, Cancellable::Cancel

Instance Method Summary collapse

Methods included from Cancellable

#abort!

Instance Method Details

#batch(from, size) ⇒ Object

Should return a slice of size items starting from item with index from

Raises:

  • (NotImplementedError)


9
10
11
# File 'lib/dynflow/action/with_bulk_sub_plans.rb', line 9

def batch(from, size)
  raise NotImplementedError
end

#batch_sizeObject



56
57
58
# File 'lib/dynflow/action/with_bulk_sub_plans.rb', line 56

def batch_size
  DEFAULT_BATCH_SIZE
end

#cancel!(force = false) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/dynflow/action/with_bulk_sub_plans.rb', line 76

def cancel!(force = false)
  # Count the not-yet-planned tasks as cancelled
  output[:cancelled_count] = total_count - output[:planned_count]
  if uses_concurrency_control
    # Tell the throttle limiter to cancel the tasks its managing
    world.throttle_limiter.cancel!(execution_plan_id)
  else
    # Just stop the tasks which were not started yet
    sub_plans(:state => 'planned').each { |sub_plan| sub_plan.update_state(:stopped) }
  end
  # Pass the cancel event to running sub plans if they can be cancelled
  sub_plans(:state => 'running').each { |sub_plan| sub_plan.cancel(force) if sub_plan.cancellable? }
  suspend
end

#current_batchObject

Returns the items in the current batch



50
51
52
53
54
# File 'lib/dynflow/action/with_bulk_sub_plans.rb', line 50

def current_batch
  start_position = output[:planned_count]
  size = start_position + batch_size > total_count ? total_count - start_position : batch_size
  batch(start_position, size)
end

#increase_counts(planned, failed) ⇒ Object



39
40
41
42
# File 'lib/dynflow/action/with_bulk_sub_plans.rb', line 39

def increase_counts(planned, failed)
  super(planned, failed, false)
  output[:planned_count] += planned + failed
end

#initiateObject



32
33
34
35
36
37
# File 'lib/dynflow/action/with_bulk_sub_plans.rb', line 32

def initiate
  output[:planned_count] = 0
  output[:cancelled_count] = 0
  output[:total_count] = total_count
  super
end

#on_planning_finishedObject



28
29
30
# File 'lib/dynflow/action/with_bulk_sub_plans.rb', line 28

def on_planning_finished
  suspend
end

#run(event = nil) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/dynflow/action/with_bulk_sub_plans.rb', line 15

def run(event = nil)
  if event === PlanNextBatch
    if can_spawn_next_batch?
      spawn_plans
      suspend
    else
      on_planning_finished
    end
  else
    super
  end
end

#run_progressObject

The same logic as in Action::WithSubPlans, but calculated using the expected total count



61
62
63
64
65
66
67
68
# File 'lib/dynflow/action/with_bulk_sub_plans.rb', line 61

def run_progress
  if counts_set? && total_count > 0
    sum = output.values_at(:success_count, :cancelled_count, :failed_count).reduce(:+)
    sum.to_f / total_count
  else
    0.1
  end
end

#spawn_plansObject



70
71
72
73
74
# File 'lib/dynflow/action/with_bulk_sub_plans.rb', line 70

def spawn_plans
  super
ensure
  plan_event(PlanNextBatch)
end

#total_countObject

Should return the expected total count of tasks

Raises:

  • (NotImplementedError)


45
46
47
# File 'lib/dynflow/action/with_bulk_sub_plans.rb', line 45

def total_count
  raise NotImplementedError
end