Class: Bolt::Executor

Inherits:
Object
  • Object
show all
Defined in:
lib/bolt/executor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = Bolt::Config.new, noop = nil, plan_logging = false) ⇒ Executor

Returns a new instance of Executor.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/bolt/executor.rb', line 19

def initialize(config = Bolt::Config.new, noop = nil, plan_logging = false)
  @config = config
  @logger = Logging.logger[self]
  @plan_logging = plan_logging

  @transports = Bolt::TRANSPORTS.each_with_object({}) do |(key, val), coll|
    coll[key.to_s] = Concurrent::Delay.new { val.new }
  end

  @noop = noop
  @run_as = nil
  @pool = Concurrent::CachedThreadPool.new(max_threads: @config[:concurrency])
  @logger.debug { "Started with #{@config[:concurrency]} max thread(s)" }
  @notifier = Bolt::Notifier.new
end

Instance Attribute Details

#noopObject (readonly)

Returns the value of attribute noop.



16
17
18
# File 'lib/bolt/executor.rb', line 16

def noop
  @noop
end

#run_asObject

Returns the value of attribute run_as.



17
18
19
# File 'lib/bolt/executor.rb', line 17

def run_as
  @run_as
end

#transportsObject (readonly)

Returns the value of attribute transports.



16
17
18
# File 'lib/bolt/executor.rb', line 16

def transports
  @transports
end

Instance Method Details

#batch_execute(targets) ⇒ Object

Execute the given block on a list of nodes in parallel, one thread per “batch”.

This is the main driver of execution on a list of targets. It first groups targets by transport, then divides each group into batches as defined by the transport. Each batch, along with the corresponding transport, is yielded to the block in turn and the results all collected into a single ResultSet.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/bolt/executor.rb', line 57

def batch_execute(targets)
  promises = targets.group_by(&:protocol).flat_map do |protocol, protocol_targets|
    transport = transport(protocol)
    transport.batches(protocol_targets).flat_map do |batch|
      batch_promises = Array(batch).each_with_object({}) do |target, h|
        h[target] = Concurrent::Promise.new(executor: :immediate)
      end
      # Pass this argument through to avoid retaining a reference to a
      # local variable that will change on the next iteration of the loop.
      @pool.post(batch_promises) do |result_promises|
        begin
          results = yield transport, batch
          Array(results).each do |result|
            result_promises[result.target].set(result)
          end
        # NotImplementedError can be thrown if the transport is implemented improperly
        rescue StandardError, NotImplementedError => e
          result_promises.each do |target, promise|
            promise.set(Bolt::Result.from_exception(target, e))
          end
        ensure
          # Make absolutely sure every promise gets a result to avoid a
          # deadlock. Use whatever exception is causing this block to
          # execute, or generate one if we somehow got here without an
          # exception and some promise is still missing a result.
          result_promises.each do |target, promise|
            next if promise.fulfilled?
            error = $ERROR_INFO || Bolt::Error.new("No result was returned for #{target.uri}",
                                                   "puppetlabs.bolt/missing-result-error")
            promise.set(Bolt::Result.from_exception(target, error))
          end
        end
      end
      batch_promises.values
    end
  end
  ResultSet.new(promises.map(&:value))
end

#file_upload(targets, source, destination, options = {}, &callback) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/bolt/executor.rb', line 163

def file_upload(targets, source, destination, options = {}, &callback)
  description = options.fetch('_description', "file upload from #{source} to #{destination}")
  log_action("Starting: #{description} on #{targets.map(&:uri)}")
  notify = proc { |event| @notifier.notify(callback, event) if callback }
  options = { '_run_as' => run_as }.merge(options) if run_as

  results = batch_execute(targets) do |transport, batch|
    with_node_logging("Uploading file #{source} to #{destination}", batch) do
      transport.batch_upload(batch, source, destination, options, &notify)
    end
  end

  log_action(summary(description, results))
  @notifier.shutdown
  results
end

#run_command(targets, command, options = {}, &callback) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/bolt/executor.rb', line 110

def run_command(targets, command, options = {}, &callback)
  description = options.fetch('_description', "command '#{command}'")
  log_action("Starting: #{description} on #{targets.map(&:uri)}")
  notify = proc { |event| @notifier.notify(callback, event) if callback }
  options = { '_run_as' => run_as }.merge(options) if run_as

  results = batch_execute(targets) do |transport, batch|
    with_node_logging("Running command '#{command}'", batch) do
      transport.batch_command(batch, command, options, &notify)
    end
  end

  log_action(summary(description, results))
  @notifier.shutdown
  results
end

#run_script(targets, script, arguments, options = {}, &callback) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/bolt/executor.rb', line 127

def run_script(targets, script, arguments, options = {}, &callback)
  description = options.fetch('_description', "script #{script}")
  log_action("Starting: #{description} on #{targets.map(&:uri)}")

  notify = proc { |event| @notifier.notify(callback, event) if callback }
  options = { '_run_as' => run_as }.merge(options) if run_as

  results = batch_execute(targets) do |transport, batch|
    with_node_logging("Running script #{script} with '#{arguments}'", batch) do
      transport.batch_script(batch, script, arguments, options, &notify)
    end
  end

  log_action(summary(description, results))
  @notifier.shutdown
  results
end

#run_task(targets, task, arguments, options = {}, &callback) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/bolt/executor.rb', line 145

def run_task(targets, task, arguments, options = {}, &callback)
  description = options.fetch('_description', "task #{task.name}")
  log_action("Starting: #{description} on #{targets.map(&:uri)}")

  notify = proc { |event| @notifier.notify(callback, event) if callback }
  options = { '_run_as' => run_as }.merge(options) if run_as

  results = batch_execute(targets) do |transport, batch|
    with_node_logging("Running task #{task.name} with '#{arguments}' via #{task.input_method}", batch) do
      transport.batch_task(batch, task, arguments, options, &notify)
    end
  end

  log_action(summary(description, results))
  @notifier.shutdown
  results
end

#transport(transport) ⇒ Object



35
36
37
38
39
40
# File 'lib/bolt/executor.rb', line 35

def transport(transport)
  impl = @transports[transport || 'ssh']
  # If there was an error creating the transport, ensure it gets thrown
  impl.no_error!
  impl.value
end