Module: Dynflow::World::Invalidation

Included in:
Dynflow::World
Defined in:
lib/dynflow/world/invalidation.rb

Instance Method Summary collapse

Instance Method Details

#invalidate(world) ⇒ void

This method returns an undefined value.

Invalidate another world, that left some data in the runtime, but it’s not really running

Parameters:



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/dynflow/world/invalidation.rb', line 12

def invalidate(world)
  Type! world, Coordinator::ClientWorld, Coordinator::ExecutorWorld

  coordinator.acquire(Coordinator::WorldInvalidationLock.new(self, world)) do
    coordinator.find_locks(class: Coordinator::PlanningLock.name,
                           owner_id: 'world:' + world.id).each do |lock|
      invalidate_planning_lock lock
    end

    if world.is_a? Coordinator::ExecutorWorld
      old_execution_locks = coordinator.find_locks(class: Coordinator::ExecutionLock.name,
                                                   owner_id: "world:#{world.id}")

      coordinator.deactivate_world(world)

      old_execution_locks.each do |execution_lock|
        invalidate_execution_lock(execution_lock)
      end
    end

    pruned = persistence.prune_envelopes(world.id)
    logger.error("Pruned #{pruned} envelopes for invalidated world #{world.id}") unless pruned.zero?
    coordinator.delete_world(world)
  end
end

#invalidate_execution_lock(execution_lock) ⇒ void

This method returns an undefined value.

Invalidate an execution lock, left behind by a executor that was executing an execution plan when it was terminated.

Parameters:



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/dynflow/world/invalidation.rb', line 59

def invalidate_execution_lock(execution_lock)
  with_valid_execution_plan_for_lock(execution_lock) do |plan|
    plan.steps.values.each { |step| invalidate_step step }
    plan.execution_history.add('terminate execution', execution_lock.world_id)
    plan.update_state(:paused, history_notice: false) if plan.state == :running
    plan.save
    coordinator.release(execution_lock)

    if plan.error?
      new_state = plan.prepare_for_rescue
      execute(plan.id) if new_state == :running
    else
      if coordinator.find_worlds(true).any? # Check if there are any executors
        client_dispatcher.tell([:dispatch_request,
                                Dispatcher::Execution[execution_lock.execution_plan_id],
                                execution_lock.client_world_id,
                                execution_lock.request_id])
      end
    end
  end
rescue Errors::PersistenceError
  logger.error "failed to write data while invalidating execution lock #{execution_lock}"
end

#invalidate_planning_lock(planning_lock) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/dynflow/world/invalidation.rb', line 38

def invalidate_planning_lock(planning_lock)
  with_valid_execution_plan_for_lock(planning_lock) do |plan|
    plan.steps.values.each { |step| invalidate_step step }

    state = if plan.plan_steps.any? && plan.plan_steps.all? { |step| step.state == :success }
              :planned
            else
              :stopped
            end
    plan.update_state(state) if plan.state != state

    coordinator.release(planning_lock)
    execute(plan.id) if plan.state == :planned
  end
end

#locks_validity_checkArray<Coordinator::Lock>

Cleans up locks which don’t have a resource

Returns:



169
170
171
172
173
174
175
176
177
# File 'lib/dynflow/world/invalidation.rb', line 169

def locks_validity_check
  orphaned_locks = coordinator.clean_orphaned_locks

  unless orphaned_locks.empty?
    logger.error "invalid coordinator locks found and invalidated: #{orphaned_locks.inspect}"
  end

  return orphaned_locks
end

#perform_validity_checksInteger

Performs world validity checks

Returns:

  • (Integer)

    number of invalidated worlds



118
119
120
121
122
123
124
# File 'lib/dynflow/world/invalidation.rb', line 118

def perform_validity_checks
  world_invalidation_result = worlds_validity_check
  locks_validity_check
  pruned = connector.prune_undeliverable_envelopes(self)
  logger.error("Pruned #{pruned} undeliverable envelopes") unless pruned.zero?
  world_invalidation_result.values.select { |result| result == :invalidated }.size
end

#with_valid_execution_plan_for_lock(execution_lock) {|execution_plan| ... } ⇒ void

This method returns an undefined value.

Tries to load an execution plan using id stored in the lock. If the execution plan cannot be loaded or is invalid, the lock is released. If the plan gets loaded successfully, it is yielded to a given block.

Parameters:

Yield Parameters:

  • execution_plan (ExecutionPlan)

    the successfully loaded execution plan



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/dynflow/world/invalidation.rb', line 92

def with_valid_execution_plan_for_lock(execution_lock)
  begin
    plan = persistence.load_execution_plan(execution_lock.execution_plan_id)
  rescue => e
    if e.is_a?(KeyError)
      logger.error "invalidated execution plan #{execution_lock.execution_plan_id} missing, skipping"
    else
      logger.error e
      logger.error "unexpected error when invalidating execution plan #{execution_lock.execution_plan_id}, skipping"
    end
    coordinator.release(execution_lock)
    coordinator.release_by_owner(execution_lock.id)
    return
  end
  unless plan.valid?
    logger.error "invalid plan #{plan.id}, skipping"
    coordinator.release(execution_lock)
    coordinator.release_by_owner(execution_lock.id)
    return
  end
  yield plan
end

#worlds_validity_check(auto_invalidate = true, worlds_filter = {}) ⇒ Hash{String=>Symbol}

Checks if all worlds are valid and optionally invalidates them

Parameters:

  • auto_invalidate (Boolean) (defaults to: true)

    whether automatic invalidation should be performed

  • worlds_filter (Hash) (defaults to: {})

    hash of filters to select only matching worlds

Returns:

  • (Hash{String=>Symbol})

    hash containg validation results, mapping world id to a result



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/dynflow/world/invalidation.rb', line 131

def worlds_validity_check(auto_invalidate = true, worlds_filter = {})
  worlds = coordinator.find_worlds(false, worlds_filter)

  world_checks = worlds.reduce({}) do |hash, world|
    hash.update(world => ping_without_cache(world.id, self.validity_check_timeout))
  end
  world_checks.values.each(&:wait)

  results = {}
  world_checks.each do |world, check|
    if check.fulfilled?
      result = :valid
    else
      if auto_invalidate
        begin
          invalidate(world)
          result = :invalidated
        rescue => e
          logger.error e
          result = e.message
        end
      else
        result = :invalid
      end
    end
    results[world.id] = result
  end

  unless results.values.all? { |result| result == :valid }
    logger.error "invalid worlds found #{results.inspect}"
  end

  return results
end