Class: PoolParty::Master
Instance Attribute Summary
Attributes included from Scheduler
#tasker
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from FileWriter
base_tmp_dir, clear_base_directory, make_base_directory, remote_base_tmp_dir, with_temp_file, write_to_file_for, write_to_temp_file
Methods included from PoolParty
include_cloud_tasks, load_app, load_monitors, load_plugins, message, options, plugin_dir, read_config_file, register_monitor, registered_monitor?, registered_monitors, root_dir, timer, user_dir, verbose?, write_to_temp_file
Methods included from Remoter
included
Methods included from Callbacks
included
Methods inherited from Remoting
#can_shutdown_an_instance?, #can_start_a_new_instance?, #cloud_keypairs, #list_of_all_instances, #list_of_instances, #list_of_nonterminated_instances, #list_of_pending_instances, #list_of_running_instances, #list_of_terminating_instances, #maximum_number_of_instances_are_not_running?, #minimum_number_of_instances_are_running?, #number_of_all_pending_and_running_instances, #number_of_pending_and_running_instances, #number_of_pending_and_running_instances_for_list, #number_of_pending_instances, #number_of_running_instances, #request_launch_new_instance, #request_launch_new_instances, #request_launch_one_instance_at_a_time, #request_termination_of_all_instances, #request_termination_of_instance, #request_termination_of_running_instances, #running_instances, #update_instance_values
Methods included from Scheduler
#_tasker, #add_task, #daemonize, #interval, #run_thread_list, #run_thread_loop, #run_threads
Methods included from Ec2Wrapper
included
Constructor Details
#initialize ⇒ Master
Returns a new instance of Master.
13
14
15
16
17
18
|
# File 'lib/poolparty/master.rb', line 13
# Builds a Master. For each of the :contract_when and :expand_when rule sets
# that has not been defined yet, installs the default taken from
# Application.options on the class.
def initialize
  super
  [:contract_when, :expand_when].each do |rule_name|
    next if are_rules?(rule_name)
    self.class.send(:rules, rule_name, Application.options.send(rule_name))
  end
end
|
Class Method Details
.build_haproxy_file ⇒ Object
461
462
463
464
465
466
|
# File 'lib/poolparty/master.rb', line 461
# Class-level variant: renders the haproxy template with one server entry
# per node (one line each) and the application host port, and returns the
# rendered result without writing any file.
# NOTE(review): `^` is presumably a project String extension that fills the
# template from the hash — confirm.
def build_haproxy_file
  servers = <<-EOS
#{collect_nodes {|node| node.haproxy_entry}.join("\n")}
  EOS
  # File.read closes the handle immediately; open(...).read left it open
  # until GC and, via Kernel#open, could also run "|cmd" paths.
  File.read(Application.haproxy_config_file).strip ^ {:servers => servers, :host_port => Application.host_port}
end
|
.build_user_global_files ⇒ Object
469
470
471
472
473
|
# File 'lib/poolparty/master.rb', line 469
# Writes every global user file registered with define_global_user_file.
# Each registry entry is a [name, block] pair; the block is handed to
# write_to_file_for, which uses it to produce the file content.
def build_user_global_files
  global_user_files.each do |name, content_block|
    # The block must be passed inside the argument list. The original
    # `write_to_file_for(arr[0]) &arr[1]` parsed as a binary `&` applied to
    # write_to_file_for's return value, so the block was never passed.
    write_to_file_for(name, &content_block)
  end
end
|
.build_user_node_files_for(node) ⇒ Object
474
475
476
477
478
479
480
|
# File 'lib/poolparty/master.rb', line 474
# Writes each per-node user file registered with define_node_user_file for
# +node+. Each registered block is called with the node to produce content.
def build_user_node_files_for(node)
  user_node_files.each do |file_name, builder|
    write_to_file_for(file_name, node) { builder.call(node) }
  end
end
|
.cloud_ips ⇒ Object
427
428
429
|
# File 'lib/poolparty/master.rb', line 427
# Class-level shortcut: returns cloud_ips of a freshly built Master.
def cloud_ips
  master = new
  master.cloud_ips
end
|
.collect_nodes(&block) ⇒ Object
414
415
416
|
# File 'lib/poolparty/master.rb', line 414
# Maps +block+ over the nodes of a freshly built Master and returns the
# collected results.
def collect_nodes(&block)
  new.nodes.map(&block)
end
|
.define_global_user_file(name, &block) ⇒ Object
481
482
483
|
# File 'lib/poolparty/master.rb', line 481
# Registers a global user file. +name+ is the target file name; +block+
# produces its content when build_user_global_files runs.
# Returns the registry array.
def define_global_user_file(name, &block)
  entry = [name, block]
  global_user_files.push(entry)
end
|
.define_node_user_file(name, &block) ⇒ Object
485
486
487
|
# File 'lib/poolparty/master.rb', line 485
# Registers a per-node user file. +name+ is the target file name; +block+
# receives the node and produces that node's content when
# build_user_node_files_for runs. Returns the registry array.
def define_node_user_file(name, &block)
  entry = [name, block]
  user_node_files.push(entry)
end
|
.get_master ⇒ Object
424
425
426
|
# File 'lib/poolparty/master.rb', line 424
# Returns the first node of a freshly built Master, which this class treats
# as the master. Returns nil when there are no nodes.
def get_master
  new.nodes.first
end
|
.get_next_node(node) ⇒ Object
430
431
432
|
# File 'lib/poolparty/master.rb', line 430
# Class-level shortcut: delegates to get_next_node on a freshly built Master.
def get_next_node(node)
  master = new
  master.get_next_node(node)
end
|
.global_user_files ⇒ Object
484
|
# File 'lib/poolparty/master.rb', line 484
# Lazily created registry of [name, block] pairs for global user files.
def global_user_files
  @global_user_files ||= []
end
|
.is_master_responding? ⇒ Boolean
421
422
423
|
# File 'lib/poolparty/master.rb', line 421
# Sends one ping (with the original -c1 -t5 flags) to the master node.
# Returns true when ping exits successfully and false otherwise, including
# when there is no master node at all.
def is_master_responding?
  master = get_master
  return false unless master
  # Kernel.system reports the exit status. The original backtick call
  # returned ping's output, which is always truthy, so the predicate could
  # never be false.
  Kernel.system("ping -c1 -t5 #{master.ip} >/dev/null 2>&1") ? true : false
end
|
.requires_heartbeat? ⇒ Boolean
418
419
420
|
# File 'lib/poolparty/master.rb', line 418
# True when the cloud has more than one node, i.e. when there is a peer for
# heartbeat to pair each node with.
def requires_heartbeat?
  node_count = new.nodes.size
  node_count > 1
end
|
.set_hosts(c, remotetask = nil) ⇒ Object
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
|
# File 'lib/poolparty/master.rb', line 433
# Configures the remote-task object +remotetask+ by setting:
# - :user, :application, :ssh_flags and :rsync_flags from Application settings;
# - :domain to the master's IP, when a master exists;
# - one "user@ip" :app host per node whose status matches /running/.
#
# +c+ is not used by this method.
# NOTE(review): +c+ may have been intended as the fallback target — confirm with callers.
#
# Raises ArgumentError when +remotetask+ is nil. Previously +rt+ stayed nil
# in that case and the first `rt.set` failed with NoMethodError.
def set_hosts(c, remotetask=nil)
  raise ArgumentError, "set_hosts: remotetask must not be nil" if remotetask.nil?
  rt = remotetask
  ssh_location = `which ssh`.gsub(/\n/, '')
  rt.set :user, Application.username
  rt.set :application, Application.app_name
  rt.set :ssh_flags, "-i #{Application.keypair_path} -o StrictHostKeyChecking=no"
  rt.set :rsync_flags, ['-azP', '--delete', "-e '#{ssh_location} -l #{Application.username} -i #{Application.keypair_path} -o StrictHostKeyChecking=no'"]
  master = get_master
  rt.set :domain, "#{master.ip}" if master
  Master.with_nodes do |node|
    rt.host "#{Application.username}@#{node.ip}", :app if node.status =~ /running/
  end
end
|
453
454
455
456
457
458
459
460
|
# File 'lib/poolparty/master.rb', line 453
# Builds the shell snippet used to (re)configure +node+. It:
# - emits the node's plugin-update string;
# - runs `pool maintain` against the plugin dir;
# - sets the hostname to the node's name;
# - mounts the shared S3 bucket at /data with s3fs.
# Returns the snippet as a String.
# NOTE(review): the AWS credentials are embedded in the command text.
def ssh_configure_string_for(node)
  <<-EOC
#{node.update_plugin_string(node)}
pool maintain -c ~/.config -l #{PoolParty.plugin_dir}
hostname -v #{node.name}
/usr/bin/s3fs #{Application.shared_bucket} -o accessKeyId=#{Application.access_key} -o secretAccessKey=#{Application.secret_access_key} -o nonempty /data
  EOC
end
|
.user_node_files ⇒ Object
488
|
# File 'lib/poolparty/master.rb', line 488
# Lazily created registry of [name, block] pairs for per-node user files.
def user_node_files
  @user_node_files ||= []
end
|
.with_nodes(&block) ⇒ Object
410
411
412
|
# File 'lib/poolparty/master.rb', line 410
# Yields each node of a freshly built Master to +block+ and returns the
# node list.
def with_nodes(&block)
  new.nodes.each(&block)
end
|
Instance Method Details
#add_instance_if_load_is_high ⇒ Object
Also known as:
add_instance
Add an instance if the load is high
245
246
247
248
249
250
|
# File 'lib/poolparty/master.rb', line 245
# When the :expand_when rules say the cloud should expand (expand?), logs
# that and grows the cloud by one instance. Returns nil otherwise.
def add_instance_if_load_is_high
  return unless expand?
  PoolParty.message "Cloud needs expansion"
  grow_by(1)
end
|
#build_and_copy_heartbeat_authkeys_file ⇒ Object
Build the basic auth file for the heartbeat
294
295
296
297
298
|
# File 'lib/poolparty/master.rb', line 294
# Writes the heartbeat "authkeys" file, using the contents of
# Application.heartbeat_authkeys_config_file verbatim.
def build_and_copy_heartbeat_authkeys_file
  write_to_file_for("authkeys") do
    # File.read closes the file; open(...).read leaked the handle.
    File.read(Application.heartbeat_authkeys_config_file)
  end
end
|
#build_and_send_config_files_in_temp_directory ⇒ Object
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
# File 'lib/poolparty/master.rb', line 178
# Stages the files the nodes need into base_tmp_dir:
# - a tarball of the plugin directory;
# - heartbeat files, when the cloud has more than one node;
# - config.yml and the keypair;
# - the EC2 pem files and the resource.d configs;
# - the haproxy config, user global files and the node list;
# - per-node hosts/configuration/user files, plus heartbeat files when needed.
# No transfer call is visible in this method; sending appears to happen in
# send_config_files_to_nodes.
def build_and_send_config_files_in_temp_directory
  begin
    # ftools provides File.copy on Ruby 1.8. It no longer exists on newer
    # Rubies; this method itself copies with FileUtils.cp instead.
    require 'ftools'
  rescue LoadError
  end
  plugin_dir = Application.plugin_dir
  if File.directory?(plugin_dir)
    # -C archives relative to the plugin dir's parent, so the tarball has the
    # same layout no matter which directory we are running from.
    Kernel.system("tar", "-czf", "#{base_tmp_dir}/plugins.tar.gz",
                  "-C", File.dirname(plugin_dir), File.basename(plugin_dir))
  end
  # Each requires_heartbeat? call builds a Master and lists instances; ask once.
  heartbeat = Master.requires_heartbeat?
  if heartbeat
    build_and_copy_heartbeat_authkeys_file
    FileUtils.cp(get_config_file_for("cloud_master_takeover"), "#{base_tmp_dir}/cloud_master_takeover")
    FileUtils.cp(get_config_file_for("heartbeat.conf"), "#{base_tmp_dir}/ha.cf")
  end
  config_file = Application.config_file
  FileUtils.cp(config_file, "#{base_tmp_dir}/config.yml") if config_file && File.exist?(config_file)
  FileUtils.cp(Application.keypair_path, "#{base_tmp_dir}/keypair") if File.exist?(Application.keypair_path)
  copy_pem_files_to_tmp_dir
  copy_config_files_in_directory_to_tmp_dir("config/resource.d")
  build_haproxy_file
  Master.build_user_global_files
  build_nodes_list
  Master.with_nodes do |node|
    build_hosts_file_for(node)
    build_reconfigure_instances_script_for(node)
    Master.build_user_node_files_for(node)
    if heartbeat
      build_heartbeat_config_file_for(node)
      build_heartbeat_resources_file_for(node)
    end
  end
end
|
#build_haproxy_file ⇒ Object
Build the basic haproxy config file from the config file in the config directory and return a tempfile
274
275
276
277
278
279
280
281
|
# File 'lib/poolparty/master.rb', line 274
# Writes the "haproxy" file. Its content is the haproxy template rendered
# with one server entry per node (one line each) and the host port.
# NOTE(review): `^` is presumably a project String template extension — confirm.
def build_haproxy_file
  write_to_file_for("haproxy") do
    servers = <<-EOS
#{nodes.collect {|node| node.haproxy_entry}.join("\n")}
    EOS
    # File.read closes the handle; open(...).read leaked it.
    File.read(Application.haproxy_config_file).strip ^ {:servers => servers, :host_port => Application.host_port}
  end
end
|
#build_heartbeat_config_file_for(node) ⇒ Object
Build heartbeat config file
300
301
302
303
304
305
|
# File 'lib/poolparty/master.rb', line 300
# Writes the per-node "heartbeat" file. It renders the heartbeat template
# with this node's entry and its successor's entry (see get_next_node).
def build_heartbeat_config_file_for(node)
  write_to_file_for("heartbeat", node) do
    # Best effort: if the successor can't be resolved (e.g. get_next_node
    # returns nil), fall back to an empty node list.
    servers = "#{node.node_entry}\n#{get_next_node(node).node_entry}" rescue ""
    # File.read closes the handle; open(...).read leaked it.
    File.read(Application.heartbeat_config_file).strip ^ {:nodes => servers}
  end
end
|
#build_heartbeat_resources_file_for(node) ⇒ Object
306
307
308
309
310
|
# File 'lib/poolparty/master.rb', line 306
# Writes the per-node "haresources" file: this node's haproxy resources
# entry followed by its successor's. Any StandardError while building the
# text (e.g. no successor) yields an empty file.
def build_heartbeat_resources_file_for(node)
  write_to_file_for("haresources", node) do
    begin
      own_entry = node.haproxy_resources_entry
      partner_entry = get_next_node(node).haproxy_resources_entry
      "#{own_entry}\n#{partner_entry}"
    rescue
      ""
    end
  end
end
|
#build_hosts_file_for(n) ⇒ Object
Build host file for a specific node
283
284
285
286
287
|
# File 'lib/poolparty/master.rb', line 283
# Writes the "hosts" file for node +n+ with one line per node. +n+ itself
# (matched by IP) gets its local_hosts_entry; every other node gets its
# regular hosts_entry.
def build_hosts_file_for(n)
  write_to_file_for("hosts", n) do
    lines = nodes.map do |node|
      if node.ip == n.ip
        node.local_hosts_entry
      else
        node.hosts_entry
      end
    end
    lines.join("\n")
  end
end
|
#build_nodes_list ⇒ Object
288
289
290
291
292
|
# File 'lib/poolparty/master.rb', line 288
# Writes the node-list file (named by RemoteInstance.node_list_name)
# containing one cloud IP per line.
def build_nodes_list
  list_name = RemoteInstance.node_list_name
  write_to_file_for(list_name) { cloud_ips.join("\n") }
end
|
Build basic configuration script for the node
312
313
314
315
316
|
# File 'lib/poolparty/master.rb', line 312
# Writes the per-node "configuration" script. It renders the reconfigure
# shell template with the node's configure tasks; the argument passed to
# configure_tasks is true when PoolParty is not verbose.
def build_reconfigure_instances_script_for(node)
  write_to_file_for("configuration", node) do
    # File.read closes the handle; open(...).read leaked it.
    File.read(Application.sh_reconfigure_instances_script).strip ^ node.configure_tasks( !PoolParty.verbose? )
  end
end
|
#check_stats ⇒ Object
Sole purpose to check the stats, mainly in a plugin
134
135
136
137
|
# File 'lib/poolparty/master.rb', line 134
# Logs the registered monitors as a comma-separated list (hook point for
# plugins that want to inspect stats).
def check_stats
  summary = registered_monitors.map { |monitor| "#{monitor}" }.join(", ")
  PoolParty.message "Monitors: #{summary}"
end
|
#cleanup_tmp_directory(c) ⇒ Object
223
224
225
|
# File 'lib/poolparty/master.rb', line 223
# Deletes every non-hidden entry inside base_tmp_dir, keeping the directory
# itself. +c+ is unused.
def cleanup_tmp_directory(c)
  # Guard on the directory actually being cleaned. The original checked a
  # hard-coded "tmp/" relative to the current directory, which need not
  # match base_tmp_dir.
  return unless File.directory?(base_tmp_dir)
  Dir["#{base_tmp_dir}/*"].each {|f| FileUtils.rm_rf f}
end
|
#cloud_ips ⇒ Object
103
104
105
|
# File 'lib/poolparty/master.rb', line 103
# IPs of all nodes, memoized in @ips after the first call.
def cloud_ips
  @ips ||= nodes.map(&:ip)
end
|
#cloud_nodes ⇒ Object
Return a list of the nodes for each keypair and cache them
348
349
350
351
352
353
354
355
356
357
358
|
# File 'lib/poolparty/master.rb', line 348
# Nodes across every keypair, memoized in @cloud_nodes. For each keypair,
# the non-terminated instances become RemoteInstance objects numbered from
# 0 within that keypair.
def cloud_nodes
  @cloud_nodes ||= begin
    collected = []
    cloud_keypairs.each do |keypair|
      live = list_of_nonterminated_instances(list_of_instances(keypair))
      live.collect_with_index do |inst, index|
        collected.push(RemoteInstance.new(inst.merge(:number => index)))
      end
    end
    collected
  end
end
|
#clouds_list ⇒ Object
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
|
# File 'lib/poolparty/master.rb', line 382
# Human-readable listing of all clouds: a total header, then each node's
# description. A "key pair: NAME (COUNT)" header line is emitted whenever
# the keypair changes from the previous node.
# Returns "Clouds are not running" when nothing is pending or running.
def clouds_list
  # Query once so the branch and the header count agree.
  total = number_of_all_pending_and_running_instances
  return "Clouds are not running" unless total > 0
  out = "-- ALL CLOUDS (#{total})--\n"
  keypair = nil
  entries = cloud_nodes.collect do |node|
    str = ""
    if keypair != node.keypair
      keypair = node.keypair
      str = "key pair: #{keypair} (#{number_of_pending_and_running_instances(keypair)})\n"
    end
    str += "\t" + node.description unless node.description.nil?
    # Return str explicitly. A trailing `str += ... if ...` evaluates to nil
    # when the description is nil, which dropped the key-pair header.
    str
  end
  out << entries.join("\n")
end
|
Configure the master because the master will take care of the rest after that
65
66
67
68
69
70
71
72
73
|
# File 'lib/poolparty/master.rb', line 65
# Stages the config files, runs the remote configuration scripts, then calls
# configure on every node. Returns the node list.
def configure_cloud
  message "Configuring master"
  build_and_send_config_files_in_temp_directory
  remote_configure_instances
  nodes.each(&:configure)
end
|
#contract? ⇒ Boolean
261
262
263
|
# File 'lib/poolparty/master.rb', line 261
# True when the :contract_when scaling rules are currently satisfied.
def contract?
  rule_set = :contract_when
  valid_rules?(rule_set)
end
|
#copy_config_files_in_directory_to_tmp_dir(dir) ⇒ Object
Copy all the files in the directory to the dest
327
328
329
330
331
332
333
334
335
336
337
338
339
340
|
# File 'lib/poolparty/master.rb', line 327
# Copies every non-hidden file in +dir+ into base_tmp_dir/<basename of dir>.
# The user's copy (user_dir/dir) is used when that directory exists;
# otherwise the bundled copy under root_dir is used.
def copy_config_files_in_directory_to_tmp_dir(dir)
  dest_dir = "#{base_tmp_dir}/#{File.basename(dir)}"
  FileUtils.mkdir_p dest_dir
  source_dir = File.directory?("#{user_dir}/#{dir}") ? "#{user_dir}/#{dir}" : "#{root_dir}/#{dir}"
  Dir["#{source_dir}/*"].each do |file|
    # FileUtils.cp copies into dest_dir. File.copy needed the Ruby 1.8-only ftools.
    FileUtils.cp(file, dest_dir)
  end
end
|
#copy_pem_files_to_tmp_dir ⇒ Object
214
215
216
217
218
219
220
221
222
|
# File 'lib/poolparty/master.rb', line 214
# Best-effort copy of the files named by the EC2_CERT and EC2_PRIVATE_KEY
# environment variables into base_tmp_dir, keeping their basenames.
# Unset or missing files are skipped. Copy failures are logged rather than
# raised, so they do not abort the config build.
def copy_pem_files_to_tmp_dir
  %w(EC2_CERT EC2_PRIVATE_KEY).each do |key|
    # Read the variable directly instead of shelling out to `echo $KEY`.
    file = ENV[key].to_s.strip
    next if file.empty? || !File.file?(file)
    begin
      FileUtils.cp(file, "#{base_tmp_dir}/#{File.basename(file)}")
    rescue StandardError => e
      # Was `rescue Exception` with an empty body: that swallowed even
      # interrupts and hid every failure.
      PoolParty.message "Could not copy #{key} (#{file}): #{e.message}"
    end
  end
end
|
#expand? ⇒ Boolean
264
265
266
|
# File 'lib/poolparty/master.rb', line 264
# True when the :expand_when scaling rules are currently satisfied.
def expand?
  rule_set = :expand_when
  valid_rules?(rule_set)
end
|
#get_config_file_for(name) ⇒ Object
Try the user’s directory before the master directory
319
320
321
322
323
324
325
|
# File 'lib/poolparty/master.rb', line 319
# Resolves the path of config file +name+. Returns user_dir/config/name if
# that exists, otherwise the bundled root_dir/config/name (which is returned
# without checking that it exists).
def get_config_file_for(name)
  user_path = "#{user_dir}/config/#{name}"
  # File.exist? — the File.exists? alias was removed in Ruby 3.2.
  File.exist?(user_path) ? user_path : "#{root_dir}/config/#{name}"
end
|
#get_next_node(node) ⇒ Object
Get the next node in sequence, so we can configure heartbeat to monitor the next node
364
365
366
367
368
|
# File 'lib/poolparty/master.rb', line 364
# Returns the node numbered one after +node+, wrapping to node 0 past the
# end, so heartbeat can pair each node with a neighbour.
def get_next_node(node)
  following = node.number + 1
  get_node(following >= nodes.size ? 0 : following)
end
|
#get_node(i = 0) ⇒ Object
Get the node at the specific index from the cached nodes
360
361
362
|
# File 'lib/poolparty/master.rb', line 360
# Returns the first cached node whose number equals i.to_i, or nil.
def get_node(i=0)
  nodes.find { |candidate| candidate.number == i.to_i }
end
|
#grow_by(num = 1) ⇒ Object
156
157
158
159
160
161
162
163
|
# File 'lib/poolparty/master.rb', line 156
# Launches +count+ new instances, waits for them to boot, drops the cached
# node lists, and reconfigures the whole cloud.
def grow_by(count = 1)
  request_launch_new_instances(count)
  wait_for_all_instances_to_boot
  reset! # node caches are stale once new instances exist
  configure_cloud
end
|
#install_cloud(bool = false) ⇒ Object
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/poolparty/master.rb', line 82
# Installs the software stack when Application.install_on_load? is set or
# +force+ is true. It adds an Ubuntu hardy apt source and runs apt-get
# update over ssh, installs poolparty via Provider, then runs install on
# every node. Returns nil when installation is skipped.
# NOTE(review): the echo appends the apt source on every run, so repeated
# installs duplicate the line.
def install_cloud(force = false)
  return unless Application.install_on_load? || force
  apt_setup = <<-EOE
touch /etc/apt/sources.list
echo 'deb http://mirrors.kernel.org/ubuntu hardy main universe' >> /etc/apt/sources.list
apt-get update --fix-missing
  EOE
  ssh(apt_setup)
  Provider.install_poolparty
  nodes.each { |node| node.install }
end
|
#launch_minimum_instances ⇒ Object
Launch the minimum number of instances.
107
108
109
110
|
# File 'lib/poolparty/master.rb', line 107
# Requests enough instances to reach Application.minimum_instances and
# returns the node list.
# NOTE(review): the shortfall can be negative when more instances are
# already running; whether request_launch_new_instances tolerates that is
# not visible here.
def launch_minimum_instances
  shortfall = Application.minimum_instances - number_of_pending_and_running_instances
  request_launch_new_instances(shortfall)
  nodes
end
|
373
374
375
376
377
378
379
380
381
|
# File 'lib/poolparty/master.rb', line 373
# Human-readable listing of this cloud: a count header followed by one
# description line per node. Returns "Cloud is not running" when nothing
# is pending or running.
def list
  # Query once so the branch and the header count agree.
  running = number_of_pending_and_running_instances
  return "Cloud is not running" unless running > 0
  out = "-- CLOUD (#{running})--\n"
  out << nodes.collect {|node| node.description }.join("\n")
end
|
#make_base_tmp_dir(c) ⇒ Object
174
175
176
|
# File 'lib/poolparty/master.rb', line 174
# Ensures base_tmp_dir exists, creating any missing parent directories.
# Idempotent. +c+ is unused.
def make_base_tmp_dir(c)
  # FileUtils.mkdir_p creates parents and avoids a shell. The original
  # `mkdir` failed silently when the parent was missing.
  FileUtils.mkdir_p(base_tmp_dir) unless File.directory?(base_tmp_dir)
end
|
Return a list of the nodes and cache them
342
343
344
345
346
|
# File 'lib/poolparty/master.rb', line 342
# Non-terminated instances wrapped as RemoteInstance objects numbered from
# 0, memoized in @nodes until reset! clears it.
def nodes
  return @nodes if @nodes
  @nodes = list_of_nonterminated_instances.collect_with_index do |instance, index|
    RemoteInstance.new(instance.merge(:number => index))
  end
end
|
152
153
154
155
|
# File 'lib/poolparty/master.rb', line 152
# Counts the nodes whose stack_installed? is false.
def number_of_unconfigured_nodes
  nodes.count { |node| !node.stack_installed? }
end
|
370
371
|
# File 'lib/poolparty/master.rb', line 370
# Hook run from the INT trap in start_monitor! before exiting; empty here
# so subclasses or plugins can override it.
def on_exit; end
|
Tough method: we need to make sure that all the instances have the required software installed. This is a basic check against the local store of the instances that have the stack installed.
148
149
150
151
|
# File 'lib/poolparty/master.rb', line 148
# Logs how many nodes lack the installed stack, and reconfigures the cloud
# when there is at least one.
def reconfigure_cloud_when_necessary
  # Count once: each call walks every node's stack_installed?, and the
  # logged number should match the one the decision is based on.
  unconfigured = number_of_unconfigured_nodes
  PoolParty.message "#{unconfigured} unconfigured nodes"
  configure_cloud if unconfigured > 0
end
|
232
233
234
235
236
237
238
239
240
241
242
243
|
# File 'lib/poolparty/master.rb', line 232
# For every node, builds an ssh command that makes the node's uploaded
# "<name>-configuration" script executable and runs it with /bin/sh. All
# commands are then handed to run_array_of_tasks together.
def remote_configure_instances
  commands = []
  Master.with_nodes do |node|
    script = "#{remote_base_tmp_dir}/#{node.name}-configuration"
    shell = <<-EOC
chmod +x #{script}
/bin/sh #{script}
    EOC
    commands.push("#{self.class.ssh_string} #{node.ip} '#{shell.strip.runnable}'")
  end
  run_array_of_tasks(commands)
end
|
#remove_ssh_key(i) ⇒ Object
79
80
81
|
# File 'lib/poolparty/master.rb', line 79
# Removes the application's keypair from ssh-agent, discarding all output.
# The argument is ignored.
def remove_ssh_key(_ignored)
  command = "ssh-add -d #{Application.keypair_name} >/dev/null 2>/dev/null"
  Kernel.system(command)
end
|
Reset and clear the caches
400
401
402
403
404
|
# File 'lib/poolparty/master.rb', line 400
# Clears every memoized node-related cache so the next access re-queries
# the instances.
def reset!
  @cached_descriptions = nil
  @nodes = nil
  @cloud_nodes = nil
  # cloud_ips memoizes into @ips. Without clearing it, the IP list (e.g.
  # the one written by build_nodes_list) stayed stale after nodes changed.
  @ips = nil
end
|
#restart_running_instances_services ⇒ Object
Restart the running instances services with monit on all the nodes
268
269
270
271
272
|
# File 'lib/poolparty/master.rb', line 268
# Calls restart_with_monit on every node and returns the node list.
def restart_running_instances_services
  nodes.each(&:restart_with_monit)
end
|
#scale_cloud! ⇒ Object
Also known as:
scale_cloud
Add an instance if the cloud needs one, or terminate one if necessary
139
140
141
142
|
# File 'lib/poolparty/master.rb', line 139
# One scaling pass: first grow if the expand rules fire, then shrink if the
# contract rules fire. Returns the result of the shrink step.
def scale_cloud!
  add_instance_if_load_is_high      # expansion is checked first
  terminate_instance_if_load_is_low # then contraction
end
|
#send_config_files_to_nodes(c) ⇒ Object
Send the files to the nodes
228
229
230
|
# File 'lib/poolparty/master.rb', line 228
# Rsyncs everything staged in base_tmp_dir to remote_base_tmp_dir on the
# nodes by running the tasks from rsync_tasks. +c+ is unused.
def send_config_files_to_nodes(c)
  tasks = rsync_tasks("#{base_tmp_dir}/*", remote_base_tmp_dir.to_s)
  run_array_of_tasks(tasks)
end
|
#setup_cloud ⇒ Object
34
35
36
37
|
# File 'lib/poolparty/master.rb', line 34
# Installs the stack (when enabled — see install_cloud), then configures
# the cloud. Returns configure_cloud's result.
def setup_cloud
  install_cloud   # no-op unless installation is enabled
  configure_cloud
end
|
#shrink_by(num = 1) ⇒ Object
164
165
166
167
168
169
170
171
172
173
|
# File 'lib/poolparty/master.rb', line 164
# Requests termination of up to +num+ non-master nodes, starting from the
# last one, and logs one success/failure message per attempt. Then waits
# for termination to finish and reconfigures the cloud.
def shrink_by(num=1)
  # Choose all candidates up front. `nodes` is cached, so re-selecting
  # `[-1]` inside the loop returned the same node every time and the same
  # instance was terminated repeatedly.
  candidates = nodes.reject {|a| a.master? }.reverse
  num.times do |i|
    node = candidates[i]
    res = node ? request_termination_of_instance(node.instance_id) : nil
    PoolParty.message "#{res ? "Could" : "Could not"} shutdown instance"
  end
  wait_for_all_instances_to_terminate
  configure_cloud
end
|
#start! ⇒ Object
Also known as:
start
Start the cloud, which launches the minimum_instances
25
26
27
28
29
30
31
32
33
|
# File 'lib/poolparty/master.rb', line 25
# Starts the cloud: launches the minimum number of instances, waits for
# them to boot, then installs and configures via setup_cloud.
def start!
  message "Launching minimum_instances"
  launch_minimum_instances

  message "Waiting for master to boot up"
  wait_for_all_instances_to_boot

  setup_cloud
end
|
#start_cloud! ⇒ Object
Also known as:
start_cloud
20
21
22
|
# File 'lib/poolparty/master.rb', line 20
# Alias-style entry point; delegates to start!.
def start_cloud!; start!; end
|
#start_monitor! ⇒ Object
Also known as:
start_monitor
Start monitoring the cloud with the threaded loop
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/poolparty/master.rb', line 112
# Starts monitoring the cloud. An INT trap calls on_exit and then exits.
# run_thread_loop (daemonized unless Application.test?) registers tasks that:
# - log "Checking cloud";
# - launch any missing minimum instances;
# - reconfigure unconfigured nodes;
# - run a scaling pass;
# - log monitor stats.
# NOTE(review): run_thread_loop/add_task come from the Scheduler mixin;
# whether these tasks repeat on an interval is not visible here — confirm there.
def start_monitor!
  begin
    trap("INT") do
      on_exit
      exit
    end
    run_thread_loop(:daemonize => !Application.test?) do
      add_task {PoolParty.message "Checking cloud"}
      add_task {launch_minimum_instances}
      add_task {reconfigure_cloud_when_necessary}
      add_task {scale_cloud!}
      add_task {check_stats}
    end
  # NOTE(review): `rescue Exception` also catches SystemExit/Interrupt. If
  # the `exit` from the INT trap propagates here, the process sends itself
  # SIGHUP instead of exiting cleanly. Confirm this is intended before
  # narrowing the rescue to StandardError.
  rescue Exception => e
    Process.kill("HUP", Process.pid)
  end
end
|
#terminate_instance_if_load_is_low ⇒ Object
Also known as:
terminate_instance
Tear down an instance if the load is low
253
254
255
256
257
258
|
# File 'lib/poolparty/master.rb', line 253
# When the :contract_when rules say the cloud should shrink (contract?),
# logs that and removes one instance. Returns nil otherwise.
def terminate_instance_if_load_is_low
  return unless contract?
  PoolParty.message "Cloud to shrink"
  shrink_by(1)
end
|
#user_tasks ⇒ Object
131
132
|
# File 'lib/poolparty/master.rb', line 131
# Extension hook for user-defined tasks; does nothing by default.
def user_tasks; end
|
#wait_for_all_instances_to_boot ⇒ Object
39
40
41
42
43
44
45
46
47
48
49
50
|
# File 'lib/poolparty/master.rb', line 39
# Polls (every 2 seconds outside test mode) until no instance is pending,
# resetting the caches each round. If it actually had to wait, it pauses a
# further 15 seconds (outside test mode) so ssh can come up.
def wait_for_all_instances_to_boot
  reset!
  waited = false
  until number_of_pending_instances.zero?
    wait "2.seconds" unless Application.test?
    waited = true
    reset!
  end
  if !Application.test? && waited
    message "Give some time for the instance ssh to start up"
    wait "15.seconds"
  end
end
|
#wait_for_all_instances_to_terminate ⇒ Object
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/poolparty/master.rb', line 51
# Polls (every 2 seconds outside test mode) until no instance is
# terminating, resetting the caches each round. If it actually had to
# wait, it pauses a further 15 seconds (outside test mode). It resets the
# caches once more at the end so callers see the post-termination state.
def wait_for_all_instances_to_terminate
  reset!
  waited = false
  until list_of_terminating_instances.empty?
    wait "2.seconds" unless Application.test?
    waited = true
    reset!
  end
  if !Application.test? && waited
    # The message was copy-pasted from the boot path ("ssh to start up"),
    # which is misleading after a termination.
    message "Give some time for the terminated instances to shut down"
    wait "15.seconds"
  end
  reset!
end
|