Class: PlanExecutor::OrchClient
- Inherits:
-
Object
- Object
- PlanExecutor::OrchClient
- Defined in:
- lib/plan_executor/orch_client.rb
Constant Summary collapse
- BOLT_COMMAND_TASK =
Struct.new(:name).new('bolt_shim::command').freeze
- BOLT_SCRIPT_TASK =
Struct.new(:name).new('bolt_shim::script').freeze
- BOLT_UPLOAD_TASK =
Struct.new(:name).new('bolt_shim::upload').freeze
Instance Attribute Summary collapse
-
#http ⇒ Object
readonly
Returns the value of attribute http.
-
#plan_job ⇒ Object
readonly
Returns the value of attribute plan_job.
Instance Method Summary collapse
- #connected?(targets) ⇒ Boolean
- #file_upload(targets, source, destination, options = {}) ⇒ Object
- #finish_plan(plan_result) ⇒ Object
- #get(url) ⇒ Object
-
#initialize(plan_job, http_client, logger) ⇒ OrchClient
constructor
A new instance of OrchClient.
- #pack(directory) ⇒ Object
- #post_command(url, body) ⇒ Object
- #process_run_results(targets, results) ⇒ Object
- #run_command(targets, command, options = {}) ⇒ Object
- #run_script(targets, script, arguments, options = {}) ⇒ Object
- #run_task(targets, task, arguments, options = {}) ⇒ Object
- #run_task_job(targets, task, arguments, options) ⇒ Object
- #send_request(targets, task, arguments, options = {}) ⇒ Object
-
#unwrap_bolt_result(target, result) ⇒ Object
run_task generates a result that makes sense for a generic task which needs to be unwrapped to extract stdout/stderr/exitcode.
-
#unwrap_sensitive_args(arguments) ⇒ Object
Unwraps any Sensitive data in an arguments Hash, so the plain-text is passed to the Task/Script.
Constructor Details
#initialize(plan_job, http_client, logger) ⇒ OrchClient
Returns a new instance of OrchClient.
13 14 15 16 17 18 |
# File 'lib/plan_executor/orch_client.rb', line 13 def initialize(plan_job, http_client, logger) @plan_job = plan_job @logger = logger @http = http_client @environment = 'production' end |
Instance Attribute Details
#http ⇒ Object (readonly)
Returns the value of attribute http.
7 8 9 |
# File 'lib/plan_executor/orch_client.rb', line 7 def http @http end |
#plan_job ⇒ Object (readonly)
Returns the value of attribute plan_job.
7 8 9 |
# File 'lib/plan_executor/orch_client.rb', line 7 def plan_job @plan_job end |
Instance Method Details
#connected?(targets) ⇒ Boolean
232 233 234 235 |
# File 'lib/plan_executor/orch_client.rb', line 232 def connected?(targets) response = @http.post('inventory', nodes: targets.map(&:host)) response.body['items'].all? { |node| node['connected'] } end |
#file_upload(targets, source, destination, options = {}) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/plan_executor/orch_client.rb', line 168 def file_upload(targets, source, destination, = {}) stat = File.stat(source) content = if stat.directory? pack(source) else File.open(source, &:read) end content = Base64.encode64(content) mode = File.stat(source).mode params = { 'path' => destination, 'content' => content, 'mode' => mode, 'directory' => stat.directory? } results = run_task_job(targets, BOLT_UPLOAD_TASK, params, ) results.map! do |result| if result.error_hash result else Bolt::Result.for_upload(result.target, source, destination) end end end |
#finish_plan(plan_result) ⇒ Object
20 21 22 23 24 25 26 27 28 29 |
# File 'lib/plan_executor/orch_client.rb', line 20 def finish_plan(plan_result) body = { plan_job: @plan_job, result: plan_result.value || '', status: plan_result.status } post_command('internal/plan_finish', body) rescue StandardError => e @logger.error("Failed to finish plan #{plan_job}: #{e.}") end |
#get(url) ⇒ Object
47 48 49 50 51 52 53 |
# File 'lib/plan_executor/orch_client.rb', line 47 def get(url) response = @http.get(url) if response.status != 200 raise Bolt::Error.new(response.body['msg'], response.body['kind'], response.body['details']) end response.body end |
#pack(directory) ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/plan_executor/orch_client.rb', line 138 def pack(directory) start_time = Time.now io = StringIO.new output = Minitar::Output.new(Zlib::GzipWriter.new(io)) Find.find(directory) do |file| next unless File.file?(file) tar_path = Pathname.new(file).relative_path_from(Pathname.new(directory)) @logger.debug("Packing #{file} to #{tar_path}") stat = File.stat(file) content = File.binread(file) output.tar.add_file_simple( tar_path.to_s, data: content, size: content.size, mode: stat.mode & 0o777, mtime: stat.mtime ) end duration = Time.now - start_time @logger.debug("Packed upload in #{duration * 1000} ms") output.close io.string ensure # Closes both tar and sgz. output&.close end |
#post_command(url, body) ⇒ Object
55 56 57 58 59 60 61 |
# File 'lib/plan_executor/orch_client.rb', line 55 def post_command(url, body) response = @http.post(url, body) if response.status != 202 raise Bolt::Error.new(response.body['msg'], response.body['kind'], response.body['details']) end response.body end |
#process_run_results(targets, results) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/plan_executor/orch_client.rb', line 86 def process_run_results(targets, results) targets_by_name = Hash[targets.map(&:host).zip(targets)] results.map do |node_result| target = targets_by_name[node_result['name']] state = node_result['state'] result = node_result['result'] # If it's finished or already has a proper error simply pass it to the # the result otherwise make sure an error is generated if state == 'finished' || (result && result['_error']) Bolt::Result.new(target, value: result) elsif state == 'skipped' Bolt::Result.new( target, value: { '_error' => { 'kind' => 'puppetlabs.tasks/skipped-node', 'msg' => "Node #{target.host} was skipped", 'details' => {} } } ) else # Make a generic error with a unkown exit_code Bolt::Result.for_task(target, result.to_json, '', 'unknown') end end end |
#run_command(targets, command, options = {}) ⇒ Object
117 118 119 120 |
# File 'lib/plan_executor/orch_client.rb', line 117 def run_command(targets, command, = {}) params = { 'command' => command } run_task_job(targets, BOLT_COMMAND_TASK, params, ) end |
#run_script(targets, script, arguments, options = {}) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/plan_executor/orch_client.rb', line 122 def run_script(targets, script, arguments, = {}) content = File.open(script, &:read) content = Base64.encode64(content) params = { 'content' => content, 'arguments' => arguments, 'name' => Pathname(script).basename.to_s } callback ||= proc {} results = run_task_job(targets, BOLT_SCRIPT_TASK, params, , &callback) results.map! { |result| unwrap_bolt_result(result.target, result) } results.each do |result| callback.call(type: :node_result, result: result) end end |
#run_task(targets, task, arguments, options = {}) ⇒ Object
113 114 115 |
# File 'lib/plan_executor/orch_client.rb', line 113 def run_task(targets, task, arguments, = {}) run_task_job(targets, task, arguments, ) end |
#run_task_job(targets, task, arguments, options) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/plan_executor/orch_client.rb', line 31 def run_task_job(targets, task, arguments, ) # unpack any Sensitive data arguments = unwrap_sensitive_args(arguments) results = send_request(targets, task, arguments, ) process_run_results(targets, results) rescue OrchestratorClient::ApiError => e targets.map do |target| Bolt::Result.new(target, error: e.data) end rescue StandardError => e targets.map do |target| Bolt::Result.from_exception(target, e) end end |
#send_request(targets, task, arguments, options = {}) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/plan_executor/orch_client.rb', line 63 def send_request(targets, task, arguments, = {}) description = ['_description'] body = { task: task.name, environment: @environment, noop: arguments['_noop'], params: arguments.reject { |k, _| k.start_with?('_') }, scope: { nodes: targets.map(&:host) } } body[:description] = description if description body[:plan_job] = @plan_job if @plan_job url = post_command('internal/plan_task', body).dig('job', 'id') job = get(url) until %w[stopped finished failed].include?(job['state']) sleep 1 job = get(url) end get(job.dig('nodes', 'id'))['items'] end |
#unwrap_bolt_result(target, result) ⇒ Object
run_task generates a result that makes sense for a generic task which needs to be unwrapped to extract stdout/stderr/exitcode.
223 224 225 226 227 228 229 230 |
# File 'lib/plan_executor/orch_client.rb', line 223 def unwrap_bolt_result(target, result) if result.error_hash # something went wrong return the failure return result end Bolt::Result.for_command(target, result.value['stdout'], result.value['stderr'], result.value['exit_code']) end |
#unwrap_sensitive_args(arguments) ⇒ Object
Unwraps any Sensitive data in an arguments Hash, so the plain-text is passed to the Task/Script.
This works on deeply nested data structures composed of Hashes, Arrays, and and plain-old data types (int, string, etc).
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/plan_executor/orch_client.rb', line 198 def unwrap_sensitive_args(arguments) # Skip this if Puppet isn't loaded return arguments unless defined?(Puppet::Pops::Types::PSensitiveType::Sensitive) case arguments when Array # iterate over the array, unwrapping all elements arguments.map { |x| unwrap_sensitive_args(x) } when Hash # iterate over the arguments hash and unwrap all keys and values arguments.each_with_object({}) { |(k, v), h| h[unwrap_sensitive_args(k)] = unwrap_sensitive_args(v) } when Puppet::Pops::Types::PSensitiveType::Sensitive # this value is Sensitive, unwrap it unwrap_sensitive_args(arguments.unwrap) else # unknown data type, just return it arguments end end |