Class: Bolt::FiberExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/bolt/fiber_executor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeFiberExecutor

Returns a new instance of FiberExecutor.



10
11
12
13
14
# File 'lib/bolt/fiber_executor.rb', line 10

def initialize
  @logger = Bolt::Logger.logger(self)
  @id = 0
  @plan_futures = []
end

Instance Attribute Details

#plan_futuresObject (readonly)

Returns the value of attribute plan_futures.



8
9
10
# File 'lib/bolt/fiber_executor.rb', line 8

def plan_futures
  @plan_futures
end

Instance Method Details

#create_future(scope: nil, name: nil) ⇒ Object

Creates a new Puppet scope from the current Plan scope so that variables can be used inside the block and won’t interact with the outer scope. Then creates a new Fiber to execute the block, wraps the Fiber in a Bolt::PlanFuture, and returns the Bolt::PlanFuture.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/bolt/fiber_executor.rb', line 27

def create_future(scope: nil, name: nil)
  newscope = nil
  if scope
    # Save existing variables to the new scope before starting the future
    # itself so that if the plan returns before the backgrounded block
    # starts, we still have the variables.
    newscope = Puppet::Parser::Scope.new(scope.compiler)
    local = Puppet::Parser::Scope::LocalScope.new

    # Compress the current scopes into a single vars hash to add to the new scope
    scope.to_hash.each_pair { |k, v| local[k] = v }
    newscope.push_ephemerals([local])
  end

  # Create a new Fiber that will execute the provided block.
  future = Fiber.new do
    # Yield the new scope - this should be ignored by the block if
    # `newscope` is nil.
    yield newscope
  end

  # PlanFutures are assigned an ID, which is just a global incrementing
  # integer. The main plan should always have ID 0.
  @id += 1
  future = Bolt::PlanFuture.new(future, @id, name)
  @logger.trace("Created future #{future.name}")

  # Register the PlanFuture with the FiberExecutor to be executed
  plan_futures << future
  future
end

#in_parallel?Boolean

Whether there is more than one fiber running in parallel.

Returns:

  • (Boolean)


18
19
20
# File 'lib/bolt/fiber_executor.rb', line 18

def in_parallel?
  plan_futures.length > 1
end

#plan_complete?Boolean

Whether all PlanFutures have finished executing, indicating that the entire plan (main plan and any PlanFutures it spawned) has finished and Bolt can exit.

Returns:

  • (Boolean)


103
104
105
# File 'lib/bolt/fiber_executor.rb', line 103

def plan_complete?
  plan_futures.empty?
end

#round_robinObject

Visit each PlanFuture registered with the FiberExecutor and resume it. Fibers will yield themselves back, either if they kicked off a long-running process or if the current long-running process hasn’t completed. If the Fiber finishes after being resumed, store the result in the PlanFuture and remove the PlanFuture from the FiberExecutor.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/bolt/fiber_executor.rb', line 65

def round_robin
  plan_futures.each do |future|
    # If the Fiber is still running and can be resumed, then resume it
    @logger.trace("Checking future '#{future.name}'")
    if future.alive?
      @logger.trace("Resuming future '#{future.name}'")
      future.resume
    end

    # Once we've restarted the Fiber, check to see if it's finished again
    # and cleanup if it has.
    next if future.alive?
    @logger.trace("Cleaning up future '#{future.name}'")

    # If the future errored and the main plan has already exited, log the
    # error at warn level.
    unless plan_futures.map(&:id).include?(0) || future.state == "done"
      Bolt::Logger.warn('errored_futures', "Error in future '#{future.name}': #{future.value}")
    end

    # Remove the PlanFuture from the FiberExecutor.
    plan_futures.delete(future)
  end

  # If the Fiber immediately returned or if the Fiber is blocking on a
  # `wait` call, Bolt should pause for long enough that something can
  # execute before checking again. This mitigates CPU
  # thrashing.
  return unless plan_futures.all? { |f| %i[returned_immediately unfinished].include?(f.value) }
  @logger.trace("Nothing can be resumed. Rechecking in 0.5 seconds.")

  sleep(0.5)
end

#wait(futures, timeout: nil, catch_errors: false, **_kwargs) ⇒ Object

Block until the provided PlanFuture objects have finished, or the timeout is reached.



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/bolt/fiber_executor.rb', line 109

def wait(futures, timeout: nil, catch_errors: false, **_kwargs)
  if timeout.nil?
    Fiber.yield(:unfinished) until futures.map(&:alive?).none?
  else
    start = Time.now
    Fiber.yield(:unfinished) until (Time.now - start > timeout) || futures.map(&:alive?).none?
    # Raise an error for any futures that are still alive
    futures.each do |f|
      if f.alive?
        f.raise(Bolt::FutureTimeoutError.new(f.name, timeout))
      end
    end
  end

  results = futures.map(&:value)

  failed_indices = results.each_index.select do |i|
    results[i].is_a?(Bolt::Error)
  end

  if failed_indices.any?
    if catch_errors
      failed_indices.each { |i| results[i] = results[i].to_puppet_error }
    else
      # Do this after handling errors for simplicity and pretty printing
      raise Bolt::ParallelFailure.new(results, failed_indices)
    end
  end

  results
end