Class: Bolt::Transport::Orch

Inherits:
Base
  • Object
show all
Defined in:
lib/bolt/transport/orch.rb,
lib/bolt/transport/orch/connection.rb

Defined Under Namespace

Classes: Connection

Constant Summary collapse

CONF_FILE =
if !ENV['HOME'].nil?
  File.expand_path('~/.puppetlabs/client-tools/orchestrator.conf')
else
  '/etc/puppetlabs/client-tools/orchestrator.conf'
end
BOLT_COMMAND_TASK =
Struct.new(:name).new('bolt_shim::command').freeze
BOLT_SCRIPT_TASK =
Struct.new(:name).new('bolt_shim::script').freeze
BOLT_UPLOAD_TASK =
Struct.new(:name).new('bolt_shim::upload').freeze

Constants inherited from Base

Base::ENVIRONMENT_METHODS, Base::STDIN_METHODS

Instance Attribute Summary collapse

Attributes inherited from Base

#logger

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#assert_batch_size_one, #connected?, #default_input_method, #envify_params, filter_options, #run_command, #run_script, #run_task, #select_implementation, #unwrap_sensitive_args, #upload, #with_events

Constructor Details

#initialize(*args) ⇒ Orch

Returns a new instance of Orch.



38
39
40
41
# File 'lib/bolt/transport/orch.rb', line 38

def initialize(*args)
  @connections = {}
  super
end

Instance Attribute Details

#plan_context=(value) ⇒ Object (writeonly)

Sets the attribute plan_context

Parameters:

  • value

    the value to set the attribute plan_context to.



26
27
28
# File 'lib/bolt/transport/orch.rb', line 26

def plan_context=(value)
  @plan_context = value
end

Class Method Details

.optionsObject



28
29
30
# File 'lib/bolt/transport/orch.rb', line 28

def self.options
  %w[service-url cacert token-file task-environment]
end

.validate(options) ⇒ Object



36
# File 'lib/bolt/transport/orch.rb', line 36

def self.validate(options); end

Instance Method Details

#batch_command(targets, command, options = {}, &callback) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/bolt/transport/orch.rb', line 92

def batch_command(targets, command, options = {}, &callback)
  params = {
    'command' => command
  }
  results = run_task_job(targets,
                         BOLT_COMMAND_TASK,
                         params,
                         options,
                         &callback)
  callback ||= proc {}
  results.map! { |result| unwrap_bolt_result(result.target, result) }
  results.each do |result|
    callback.call(type: :node_result, result: result)
  end
end

#batch_connected?(targets) ⇒ Boolean

Returns:

  • (Boolean)


217
218
219
220
# File 'lib/bolt/transport/orch.rb', line 217

def batch_connected?(targets)
  resp = get_connection(targets.first.options).query_inventory(targets)
  resp['items'].all? { |node| node['connected'] }
end

#batch_script(targets, script, arguments, options = {}, &callback) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/bolt/transport/orch.rb', line 108

def batch_script(targets, script, arguments, options = {}, &callback)
  content = File.open(script, &:read)
  content = Base64.encode64(content)
  params = {
    'content' => content,
    'arguments' => arguments,
    'name' => Pathname(script).basename.to_s
  }
  callback ||= proc {}
  results = run_task_job(targets, BOLT_SCRIPT_TASK, params, options, &callback)
  results.map! { |result| unwrap_bolt_result(result.target, result) }
  results.each do |result|
    callback.call(type: :node_result, result: result)
  end
end

#batch_task(targets, task, arguments, options = {}, &callback) ⇒ Object



209
210
211
212
213
214
215
# File 'lib/bolt/transport/orch.rb', line 209

def batch_task(targets, task, arguments, options = {}, &callback)
  callback ||= proc {}
  results = run_task_job(targets, task, arguments, options, &callback)
  results.each do |result|
    callback.call(type: :node_result, result: result)
  end
end

#batch_upload(targets, source, destination, options = {}, &callback) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/bolt/transport/orch.rb', line 154

def batch_upload(targets, source, destination, options = {}, &callback)
  stat = File.stat(source)
  content = if stat.directory?
              pack(source)
            else
              File.open(source, &:read)
            end
  content = Base64.encode64(content)
  mode = File.stat(source).mode
  params = {
    'path' => destination,
    'content' => content,
    'mode' => mode,
    'directory' => stat.directory?
  }
  callback ||= proc {}
  results = run_task_job(targets, BOLT_UPLOAD_TASK, params, options, &callback)
  results.map! do |result|
    if result.error_hash
      result
    else
      Bolt::Result.for_upload(result.target, source, destination)
    end
  end
  results.each do |result|
    callback&.call(type: :node_result, result: result)
  end
end

#batches(targets) ⇒ Object



183
184
185
# File 'lib/bolt/transport/orch.rb', line 183

def batches(targets)
  targets.group_by { |target| Connection.get_key(target.options) }.values
end

#finish_plan(result) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/bolt/transport/orch.rb', line 43

def finish_plan(result)
  if result.is_a? Bolt::PlanResult
    @connections.each_value do |conn|
      begin
        conn.finish_plan(result)
      rescue StandardError => e
        @logger.debug("Failed to finish plan on #{conn.key}: #{e.message}")
      end
    end
  end
end

#get_connection(conn_opts) ⇒ Object

It’s safe to create connections here for now because the batches/threads are per connection.



57
58
59
60
61
62
63
# File 'lib/bolt/transport/orch.rb', line 57

def get_connection(conn_opts)
  key = Connection.get_key(conn_opts)
  unless (conn = @connections[key])
    conn = @connections[key] = Connection.new(conn_opts, @plan_context, logger)
  end
  conn
end

#pack(directory) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/bolt/transport/orch.rb', line 124

def pack(directory)
  start_time = Time.now
  io = StringIO.new
  output = Minitar::Output.new(Zlib::GzipWriter.new(io))
  Find.find(directory) do |file|
    next unless File.file?(file)

    tar_path = Pathname.new(file).relative_path_from(Pathname.new(directory))
    @logger.debug("Packing #{file} to #{tar_path}")
    stat = File.stat(file)
    content = File.binread(file)
    output.tar.add_file_simple(
      tar_path.to_s,
      data: content,
      size: content.size,
      mode: stat.mode & 0o777,
      mtime: stat.mtime
    )
  end

  duration = Time.now - start_time
  @logger.debug("Packed upload in #{duration * 1000} ms")

  output.close
  io.string
ensure
  # Closes both tar and sgz.
  output&.close
end

#process_run_results(targets, results) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/bolt/transport/orch.rb', line 65

def process_run_results(targets, results)
  targets_by_name = Hash[targets.map(&:host).zip(targets)]
  results.map do |node_result|
    target = targets_by_name[node_result['name']]
    state = node_result['state']
    result = node_result['result']

    # If it's finished or already has a proper error simply pass it to the
    # the result otherwise make sure an error is generated
    if state == 'finished' || (result && result['_error'])
      Bolt::Result.new(target, value: result)
    elsif state == 'skipped'
      Bolt::Result.new(
        target,
        value: { '_error' => {
          'kind' => 'puppetlabs.tasks/skipped-node',
          'msg' => "Node #{target.host} was skipped",
          'details' => {}
        } }
      )
    else
      # Make a generic error with a unkown exit_code
      Bolt::Result.for_task(target, result.to_json, '', 'unknown')
    end
  end
end

#provided_featuresObject



32
33
34
# File 'lib/bolt/transport/orch.rb', line 32

def provided_features
  ['puppet-agent']
end

#run_task_job(targets, task, arguments, options) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/bolt/transport/orch.rb', line 187

def run_task_job(targets, task, arguments, options)
  targets.each do |target|
    yield(type: :node_start, target: target) if block_given?
  end

  begin
    # unpack any Sensitive data
    arguments = unwrap_sensitive_args(arguments)
    results = get_connection(targets.first.options).run_task(targets, task, arguments, options)

    process_run_results(targets, results)
  rescue OrchestratorClient::ApiError => e
    targets.map do |target|
      Bolt::Result.new(target, error: e.data)
    end
  rescue StandardError => e
    targets.map do |target|
      Bolt::Result.from_exception(target, e)
    end
  end
end

#unwrap_bolt_result(target, result) ⇒ Object

run_task generates a result that makes sense for a generic task which needs to be unwrapped to extract stdout/stderr/exitcode.



225
226
227
228
229
230
231
232
# File 'lib/bolt/transport/orch.rb', line 225

def unwrap_bolt_result(target, result)
  if result.error_hash
    # something went wrong return the failure
    return result
  end

  Bolt::Result.for_command(target, result.value['stdout'], result.value['stderr'], result.value['exit_code'])
end