Class: Bolt::FiberExecutor
- Inherits: Object
  - Object
  - Bolt::FiberExecutor
- Defined in: lib/bolt/fiber_executor.rb
Instance Attribute Summary
-
#plan_futures ⇒ Object
readonly
Returns the value of attribute plan_futures.
Instance Method Summary
-
#create_future(scope: nil, name: nil) ⇒ Object
Creates a new Puppet scope from the current Plan scope so that variables can be used inside the block and won’t interact with the outer scope.
-
#in_parallel? ⇒ Boolean
Whether more than one PlanFuture is currently registered with the FiberExecutor, that is, whether fibers are running in parallel.
-
#initialize ⇒ FiberExecutor
constructor
A new instance of FiberExecutor.
-
#plan_complete? ⇒ Boolean
Whether all PlanFutures have finished executing, indicating that the entire plan (main plan and any PlanFutures it spawned) has finished and Bolt can exit.
-
#round_robin ⇒ Object
Visit each PlanFuture registered with the FiberExecutor and resume it.
-
#wait(futures, timeout: nil, catch_errors: false, **_kwargs) ⇒ Object
Block until the provided PlanFuture objects have finished, or the timeout is reached.
Constructor Details
#initialize ⇒ FiberExecutor
Returns a new instance of FiberExecutor.
# File 'lib/bolt/fiber_executor.rb', line 10

def initialize
  @logger = Bolt::Logger.logger(self)
  @id = 0
  @plan_futures = []
end
Instance Attribute Details
#plan_futures ⇒ Object (readonly)
Returns the value of attribute plan_futures.
# File 'lib/bolt/fiber_executor.rb', line 8

def plan_futures
  @plan_futures
end
Instance Method Details
#create_future(scope: nil, name: nil) ⇒ Object
Creates a new Puppet scope from the current Plan scope so that variables can be used inside the block and won’t interact with the outer scope. Then creates a new Fiber to execute the block, wraps the Fiber in a Bolt::PlanFuture, and returns the Bolt::PlanFuture.
# File 'lib/bolt/fiber_executor.rb', line 27

def create_future(scope: nil, name: nil)
  newscope = nil
  if scope
    # Save existing variables to the new scope before starting the future
    # itself so that if the plan returns before the backgrounded block
    # starts, we still have the variables.
    newscope = Puppet::Parser::Scope.new(scope.compiler)
    local = Puppet::Parser::Scope::LocalScope.new

    # Compress the current scopes into a single vars hash to add to the new scope
    scope.to_hash.each_pair { |k, v| local[k] = v }
    newscope.push_ephemerals([local])
  end

  # Create a new Fiber that will execute the provided block.
  future = Fiber.new do
    # Yield the new scope - this should be ignored by the block if
    # `newscope` is nil.
    yield newscope
  end

  # PlanFutures are assigned an ID, which is just a global incrementing
  # integer. The main plan should always have ID 0.
  @id += 1
  future = Bolt::PlanFuture.new(future, @id, name)
  @logger.trace("Created future #{future.name}")

  # Register the PlanFuture with the FiberExecutor to be executed
  plan_futures << future
  future
end
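The flow described above (snapshot the variables, wrap the block in a Fiber, assign an ID, register the future) can be sketched in plain Ruby without Bolt or Puppet. This is only an illustration under stated assumptions: `SketchFuture` and `SketchExecutor` are hypothetical stand-ins for Bolt::PlanFuture and Bolt::FiberExecutor, and a duplicated Hash stands in for the new Puppet scope.

```ruby
require 'fiber' # provides Fiber#alive? on older Rubies; harmless on newer ones

# Hypothetical stand-in for Bolt::PlanFuture: just a Fiber, an ID, and a name.
SketchFuture = Struct.new(:fiber, :id, :name) do
  def alive?
    fiber.alive?
  end

  def resume
    fiber.resume
  end
end

# Hypothetical stand-in for Bolt::FiberExecutor.
class SketchExecutor
  attr_reader :futures

  def initialize
    @id = 0
    @futures = []
  end

  # Same shape as create_future: copy the variables, wrap the caller's
  # block in a Fiber, assign the next ID, and register the future.
  def create_future(vars: nil, name: nil)
    snapshot = vars&.dup # shallow copy, so key changes stay local
    fiber = Fiber.new { yield snapshot }
    @id += 1
    future = SketchFuture.new(fiber, @id, name || "future_#{@id}")
    @futures << future
    future
  end
end

executor = SketchExecutor.new
outer = { 'greeting' => 'hello' }
future = executor.create_future(vars: outer, name: 'demo') do |vars|
  vars['greeting'] = 'changed inside the future'
  vars['greeting']
end

# Nothing has run yet: the block only executes once the Fiber is resumed.
result = future.resume
```

After the resume, `result` holds the block's return value while `outer` is unchanged, which is the isolation the new scope is meant to provide.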
#in_parallel? ⇒ Boolean
Whether more than one PlanFuture is currently registered with the FiberExecutor, that is, whether fibers are running in parallel.
# File 'lib/bolt/fiber_executor.rb', line 18

def in_parallel?
  plan_futures.length > 1
end
#plan_complete? ⇒ Boolean
Whether all PlanFutures have finished executing, indicating that the entire plan (main plan and any PlanFutures it spawned) has finished and Bolt can exit.
# File 'lib/bolt/fiber_executor.rb', line 103

def plan_complete?
  plan_futures.empty?
end
#round_robin ⇒ Object
Visit each PlanFuture registered with the FiberExecutor and resume it. A Fiber yields control back either when it has kicked off a long-running process or when its current long-running process has not yet completed. If the Fiber finishes after being resumed, store the result in the PlanFuture and remove the PlanFuture from the FiberExecutor.
# File 'lib/bolt/fiber_executor.rb', line 65

def round_robin
  plan_futures.each do |future|
    # If the Fiber is still running and can be resumed, then resume it
    @logger.trace("Checking future '#{future.name}'")
    if future.alive?
      @logger.trace("Resuming future '#{future.name}'")
      future.resume
    end

    # Once we've restarted the Fiber, check to see if it's finished again
    # and cleanup if it has.
    next if future.alive?
    @logger.trace("Cleaning up future '#{future.name}'")

    # If the future errored and the main plan has already exited, log the
    # error at warn level.
    unless plan_futures.map(&:id).include?(0) || future.state == "done"
      Bolt::Logger.warn('errored_futures', "Error in future '#{future.name}': #{future.value}")
    end

    # Remove the PlanFuture from the FiberExecutor.
    plan_futures.delete(future)
  end

  # If the Fiber immediately returned or if the Fiber is blocking on a
  # `wait` call, Bolt should pause for long enough that something can
  # execute before checking again. This mitigates CPU
  # thrashing.
  return unless plan_futures.all? { |f| %i[returned_immediately unfinished].include?(f.value) }

  @logger.trace("Nothing can be resumed. Rechecking in 0.5 seconds.")
  sleep(0.5)
end
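A single scheduling pass of this kind can be sketched with bare Ruby Fibers. The helper name `sketch_round_robin`, the Hash of named fibers, and the driver loop are assumptions made for illustration and are not part of Bolt. The sketch iterates over a copy of the collection so that deleting entries during the pass cannot skip an element.

```ruby
require 'fiber' # provides Fiber#alive? on older Rubies; harmless on newer ones

# Resume every live fiber once and drop the ones that finished.
# Returns true when fibers remain and each of them yielded :unfinished,
# which is the point at which a real scheduler would sleep briefly.
def sketch_round_robin(fibers, log)
  last_values = {}
  fibers.dup.each do |name, fiber| # iterate over a copy; delete from the original
    last_values[name] = fiber.resume if fiber.alive?
    next if fiber.alive?

    log << "finished #{name}"
    fibers.delete(name)
  end
  !fibers.empty? && fibers.keys.all? { |n| last_values[n] == :unfinished }
end

log = []
fibers = {
  'short' => Fiber.new do
    log << 'short ran'
    :done
  end,
  'long' => Fiber.new do
    log << 'long step 1'
    Fiber.yield(:unfinished) # hand control back to the scheduler
    log << 'long step 2'
    :done
  end
}

# Hypothetical driver loop: keep making passes until nothing is registered.
passes = 0
until fibers.empty?
  sketch_round_robin(fibers, log)
  passes += 1
end
```

The short fiber finishes and is removed in the first pass; the long fiber yields once and is cleaned up in the second.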
#wait(futures, timeout: nil, catch_errors: false, **_kwargs) ⇒ Object
Block until the provided PlanFuture objects have finished, or the timeout is reached.
# File 'lib/bolt/fiber_executor.rb', line 109

def wait(futures, timeout: nil, catch_errors: false, **_kwargs)
  if timeout.nil?
    Fiber.yield(:unfinished) until futures.map(&:alive?).none?
  else
    start = Time.now
    Fiber.yield(:unfinished) until (Time.now - start > timeout) ||
                                   futures.map(&:alive?).none?
    # Raise an error for any futures that are still alive
    futures.each do |f|
      if f.alive?
        f.raise(Bolt::FutureTimeoutError.new(f.name, timeout))
      end
    end
  end

  results = futures.map(&:value)

  failed_indices = results.each_index.select do |i|
    results[i].is_a?(Bolt::Error)
  end

  if failed_indices.any?
    if catch_errors
      failed_indices.each { |i| results[i] = results[i].to_puppet_error }
    else
      # Do this after handling errors for simplicity and pretty printing
      raise Bolt::ParallelFailure.new(results, failed_indices)
    end
  end

  results
end
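The "blocking" in #wait is cooperative: the waiting Fiber repeatedly yields `:unfinished` instead of sleeping, so other fibers can make progress. A minimal sketch of that pattern follows, assuming a hypothetical helper `sketch_wait` that takes a completion check as a block. It returns a symbol on timeout rather than raising an error in the unfinished futures as the listing above does.

```ruby
# Yield control until the `done` block returns true or `timeout` seconds
# have elapsed. Must be called from inside a Fiber.
def sketch_wait(timeout: nil, &done)
  start = Time.now
  until done.call
    return :timed_out if timeout && (Time.now - start > timeout)

    Fiber.yield(:unfinished) # let the scheduler run something else
  end
  :finished
end

# Without a timeout: the condition becomes true after three resumes.
ticks = 0
waiter = Fiber.new { sketch_wait { ticks >= 3 } }
values = []
while waiter.alive?
  values << waiter.resume
  ticks += 1
end

# With a timeout: the condition never becomes true, so the wait gives up.
timed = Fiber.new { sketch_wait(timeout: 0.01) { false } }
timed_result = timed.resume while timed.alive?
```

Each `:unfinished` value in `values` corresponds to one scheduler pass in which the waiter had nothing to do.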