Module: Dynflow::World::Invalidation

Included in:
Dynflow::World
Defined in:
lib/dynflow/world/invalidation.rb

Instance Method Summary collapse

Instance Method Details

#invalidate(world) ⇒ void

This method returns an undefined value.

Invalidate another world, that left some data in the runtime, but it’s not really running

Parameters:



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/dynflow/world/invalidation.rb', line 11

def invalidate(world)
  Type! world, Coordinator::ClientWorld, Coordinator::ExecutorWorld

  coordinator.acquire(Coordinator::WorldInvalidationLock.new(self, world)) do
    coordinator.find_locks(class: Coordinator::PlanningLock.name,
                           owner_id: 'world:' + world.id).each do |lock|
      invalidate_planning_lock lock
    end

    if world.is_a? Coordinator::ExecutorWorld
      old_execution_locks = coordinator.find_locks(class: Coordinator::ExecutionLock.name,
                                                   owner_id: "world:#{world.id}")

      coordinator.deactivate_world(world)

      old_execution_locks.each do |execution_lock|
        invalidate_execution_lock(execution_lock)
      end
    end

    pruned = persistence.prune_envelopes(world.id)
    logger.error("Pruned #{pruned} envelopes for invalidated world #{world.id}") unless pruned.zero?
    coordinator.delete_world(world)
  end
end

#invalidate_execution_lock(execution_lock) ⇒ void

This method returns an undefined value.

Invalidate an execution lock, left behind by a executor that was executing an execution plan when it was terminated.

Parameters:



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/dynflow/world/invalidation.rb', line 58

def invalidate_execution_lock(execution_lock)
  with_valid_execution_plan_for_lock(execution_lock) do |plan|
    plan.steps.values.each { |step| invalidate_step step }
    plan.execution_history.add('terminate execution', execution_lock.world_id)
    plan.update_state(:paused, history_notice: false) if plan.state == :running
    plan.save
    coordinator.release(execution_lock)

    if plan.error?
      new_state = plan.prepare_for_rescue
      execute(plan.id) if new_state == :running
    else
      if coordinator.find_worlds(true).any? # Check if there are any executors
        client_dispatcher.tell([:dispatch_request,
                                Dispatcher::Execution[execution_lock.execution_plan_id],
                                execution_lock.client_world_id,
                                execution_lock.request_id])
      end
    end
  end
rescue Errors::PersistenceError
  logger.error "failed to write data while invalidating execution lock #{execution_lock}"
end

#invalidate_planning_lock(planning_lock) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/dynflow/world/invalidation.rb', line 37

def invalidate_planning_lock(planning_lock)
  with_valid_execution_plan_for_lock(planning_lock) do |plan|
    plan.steps.values.each { |step| invalidate_step step }

    state = if plan.plan_steps.any? && plan.plan_steps.all? { |step| step.state == :success }
              :planned
            else
              :stopped
            end
    plan.update_state(state) if plan.state != state

    coordinator.release(planning_lock)
    execute(plan.id) if plan.state == :planned
  end
end

#locks_validity_checkArray<Coordinator::Lock>

Cleans up locks which don’t have a resource

Returns:



168
169
170
171
172
173
174
175
176
# File 'lib/dynflow/world/invalidation.rb', line 168

def locks_validity_check
  orphaned_locks = coordinator.clean_orphaned_locks

  unless orphaned_locks.empty?
    logger.error "invalid coordinator locks found and invalidated: #{orphaned_locks.inspect}"
  end

  return orphaned_locks
end

#perform_validity_checksInteger

Performs world validity checks

Returns:

  • (Integer)

    number of invalidated worlds



117
118
119
120
121
122
123
# File 'lib/dynflow/world/invalidation.rb', line 117

def perform_validity_checks
  world_invalidation_result = worlds_validity_check
  locks_validity_check
  pruned = connector.prune_undeliverable_envelopes(self)
  logger.error("Pruned #{pruned} undeliverable envelopes") unless pruned.zero?
  world_invalidation_result.values.select { |result| result == :invalidated }.size
end

#with_valid_execution_plan_for_lock(execution_lock) {|execution_plan| ... } ⇒ void

This method returns an undefined value.

Tries to load an execution plan using id stored in the lock. If the execution plan cannot be loaded or is invalid, the lock is released. If the plan gets loaded successfully, it is yielded to a given block.

Parameters:

Yield Parameters:

  • execution_plan (ExecutionPlan)

    the successfully loaded execution plan



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/dynflow/world/invalidation.rb', line 91

def with_valid_execution_plan_for_lock(execution_lock)
  begin
    plan = persistence.load_execution_plan(execution_lock.execution_plan_id)
  rescue => e
    if e.is_a?(KeyError)
      logger.error "invalidated execution plan #{execution_lock.execution_plan_id} missing, skipping"
    else
      logger.error e
      logger.error "unexpected error when invalidating execution plan #{execution_lock.execution_plan_id}, skipping"
    end
    coordinator.release(execution_lock)
    coordinator.release_by_owner(execution_lock.id)
    return
  end
  unless plan.valid?
    logger.error "invalid plan #{plan.id}, skipping"
    coordinator.release(execution_lock)
    coordinator.release_by_owner(execution_lock.id)
    return
  end
  yield plan
end

#worlds_validity_check(auto_invalidate = true, worlds_filter = {}) ⇒ Hash{String=>Symbol}

Checks if all worlds are valid and optionally invalidates them

Parameters:

  • auto_invalidate (Boolean) (defaults to: true)

    whether automatic invalidation should be performed

  • worlds_filter (Hash) (defaults to: {})

    hash of filters to select only matching worlds

Returns:

  • (Hash{String=>Symbol})

    hash containg validation results, mapping world id to a result



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/dynflow/world/invalidation.rb', line 130

def worlds_validity_check(auto_invalidate = true, worlds_filter = {})
  worlds = coordinator.find_worlds(false, worlds_filter)

  world_checks = worlds.reduce({}) do |hash, world|
    hash.update(world => ping_without_cache(world.id, self.validity_check_timeout))
  end
  world_checks.values.each(&:wait)

  results = {}
  world_checks.each do |world, check|
    if check.fulfilled?
      result = :valid
    else
      if auto_invalidate
        begin
          invalidate(world)
          result = :invalidated
        rescue => e
          logger.error e
          result = e.message
        end
      else
        result = :invalid
      end
    end
    results[world.id] = result
  end

  unless results.values.all? { |result| result == :valid }
    logger.error "invalid worlds found #{results.inspect}"
  end

  return results
end