Class: Bolt::Transport::Orch
- Inherits:
-
Base
- Object
- Base
- Bolt::Transport::Orch
show all
- Defined in:
- lib/bolt/transport/orch.rb,
lib/bolt/transport/orch/connection.rb
Defined Under Namespace
Classes: Connection
Constant Summary
collapse
- CONF_FILE =
if !ENV['HOME'].nil?
File.expand_path('~/.puppetlabs/client-tools/orchestrator.conf')
else
'/etc/puppetlabs/client-tools/orchestrator.conf'
end
- BOLT_COMMAND_TASK =
Struct.new(:name).new('bolt_shim::command').freeze
- BOLT_SCRIPT_TASK =
Struct.new(:name).new('bolt_shim::script').freeze
- BOLT_UPLOAD_TASK =
Struct.new(:name).new('bolt_shim::upload').freeze
Constants inherited
from Base
Base::ENVIRONMENT_METHODS, Base::STDIN_METHODS
Instance Attribute Summary collapse
Attributes inherited from Base
#logger
Class Method Summary
collapse
Instance Method Summary
collapse
-
#batch_command(targets, command, options = {}, &callback) ⇒ Object
-
#batch_connected?(targets) ⇒ Boolean
-
#batch_script(targets, script, arguments, options = {}, &callback) ⇒ Object
-
#batch_task(targets, task, arguments, options = {}, &callback) ⇒ Object
-
#batch_upload(targets, source, destination, options = {}, &callback) ⇒ Object
-
#batches(targets) ⇒ Object
-
#finish_plan(result) ⇒ Object
-
#get_connection(conn_opts) ⇒ Object
It’s safe to create connections here for now because the batches/threads are per connection.
-
#initialize(*args) ⇒ Orch
constructor
-
#pack(directory) ⇒ Object
-
#process_run_results(targets, results) ⇒ Object
-
#provided_features ⇒ Object
-
#run_task_job(targets, task, arguments, options) ⇒ Object
-
#unwrap_bolt_result(target, result) ⇒ Object
run_task generates a result that makes sense for a generic task which needs to be unwrapped to extract stdout/stderr/exitcode.
Methods inherited from Base
#assert_batch_size_one, #connected?, #default_input_method, #envify_params, filter_options, #run_command, #run_script, #run_task, #select_implementation, #unwrap_sensitive_args, #upload, #with_events
Constructor Details
#initialize(*args) ⇒ Orch
Returns a new instance of Orch.
38
39
40
41
|
# File 'lib/bolt/transport/orch.rb', line 38
def initialize(*args)
@connections = {}
super
end
|
Instance Attribute Details
#plan_context=(value) ⇒ Object
Sets the attribute plan_context
26
27
28
|
# File 'lib/bolt/transport/orch.rb', line 26
def plan_context=(value)
@plan_context = value
end
|
Class Method Details
.options ⇒ Object
28
29
30
|
# File 'lib/bolt/transport/orch.rb', line 28
def self.options
%w[service-url cacert token-file task-environment]
end
|
.validate(options) ⇒ Object
36
|
# File 'lib/bolt/transport/orch.rb', line 36
def self.validate(options); end
|
Instance Method Details
#batch_command(targets, command, options = {}, &callback) ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/bolt/transport/orch.rb', line 92
def batch_command(targets, command, options = {}, &callback)
params = {
'command' => command
}
results = run_task_job(targets,
BOLT_COMMAND_TASK,
params,
options,
&callback)
callback ||= proc {}
results.map! { |result| unwrap_bolt_result(result.target, result) }
results.each do |result|
callback.call(type: :node_result, result: result)
end
end
|
#batch_connected?(targets) ⇒ Boolean
217
218
219
220
|
# File 'lib/bolt/transport/orch.rb', line 217
def batch_connected?(targets)
resp = get_connection(targets.first.options).query_inventory(targets)
resp['items'].all? { |node| node['connected'] }
end
|
#batch_script(targets, script, arguments, options = {}, &callback) ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/bolt/transport/orch.rb', line 108
def batch_script(targets, script, arguments, options = {}, &callback)
content = File.open(script, &:read)
content = Base64.encode64(content)
params = {
'content' => content,
'arguments' => arguments,
'name' => Pathname(script).basename.to_s
}
callback ||= proc {}
results = run_task_job(targets, BOLT_SCRIPT_TASK, params, options, &callback)
results.map! { |result| unwrap_bolt_result(result.target, result) }
results.each do |result|
callback.call(type: :node_result, result: result)
end
end
|
#batch_task(targets, task, arguments, options = {}, &callback) ⇒ Object
209
210
211
212
213
214
215
|
# File 'lib/bolt/transport/orch.rb', line 209
def batch_task(targets, task, arguments, options = {}, &callback)
callback ||= proc {}
results = run_task_job(targets, task, arguments, options, &callback)
results.each do |result|
callback.call(type: :node_result, result: result)
end
end
|
#batch_upload(targets, source, destination, options = {}, &callback) ⇒ Object
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
# File 'lib/bolt/transport/orch.rb', line 154
def batch_upload(targets, source, destination, options = {}, &callback)
stat = File.stat(source)
content = if stat.directory?
pack(source)
else
File.open(source, &:read)
end
content = Base64.encode64(content)
mode = File.stat(source).mode
params = {
'path' => destination,
'content' => content,
'mode' => mode,
'directory' => stat.directory?
}
callback ||= proc {}
results = run_task_job(targets, BOLT_UPLOAD_TASK, params, options, &callback)
results.map! do |result|
if result.error_hash
result
else
Bolt::Result.for_upload(result.target, source, destination)
end
end
results.each do |result|
callback&.call(type: :node_result, result: result)
end
end
|
#batches(targets) ⇒ Object
183
184
185
|
# File 'lib/bolt/transport/orch.rb', line 183
def batches(targets)
targets.group_by { |target| Connection.get_key(target.options) }.values
end
|
#finish_plan(result) ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/bolt/transport/orch.rb', line 43
def finish_plan(result)
if result.is_a? Bolt::PlanResult
@connections.each_value do |conn|
begin
conn.finish_plan(result)
rescue StandardError => e
@logger.debug("Failed to finish plan on #{conn.key}: #{e.message}")
end
end
end
end
|
#get_connection(conn_opts) ⇒ Object
It’s safe to create connections here for now because the batches/threads are per connection.
57
58
59
60
61
62
63
|
# File 'lib/bolt/transport/orch.rb', line 57
def get_connection(conn_opts)
key = Connection.get_key(conn_opts)
unless (conn = @connections[key])
conn = @connections[key] = Connection.new(conn_opts, @plan_context, logger)
end
conn
end
|
#pack(directory) ⇒ Object
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
# File 'lib/bolt/transport/orch.rb', line 124
def pack(directory)
start_time = Time.now
io = StringIO.new
output = Minitar::Output.new(Zlib::GzipWriter.new(io))
Find.find(directory) do |file|
next unless File.file?(file)
tar_path = Pathname.new(file).relative_path_from(Pathname.new(directory))
@logger.debug("Packing #{file} to #{tar_path}")
stat = File.stat(file)
content = File.binread(file)
output.tar.add_file_simple(
tar_path.to_s,
data: content,
size: content.size,
mode: stat.mode & 0o777,
mtime: stat.mtime
)
end
duration = Time.now - start_time
@logger.debug("Packed upload in #{duration * 1000} ms")
output.close
io.string
ensure
output&.close
end
|
#process_run_results(targets, results) ⇒ Object
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/bolt/transport/orch.rb', line 65
def process_run_results(targets, results)
targets_by_name = Hash[targets.map(&:host).zip(targets)]
results.map do |node_result|
target = targets_by_name[node_result['name']]
state = node_result['state']
result = node_result['result']
if state == 'finished' || (result && result['_error'])
Bolt::Result.new(target, value: result)
elsif state == 'skipped'
Bolt::Result.new(
target,
value: { '_error' => {
'kind' => 'puppetlabs.tasks/skipped-node',
'msg' => "Node #{target.host} was skipped",
'details' => {}
} }
)
else
Bolt::Result.for_task(target, result.to_json, '', 'unknown')
end
end
end
|
#provided_features ⇒ Object
32
33
34
|
# File 'lib/bolt/transport/orch.rb', line 32
def provided_features
['puppet-agent']
end
|
#run_task_job(targets, task, arguments, options) ⇒ Object
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
|
# File 'lib/bolt/transport/orch.rb', line 187
def run_task_job(targets, task, arguments, options)
targets.each do |target|
yield(type: :node_start, target: target) if block_given?
end
begin
arguments = unwrap_sensitive_args(arguments)
results = get_connection(targets.first.options).run_task(targets, task, arguments, options)
process_run_results(targets, results)
rescue OrchestratorClient::ApiError => e
targets.map do |target|
Bolt::Result.new(target, error: e.data)
end
rescue StandardError => e
targets.map do |target|
Bolt::Result.from_exception(target, e)
end
end
end
|
#unwrap_bolt_result(target, result) ⇒ Object
run_task generates a result that makes sense for a generic task which needs to be unwrapped to extract stdout/stderr/exitcode.
225
226
227
228
229
230
231
232
|
# File 'lib/bolt/transport/orch.rb', line 225
def unwrap_bolt_result(target, result)
if result.error_hash
return result
end
Bolt::Result.for_command(target, result.value['stdout'], result.value['stderr'], result.value['exit_code'])
end
|