Class: PlanExecutor::OrchClient

Inherits:
Object
  • Object
show all
Defined in:
lib/plan_executor/orch_client.rb

Constant Summary collapse

# Frozen stand-in object exposing only #name, naming the 'bolt_shim::command' task.
BOLT_COMMAND_TASK =
Struct.new(:name).new('bolt_shim::command').freeze
# Frozen stand-in object exposing only #name, naming the 'bolt_shim::script' task.
BOLT_SCRIPT_TASK =
Struct.new(:name).new('bolt_shim::script').freeze
# Frozen stand-in object exposing only #name, naming the 'bolt_shim::upload' task.
BOLT_UPLOAD_TASK =
Struct.new(:name).new('bolt_shim::upload').freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(plan_job, http_client, logger) ⇒ OrchClient

Returns a new instance of OrchClient.



13
14
15
16
17
18
# File 'lib/plan_executor/orch_client.rb', line 13

# Stores the collaborators this client works with.
#
# @param plan_job [Object] identifier of the plan job this client acts for
# @param http_client [Object] client used to issue HTTP requests
# @param logger [Object] logger used for debug and error output
def initialize(plan_job, http_client, logger)
  @plan_job, @http, @logger = plan_job, http_client, logger
  # The environment is fixed here; no argument overrides it.
  @environment = 'production'
end

Instance Attribute Details

#http ⇒ Object (readonly)

Returns the value of attribute http.



7
8
9
# File 'lib/plan_executor/orch_client.rb', line 7

# Read-only accessor for the HTTP client stored in @http.
#
# @return [Object] the value of @http
def http
  @http
end

#plan_job ⇒ Object (readonly)

Returns the value of attribute plan_job.



7
8
9
# File 'lib/plan_executor/orch_client.rb', line 7

# Read-only accessor for the plan job stored in @plan_job.
#
# @return [Object] the value of @plan_job
def plan_job
  @plan_job
end

Instance Method Details

#connected?(targets) ⇒ Boolean

Returns:

  • (Boolean)


232
233
234
235
# File 'lib/plan_executor/orch_client.rb', line 232

# Posts the targets' hosts to the 'inventory' endpoint and reports whether
# every returned item is flagged as connected.
#
# @param targets [Array<#host>] targets whose hosts are looked up
# @return [Boolean] true when every entry under 'items' has a truthy 'connected'
def connected?(targets)
  hosts = targets.map { |target| target.host }
  inventory = @http.post('inventory', nodes: hosts).body
  inventory['items'].all? { |entry| entry['connected'] }
end

#file_upload(targets, source, destination, options = {}) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/plan_executor/orch_client.rb', line 168

# Uploads a local file or directory to the targets through the upload task.
#
# A directory is packed with #pack; a regular file is read as raw bytes.
# The payload is Base64-encoded and sent together with the destination path,
# the source's mode and a flag saying whether the source was a directory.
#
# @param targets [Array] targets handed to #run_task_job
# @param source [String] local path of the file or directory to upload
# @param destination [String] path sent to the task as 'path'
# @param options [Hash] options handed to #run_task_job
# @return [Array] one entry per result of #run_task_job: results with an
#   error_hash are kept as-is, all others are replaced by
#   Bolt::Result.for_upload(target, source, destination)
# @raise [SystemCallError] if the source cannot be stat'ed or read
def file_upload(targets, source, destination, options = {})
  # Stat once and reuse it for both the directory check and the mode.
  stat = File.stat(source)
  is_directory = stat.directory?
  # Binary read: the bytes must reach the encoder untouched by newline or
  # encoding translation (the same way #pack reads files).
  content = is_directory ? pack(source) : File.binread(source)
  params = {
    'path' => destination,
    'content' => Base64.encode64(content),
    'mode' => stat.mode,
    'directory' => is_directory
  }
  run_task_job(targets, BOLT_UPLOAD_TASK, params, options).map do |result|
    if result.error_hash
      result
    else
      Bolt::Result.for_upload(result.target, source, destination)
    end
  end
end

#finish_plan(plan_result) ⇒ Object



20
21
22
23
24
25
26
27
28
29
# File 'lib/plan_executor/orch_client.rb', line 20

# Reports the outcome of the plan to the 'internal/plan_finish' endpoint.
#
# Any StandardError raised while building or posting the report is logged
# through @logger and not re-raised.
#
# @param plan_result [#value, #status] outcome of the plan; a nil/false value
#   is sent as an empty string
# @return [Object] the response of #post_command, or whatever the logger
#   returns when the report failed
def finish_plan(plan_result)
  begin
    report = {
      plan_job: @plan_job,
      result: plan_result.value || '',
      status: plan_result.status
    }
    post_command('internal/plan_finish', report)
  rescue StandardError => error
    @logger.error("Failed to finish plan #{plan_job}: #{error.message}")
  end
end

#get(url) ⇒ Object



47
48
49
50
51
52
53
# File 'lib/plan_executor/orch_client.rb', line 47

# Issues a GET through the HTTP client and returns the response body.
#
# @param url [String] URL passed to the HTTP client
# @return [Object] the response body when the status is 200
# @raise [Bolt::Error] built from the body's 'msg', 'kind' and 'details'
#   for any status other than 200
def get(url)
  response = @http.get(url)
  return response.body if response.status == 200

  failure = response.body
  raise Bolt::Error.new(failure['msg'], failure['kind'], failure['details'])
end

#pack(directory) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/plan_executor/orch_client.rb', line 138

# Packs every regular file below a directory into a gzip-compressed tar
# archive held in memory.
#
# Entries are stored under their path relative to +directory+, with the
# file's permission bits (mode & 0o777) and its mtime.
#
# @param directory [String] root directory to pack
# @return [String] the contents of the in-memory buffer after the archive
#   has been closed
# @raise [SystemCallError] if the directory or one of its files cannot be read
def pack(directory)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  io = StringIO.new
  output = Minitar::Output.new(Zlib::GzipWriter.new(io))
  begin
    root = Pathname.new(directory)
    Find.find(directory) do |file|
      next unless File.file?(file)

      tar_path = Pathname.new(file).relative_path_from(root)
      @logger.debug("Packing #{file} to #{tar_path}")
      stat = File.stat(file)
      content = File.binread(file)
      output.tar.add_file_simple(
        tar_path.to_s,
        data: content,
        size: content.size,
        mode: stat.mode & 0o777,
        mtime: stat.mtime
      )
    end
  ensure
    # Close exactly once, on success and on failure alike. The buffer is only
    # read after this point because closing is what finishes the archive;
    # this presumably also closes the wrapped gzip writer - confirm against
    # the Minitar version in use.
    output.close
  end

  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  @logger.debug("Packed upload in #{elapsed * 1000} ms")

  io.string
end

#post_command(url, body) ⇒ Object



55
56
57
58
59
60
61
# File 'lib/plan_executor/orch_client.rb', line 55

# Issues a POST through the HTTP client and returns the response body.
#
# @param url [String] URL passed to the HTTP client
# @param body [Object] request payload passed to the HTTP client
# @return [Object] the response body when the status is 202
# @raise [Bolt::Error] built from the body's 'msg', 'kind' and 'details'
#   for any status other than 202
def post_command(url, body)
  response = @http.post(url, body)
  return response.body if response.status == 202

  failure = response.body
  raise Bolt::Error.new(failure['msg'], failure['kind'], failure['details'])
end

#process_run_results(targets, results) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/plan_executor/orch_client.rb', line 86

# Converts per-node result hashes into Bolt::Result objects.
#
# Each node result is matched to a target through its 'name', which is
# compared against the targets' hosts.
#
# @param targets [Array<#host>] targets the job was run against
# @param results [Array<Hash>] node results with 'name', 'state' and 'result'
# @return [Array<Bolt::Result>] one result per node result, in the same order
def process_run_results(targets, results)
  lookup = targets.each_with_object({}) { |target, index| index[target.host] = target }

  results.map do |entry|
    target = lookup[entry['name']]
    payload = entry['result']

    # A finished node, or one that already carries a proper error, is passed
    # through unchanged.
    passthrough = entry['state'] == 'finished' || (payload && payload['_error'])
    next Bolt::Result.new(target, value: payload) if passthrough

    # Anything else that is not a skip becomes a generic error with an
    # unknown exit code.
    unless entry['state'] == 'skipped'
      next Bolt::Result.for_task(target, payload.to_json, '', 'unknown')
    end

    skipped_error = {
      'kind' => 'puppetlabs.tasks/skipped-node',
      'msg' => "Node #{target.host} was skipped",
      'details' => {}
    }
    Bolt::Result.new(target, value: { '_error' => skipped_error })
  end
end

#run_command(targets, command, options = {}) ⇒ Object



117
118
119
120
# File 'lib/plan_executor/orch_client.rb', line 117

# Runs a command on the targets by submitting it as the 'command' parameter
# of BOLT_COMMAND_TASK.
#
# @param targets [Array] targets handed to #run_task_job
# @param command [String] command sent under the 'command' key
# @param options [Hash] options handed to #run_task_job
# @return [Object] whatever #run_task_job returns
def run_command(targets, command, options = {})
  run_task_job(targets, BOLT_COMMAND_TASK, { 'command' => command }, options)
end

#run_script(targets, script, arguments, options = {}) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/plan_executor/orch_client.rb', line 122

# Runs a local script on the targets through BOLT_SCRIPT_TASK.
#
# The script is read as raw bytes and sent Base64-encoded together with its
# arguments and its base name. Every result is then passed through
# #unwrap_bolt_result.
#
# @param targets [Array] targets handed to #run_task_job
# @param script [String] local path of the script to run
# @param arguments [Object] sent to the task under the 'arguments' key
# @param options [Hash] options handed to #run_task_job
# @return [Array] the results of #run_task_job, each mapped through
#   #unwrap_bolt_result
# @raise [SystemCallError] if the script cannot be read
def run_script(targets, script, arguments, options = {})
  params = {
    # Binary read so the script's bytes are encoded exactly as they are on disk.
    'content' => Base64.encode64(File.binread(script)),
    'arguments' => arguments,
    'name' => Pathname(script).basename.to_s
  }
  run_task_job(targets, BOLT_SCRIPT_TASK, params, options).map do |result|
    unwrap_bolt_result(result.target, result)
  end
end

#run_task(targets, task, arguments, options = {}) ⇒ Object



113
114
115
# File 'lib/plan_executor/orch_client.rb', line 113

# Runs a task on the targets by delegating directly to #run_task_job.
#
# @param targets [Array] targets handed to #run_task_job
# @param task [Object] task handed to #run_task_job
# @param arguments [Object] task arguments handed to #run_task_job
# @param options [Hash] options handed to #run_task_job
# @return [Object] whatever #run_task_job returns
def run_task(targets, task, arguments, options = {})
  run_task_job(targets, task, arguments, options)
end

#run_task_job(targets, task, arguments, options) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/plan_executor/orch_client.rb', line 31

# Submits a task for the targets and turns the outcome into results.
#
# Sensitive values in the arguments are unwrapped before the request is
# sent. Failures never propagate as StandardError: they are turned into one
# error result per target instead.
#
# @param targets [Array] targets to run the task on
# @param task [Object] task handed to #send_request
# @param arguments [Object] task arguments, possibly holding Sensitive values
# @param options [Hash] options handed to #send_request
# @return [Array] the output of #process_run_results, or one Bolt::Result per
#   target describing the failure
def run_task_job(targets, task, arguments, options)
  begin
    plain_arguments = unwrap_sensitive_args(arguments)
    node_results = send_request(targets, task, plain_arguments, options)
    process_run_results(targets, node_results)
  rescue OrchestratorClient::ApiError => api_error
    # API errors carry structured data that is used as the error directly.
    targets.map { |target| Bolt::Result.new(target, error: api_error.data) }
  rescue StandardError => error
    targets.map { |target| Bolt::Result.from_exception(target, error) }
  end
end

#send_request(targets, task, arguments, options = {}) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/plan_executor/orch_client.rb', line 63

# Posts a task job to 'internal/plan_task', polls it until it reaches a final
# state and returns the per-node items.
#
# Argument keys starting with '_' are not sent as task parameters; '_noop' is
# sent as the job's noop flag. A description and the plan job are only added
# to the request when they are set.
#
# @param targets [Array<#host>] targets whose hosts form the job's scope
# @param task [#name] task to run
# @param arguments [Hash] task arguments, keyed by name
# @param options [Hash] may carry '_description'
# @return [Object] the 'items' of the job's nodes resource
def send_request(targets, task, arguments, options = {})
  payload = {
    task: task.name,
    environment: @environment,
    noop: arguments['_noop'],
    params: arguments.reject { |key, _value| key.start_with?('_') },
    scope: { nodes: targets.map(&:host) }
  }
  description = options['_description']
  payload[:description] = description if description
  payload[:plan_job] = @plan_job if @plan_job

  job_url = post_command('internal/plan_task', payload).dig('job', 'id')

  # Poll once per second until the job reports one of its final states.
  final_states = %w[stopped finished failed]
  job = get(job_url)
  until final_states.include?(job['state'])
    sleep 1
    job = get(job_url)
  end

  nodes_url = job.dig('nodes', 'id')
  get(nodes_url)['items']
end

#unwrap_bolt_result(target, result) ⇒ Object

run_task generates a result that makes sense for a generic task which needs to be unwrapped to extract stdout/stderr/exitcode.



223
224
225
226
227
228
229
230
# File 'lib/plan_executor/orch_client.rb', line 223

# Converts a generic task result into a command-style result.
#
# @param target [Object] target the new result is created for
# @param result [#error_hash, #value] task result whose value holds
#   'stdout', 'stderr' and 'exit_code'
# @return [Object] the given result untouched when it carries an error_hash,
#   otherwise the output of Bolt::Result.for_command
def unwrap_bolt_result(target, result)
  # A failed result is handed back as-is.
  return result if result.error_hash

  output = result.value
  Bolt::Result.for_command(target, output['stdout'], output['stderr'], output['exit_code'])
end

#unwrap_sensitive_args(arguments) ⇒ Object

Unwraps any Sensitive data in an arguments Hash, so the plain-text is passed to the Task/Script.

This works on deeply nested data structures composed of Hashes, Arrays, and plain-old data types (int, string, etc).



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/plan_executor/orch_client.rb', line 198

# Recursively replaces Sensitive wrappers with the values they hold.
#
# Arrays are rebuilt element by element, Hashes are rebuilt with both keys
# and values unwrapped, and a Sensitive value is unwrapped and processed
# again. Anything else is returned unchanged.
#
# @param arguments [Object] value that may contain Sensitive data
# @return [Object] the same structure with every Sensitive value unwrapped;
#   the argument itself when the Sensitive class is not loaded
def unwrap_sensitive_args(arguments)
  # Without Puppet's Sensitive class there is nothing that could be unwrapped.
  return arguments unless defined?(Puppet::Pops::Types::PSensitiveType::Sensitive)

  if arguments.is_a?(Array)
    arguments.map { |element| unwrap_sensitive_args(element) }
  elsif arguments.is_a?(Hash)
    arguments.map { |key, value| [unwrap_sensitive_args(key), unwrap_sensitive_args(value)] }.to_h
  elsif arguments.is_a?(Puppet::Pops::Types::PSensitiveType::Sensitive)
    # The wrapped value may itself contain further Sensitive data.
    unwrap_sensitive_args(arguments.unwrap)
  else
    arguments
  end
end