Class: PoolParty::Master

Inherits:
Remoting show all
Extended by:
PoolParty, FileWriter
Includes:
Aska, Callbacks, FileWriter, Remoter
Defined in:
lib/poolparty/master.rb

Instance Attribute Summary

Attributes included from Scheduler

#tasker

Class Method Summary collapse

Instance Method Summary collapse

Methods included from FileWriter

base_tmp_dir, clear_base_directory, make_base_directory, remote_base_tmp_dir, with_temp_file, write_to_file_for, write_to_temp_file

Methods included from PoolParty

include_cloud_tasks, load_app, load_monitors, load_plugins, message, options, plugin_dir, read_config_file, register_monitor, registered_monitor?, registered_monitors, root_dir, timer, user_dir, verbose?, write_to_temp_file

Methods included from Remoter

included

Methods included from Callbacks

included

Methods inherited from Remoting

#can_shutdown_an_instance?, #can_start_a_new_instance?, #cloud_keypairs, #list_of_all_instances, #list_of_instances, #list_of_nonterminated_instances, #list_of_pending_instances, #list_of_running_instances, #list_of_terminating_instances, #maximum_number_of_instances_are_not_running?, #minimum_number_of_instances_are_running?, #number_of_all_pending_and_running_instances, #number_of_pending_and_running_instances, #number_of_pending_and_running_instances_for_list, #number_of_pending_instances, #number_of_running_instances, #request_launch_new_instance, #request_launch_new_instances, #request_launch_one_instance_at_a_time, #request_termination_of_all_instances, #request_termination_of_instance, #request_termination_of_running_instances, #running_instances, #update_instance_values

Methods included from Scheduler

#_tasker, #add_task, #daemonize, #interval, #run_thread_list, #run_thread_loop, #run_threads

Methods included from Ec2Wrapper

included

Constructor Details

#initializeMaster

Returns a new instance of Master.



13
14
15
16
17
18
# File 'lib/poolparty/master.rb', line 13

# Creates a Master and registers the default scaling rules.
#
# After the parent initializer runs, the :contract_when and :expand_when
# rule sets are each taken from Application.options, unless are_rules?
# reports that the rule set has already been declared.
def initialize
  super

  klass = self.class
  unless are_rules?(:contract_when)
    klass.send(:rules, :contract_when, Application.options.contract_when)
  end
  unless are_rules?(:expand_when)
    klass.send(:rules, :expand_when, Application.options.expand_when)
  end
end

Class Method Details

.build_haproxy_fileObject



461
462
463
464
465
466
# File 'lib/poolparty/master.rb', line 461

# Renders the haproxy configuration from the template file.
#
# The template at Application.haproxy_config_file is read, stripped and
# combined (via String#^) with a hash holding:
#   :servers   - one haproxy_entry per node, newline separated, with a
#                trailing newline
#   :host_port - Application.host_port
#
# Returns whatever String#^ returns for the template and that hash.
def build_haproxy_file
  servers = collect_nodes { |node| node.haproxy_entry }.join("\n") + "\n"
  # File.read closes the handle; open(...).read left it to the GC.
  template = File.read(Application.haproxy_config_file).strip
  template ^ {:servers => servers, :host_port => Application.host_port}
end

.build_user_global_filesObject

Placeholders



469
470
471
472
473
# File 'lib/poolparty/master.rb', line 469

# Writes every registered global user file.
#
# Each entry of global_user_files is a [name, block] pair; the block is
# handed to write_to_file_for as its block.
#
# The previous form `write_to_file_for(name) &blk` parsed as a binary `&`
# applied to the call's result, so the block was never passed.
def build_user_global_files
  global_user_files.each do |name, generator|
    write_to_file_for(name, &generator)
  end
end

.build_user_node_files_for(node) ⇒ Object



474
475
476
477
478
479
480
# File 'lib/poolparty/master.rb', line 474

# Writes every registered per-node user file for the given node.
#
# Each entry of user_node_files is a [name, block] pair; the block is
# called with the node and its result becomes the block value given to
# write_to_file_for(name, node).
def build_user_node_files_for(node)
  user_node_files.each do |entry|
    file_name = entry[0]
    write_to_file_for(file_name, node) { entry[1].call(node) }
  end
end

.cloud_ipsObject



427
428
429
# File 'lib/poolparty/master.rb', line 427

# Class-level shortcut: builds a new instance and returns its cloud_ips.
def cloud_ips
  master = new
  master.cloud_ips
end

.collect_nodes(&block) ⇒ Object



414
415
416
# File 'lib/poolparty/master.rb', line 414

# Maps the given block over the nodes of a freshly built instance and
# returns the collected results.
def collect_nodes(&block)
  all_nodes = new.nodes
  all_nodes.map(&block)
end

.define_global_user_file(name, &block) ⇒ Object



481
482
483
# File 'lib/poolparty/master.rb', line 481

# Registers a global user file: the name and the block are stored as a
# [name, block] pair at the end of global_user_files.
def define_global_user_file(name, &block)
  entry = [name, block]
  global_user_files.push(entry)
end

.define_node_user_file(name, &block) ⇒ Object



485
486
487
# File 'lib/poolparty/master.rb', line 485

# Registers a per-node user file: the name and the block are stored as a
# [name, block] pair at the end of user_node_files.
def define_node_user_file(name, &block)
  entry = [name, block]
  user_node_files.push(entry)
end

.get_masterObject



424
425
426
# File 'lib/poolparty/master.rb', line 424

# Returns the first node of a freshly built instance (nil when there are
# no nodes).
def get_master
  new.nodes.first
end

.get_next_node(node) ⇒ Object



430
431
432
# File 'lib/poolparty/master.rb', line 430

# Class-level shortcut: builds a new instance and delegates to its
# get_next_node for the given node.
def get_next_node(node)
  master = new
  master.get_next_node(node)
end

.global_user_filesObject



484
# File 'lib/poolparty/master.rb', line 484

# Lazily created list of global user file definitions.
def global_user_files
  @global_user_files ||= []
end

.is_master_responding?Boolean

Returns:

  • (Boolean)


421
422
423
# File 'lib/poolparty/master.rb', line 421

# Reports whether the master node answers a single ping.
#
# Returns true only when the ping command exits successfully. Returns
# false when there is no master node or when the command fails.
#
# The previous version returned the backtick output, which is a String and
# therefore always truthy, even when the ping failed.
def is_master_responding?
  master = get_master
  return false unless master

  Kernel.system("ping -c1 -t5 #{master.ip} >/dev/null 2>&1") == true
end

.requires_heartbeat?Boolean

Returns:

  • (Boolean)


418
419
420
# File 'lib/poolparty/master.rb', line 418

# True when a freshly built instance reports more than one node.
def requires_heartbeat?
  node_count = new.nodes.length
  node_count > 1
end

.set_hosts(c, remotetask = nil) ⇒ Object



433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
# File 'lib/poolparty/master.rb', line 433

# Configures a remote task object with the cloud's connection settings.
#
# c          - not used by this method.
# remotetask - object responding to `set(key, value)` and
#              `host(address, role)`; required in practice.
#
# Sets :user, :application, :ssh_flags and :rsync_flags from Application,
# sets :domain to the master's ip when a master exists, and registers
# every node whose status matches /running/ as an :app host.
#
# Raises ArgumentError when remotetask is nil. Previously this surfaced as
# a NoMethodError on nil after the shell lookups had already run.
def set_hosts(c, remotetask=nil)
  raise ArgumentError, "set_hosts requires a remote task to configure" if remotetask.nil?
  rt = remotetask

  # Only the ssh path is needed; the unused `which rsync` lookup was dropped.
  ssh_location = `which ssh`.gsub(/\n/, '')
  rt.set :user, Application.username
  # rt.set :domain, "#{Application.user}@#{ip}"
  rt.set :application, Application.app_name
  rt.set :ssh_flags, "-i #{Application.keypair_path} -o StrictHostKeyChecking=no"
  rt.set :rsync_flags , ['-azP', '--delete', "-e '#{ssh_location} -l #{Application.username} -i #{Application.keypair_path} -o StrictHostKeyChecking=no'"]

  master = get_master
  rt.set :domain, "#{master.ip}" if master
  Master.with_nodes { |node|
    rt.host "#{Application.username}@#{node.ip}",:app if node.status =~ /running/
  }
end

.ssh_configure_string_for(node) ⇒ Object



453
454
455
456
457
458
459
460
# File 'lib/poolparty/master.rb', line 453

# Builds the multi-line shell snippet used to configure a node over ssh.
#
# The snippet contains, in order: the node's update_plugin_string, a
# `pool maintain` call pointing at PoolParty.plugin_dir, a `hostname`
# call with the node's name, and an s3fs call for the shared bucket
# using the Application access keys.
def ssh_configure_string_for(node)
  <<-EOC
    #{node.update_plugin_string(node)}
    pool maintain -c ~/.config -l #{PoolParty.plugin_dir}
    hostname -v #{node.name}
    /usr/bin/s3fs #{Application.shared_bucket} -o accessKeyId=#{Application.access_key} -o secretAccessKey=#{Application.secret_access_key} -o nonempty /data
  EOC
end

.user_node_filesObject



488
# File 'lib/poolparty/master.rb', line 488

# Lazily created list of per-node user file definitions.
def user_node_files
  @user_node_files ||= []
end

.with_nodes(&block) ⇒ Object



410
411
412
# File 'lib/poolparty/master.rb', line 410

# Yields each node of a freshly built instance to the given block.
def with_nodes(&block)
  all_nodes = new.nodes
  all_nodes.each(&block)
end

Instance Method Details

#add_instance_if_load_is_highObject Also known as: add_instance

Add an instance if the load is high



245
246
247
248
249
250
# File 'lib/poolparty/master.rb', line 245

# Grows the cloud by one instance when expand? is true; otherwise does
# nothing and returns nil.
def add_instance_if_load_is_high
  return unless expand?

  PoolParty.message("Cloud needs expansion")
  grow_by(1)
end

#add_ssh_key(i) ⇒ Object



76
77
78
# File 'lib/poolparty/master.rb', line 76

# Runs ssh-add on Application.keypair_path, discarding the command's
# output. The argument is not used.
def add_ssh_key(i)
  command = "ssh-add #{Application.keypair_path} >/dev/null 2>/dev/null"
  Kernel.system(command)
end

#build_and_copy_heartbeat_authkeys_fileObject

Build the basic auth file for the heartbeat



294
295
296
297
298
# File 'lib/poolparty/master.rb', line 294

# Build the basic auth file for the heartbeat.
#
# Writes an "authkeys" file (through write_to_file_for) whose content is
# the unmodified text of Application.heartbeat_authkeys_config_file.
def build_and_copy_heartbeat_authkeys_file
  write_to_file_for("authkeys") do
    # File.read closes the handle; open(...).read left it to the GC.
    File.read(Application.heartbeat_authkeys_config_file)
  end
end

#build_and_send_config_files_in_temp_directoryObject



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/poolparty/master.rb', line 178

# Stages the cloud's configuration artifacts under base_tmp_dir.
#
# In order, this body: tars the plugin directory (when it exists), copies
# the heartbeat files (when Master.requires_heartbeat?), copies the config
# file and keypair (when present), copies the pem files and config/resource.d,
# builds the haproxy file, the global user files and the node list, and
# finally builds the per-node files for every node.
#
# Despite the name, no transfer call appears in this body; it only builds
# and copies files locally.
def build_and_send_config_files_in_temp_directory
  # NOTE(review): 'ftools' presumably supplies the File.copy calls used below
  # and in the copy_* helpers; that library is not shipped with Ruby 1.9+ — confirm
  # the supported Ruby version before changing this line.
  require 'ftools'
  if File.directory?(Application.plugin_dir)
    # Only the basename is passed to tar, so this depends on the current
    # working directory containing the plugin directory.
    Kernel.system("tar -czf #{base_tmp_dir}/plugins.tar.gz #{File.basename(Application.plugin_dir)}")
  end
  
  # Heartbeat files that are the same for every node.
  if Master.requires_heartbeat?
    build_and_copy_heartbeat_authkeys_file
    File.copy(get_config_file_for("cloud_master_takeover"), "#{base_tmp_dir}/cloud_master_takeover")
    File.copy(get_config_file_for("heartbeat.conf"), "#{base_tmp_dir}/ha.cf")
  end
  
  # The config file and keypair are optional: copied only when they exist.
  File.copy(Application.config_file, "#{base_tmp_dir}/config.yml") if Application.config_file && File.exists?(Application.config_file)
  File.copy(Application.keypair_path, "#{base_tmp_dir}/keypair") if File.exists?(Application.keypair_path)      
  
  copy_pem_files_to_tmp_dir
        
  copy_config_files_in_directory_to_tmp_dir("config/resource.d")
  # copy_config_files_in_directory_to_tmp_dir("config/monit.d")
        
  build_haproxy_file
  Master.build_user_global_files
  
  build_nodes_list
    
  # Per-node files: hosts file, configuration script, user files and,
  # when heartbeat is required, the heartbeat config and resources files.
  Master.with_nodes do |node|
    build_hosts_file_for(node)
    build_reconfigure_instances_script_for(node)
    Master.build_user_node_files_for(node)
    
    if Master.requires_heartbeat?
      build_heartbeat_config_file_for(node)
      build_heartbeat_resources_file_for(node)
    end
  end
end

#build_haproxy_fileObject

Build the basic haproxy config file from the config file in the config directory and return a tempfile



274
275
276
277
278
279
280
281
# File 'lib/poolparty/master.rb', line 274

# Builds the haproxy config file from the template named by
# Application.haproxy_config_file.
#
# Inside write_to_file_for("haproxy"), the stripped template is combined
# (via String#^) with a hash holding:
#   :servers   - one haproxy_entry per node, newline separated, with a
#                trailing newline
#   :host_port - Application.host_port
#
# Returns whatever write_to_file_for returns.
def build_haproxy_file
  write_to_file_for("haproxy") do
    servers = nodes.collect { |node| node.haproxy_entry }.join("\n") + "\n"
    # File.read closes the handle; open(...).read left it to the GC.
    template = File.read(Application.haproxy_config_file).strip
    template ^ {:servers => servers, :host_port => Application.host_port}
  end
end

#build_heartbeat_config_file_for(node) ⇒ Object

Build heartbeat config file



300
301
302
303
304
305
# File 'lib/poolparty/master.rb', line 300

# Build the heartbeat config file for a node.
#
# Inside write_to_file_for("heartbeat", node), the stripped template at
# Application.heartbeat_config_file is combined (via String#^) with
# {:nodes => servers}, where servers is the node's node_entry followed by
# the next node's node_entry. If building that pair raises a StandardError
# (for example when there is no next node), servers is the empty string.
def build_heartbeat_config_file_for(node)
  write_to_file_for("heartbeat", node) do
    servers = begin
      "#{node.node_entry}\n#{get_next_node(node).node_entry}"
    rescue StandardError
      # Best effort, as before: an unavailable pair yields an empty list.
      ""
    end
    # File.read closes the handle; open(...).read left it to the GC.
    File.read(Application.heartbeat_config_file).strip ^ {:nodes => servers}
  end
end

#build_heartbeat_resources_file_for(node) ⇒ Object



306
307
308
309
310
# File 'lib/poolparty/master.rb', line 306

# Writes the "haresources" file for a node.
#
# The content is the node's haproxy_resources_entry followed by the next
# node's haproxy_resources_entry; if building it raises a StandardError,
# the content is the empty string.
def build_heartbeat_resources_file_for(node)
  write_to_file_for("haresources", node) do
    begin
      own_entry = node.haproxy_resources_entry
      peer_entry = get_next_node(node).haproxy_resources_entry
      "#{own_entry}\n#{peer_entry}"
    rescue StandardError
      ""
    end
  end
end

#build_hosts_file_for(n) ⇒ Object

Build host file for a specific node



283
284
285
286
287
# File 'lib/poolparty/master.rb', line 283

# Build the hosts file for a specific node.
#
# Every node contributes one line: local_hosts_entry for the node whose ip
# equals n's ip, hosts_entry for all others. Lines are joined with
# newlines and written through write_to_file_for("hosts", n).
def build_hosts_file_for(n)
  write_to_file_for("hosts", n) do
    entries = nodes.map do |node|
      node.ip == n.ip ? node.local_hosts_entry : node.hosts_entry
    end
    entries.join("\n")
  end
end

#build_nodes_listObject



288
289
290
291
292
# File 'lib/poolparty/master.rb', line 288

# Writes the node list file: the cloud ips, one per line, under the file
# name given by RemoteInstance.node_list_name.
def build_nodes_list
  list_name = RemoteInstance.node_list_name
  write_to_file_for(list_name) { cloud_ips.join("\n") }
end

#build_reconfigure_instances_script_for(node) ⇒ Object

Build basic configuration script for the node



312
313
314
315
316
# File 'lib/poolparty/master.rb', line 312

# Build the basic configuration script for the node.
#
# Inside write_to_file_for("configuration", node), the stripped template
# at Application.sh_reconfigure_instances_script is combined (via
# String#^) with node.configure_tasks, which receives the negation of
# PoolParty.verbose?.
def build_reconfigure_instances_script_for(node)
  write_to_file_for("configuration", node) do
    # File.read closes the handle; open(...).read left it to the GC.
    template = File.read(Application.sh_reconfigure_instances_script).strip
    template ^ node.configure_tasks( !PoolParty.verbose? )
  end
end

#check_statsObject

Sole purpose to check the stats, mainly in a plugin



134
135
136
137
# File 'lib/poolparty/master.rb', line 134

# Reports the registered monitors through PoolParty.message as a single
# comma-separated "Monitors: ..." line.
def check_stats
  names = registered_monitors.map { |monitor| "#{monitor}" }
  PoolParty.message("Monitors: #{names.join(", ")}")
end

#cleanup_tmp_directory(c) ⇒ Object



223
224
225
# File 'lib/poolparty/master.rb', line 223

# Removes every entry directly inside base_tmp_dir (the directory itself
# is kept). Does nothing when base_tmp_dir is not a directory. The
# argument is not used.
#
# The guard previously tested the hard-coded relative path "tmp/", so the
# cleanup was skipped whenever the working directory had no "tmp/" even
# though base_tmp_dir existed.
def cleanup_tmp_directory(c)
  return unless File.directory?(base_tmp_dir)

  Dir["#{base_tmp_dir}/*"].each { |entry| FileUtils.rm_rf(entry) }
end

#cloud_ipsObject



103
104
105
# File 'lib/poolparty/master.rb', line 103

# Returns the ip of every node, memoized in @ips after the first call.
def cloud_ips
  @ips ||= nodes.map { |node| node.ip }
end

#cloud_nodesObject

Return a list of the nodes for each keypair and cache them



348
349
350
351
352
353
354
355
356
357
358
# File 'lib/poolparty/master.rb', line 348

# Return a list of the nodes for each keypair and cache them.
#
# For every keypair in cloud_keypairs, the non-terminated instances are
# wrapped in RemoteInstance objects, each given a :number that is its
# index within that keypair's list. The combined list is memoized in
# @cloud_nodes.
def cloud_nodes
  return @cloud_nodes if @cloud_nodes

  collected = []
  cloud_keypairs.each do |keypair|
    instances = list_of_nonterminated_instances(list_of_instances(keypair))
    instances.collect_with_index do |inst, i|
      collected << RemoteInstance.new(inst.merge({:number => i}))
    end
  end
  @cloud_nodes = collected
end

#clouds_listObject



382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
# File 'lib/poolparty/master.rb', line 382

# Describes all clouds, grouped by key pair.
#
# Returns "Clouds are not running" when there are no pending or running
# instances. Otherwise returns a header line followed by one entry per
# cloud node; the first node of each run of equal keypairs is prefixed
# with a "key pair: NAME (COUNT)" line.
#
# The block used to end with a conditional `str += ... if ...`, which
# evaluates to nil for a node without a description and therefore dropped
# that node's key pair header. The entry is now always returned.
def clouds_list
  total = number_of_all_pending_and_running_instances
  return "Clouds are not running" unless total > 0

  keypair = nil
  entries = cloud_nodes.collect do |node|
    str = ""
    if keypair != node.keypair
      keypair = node.keypair
      str = "key pair: #{keypair} (#{number_of_pending_and_running_instances(keypair)})\n"
    end
    description = node.description
    str += "\t" + description unless description.nil?
    str
  end
  "-- ALL CLOUDS (#{total})--\n" + entries.join("\n")
end

#configure_cloudObject

Configure the master because the master will take care of the rest after that



65
66
67
68
69
70
71
72
73
# File 'lib/poolparty/master.rb', line 65

# Configures the cloud: builds the config files in the temp directory,
# runs the remote configuration step, then calls configure on every node.
def configure_cloud
  message("Configuring master")
  build_and_send_config_files_in_temp_directory
  remote_configure_instances

  nodes.each { |node| node.configure }
end

#contract?Boolean

FOR MONITORING

Returns:

  • (Boolean)


261
262
263
# File 'lib/poolparty/master.rb', line 261

# For monitoring: the result of valid_rules? for the :contract_when rules.
def contract?
  rule_set = :contract_when
  valid_rules?(rule_set)
end

#copy_config_files_in_directory_to_tmp_dir(dir) ⇒ Object

Copy all the files in the directory to the dest



327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/poolparty/master.rb', line 327

# Copy all the files in a config directory into the tmp directory.
#
# dir - path relative to user_dir / root_dir (e.g. "config/resource.d").
#
# The destination is "#{base_tmp_dir}/<basename of dir>" and is created if
# needed. Files come from "#{user_dir}/#{dir}" when that directory exists,
# otherwise from "#{root_dir}/#{dir}". Only regular files directly inside
# the source directory are copied; subdirectories are skipped.
#
# Uses FileUtils.cp rather than File.copy, so this method no longer depends
# on another method having required 'ftools' first.
def copy_config_files_in_directory_to_tmp_dir(dir)
  dest_dir = "#{base_tmp_dir}/#{File.basename(dir)}"
  FileUtils.mkdir_p dest_dir

  user_source = "#{user_dir}/#{dir}"
  source_dir = File.directory?(user_source) ? user_source : "#{root_dir}/#{dir}"

  Dir["#{source_dir}/*"].each do |file|
    next unless File.file?(file)
    FileUtils.cp(file, dest_dir)
  end
end

#copy_pem_files_to_tmp_dirObject



214
215
216
217
218
219
220
221
222
# File 'lib/poolparty/master.rb', line 214

# Copies the files named by the EC2_CERT and EC2_PRIVATE_KEY environment
# variables into base_tmp_dir, keeping their base names.
#
# This is best effort: an unset or empty variable is skipped, and a copy
# that fails with a file-system error is ignored.
#
# Changes from the previous version:
# - reads ENV directly instead of shelling out to `echo $VAR`;
# - uses FileUtils.cp instead of File.copy (which needs 'ftools');
# - no longer rescues Exception, which also hid interrupts and
#   programming errors such as a missing method.
def copy_pem_files_to_tmp_dir
  %w(EC2_CERT EC2_PRIVATE_KEY).each do |key|
    file = ENV[key].to_s.strip
    next if file.empty?

    begin
      FileUtils.cp(file, "#{base_tmp_dir}/#{File.basename(file)}")
    rescue SystemCallError, IOError, ArgumentError
      # Missing or unreadable pem file: skip it and carry on.
    end
  end
end

#expand?Boolean

Returns:

  • (Boolean)


264
265
266
# File 'lib/poolparty/master.rb', line 264

# For monitoring: the result of valid_rules? for the :expand_when rules.
def expand?
  rule_set = :expand_when
  valid_rules?(rule_set)
end

#get_config_file_for(name) ⇒ Object

Try the user’s directory before the master directory



319
320
321
322
323
324
325
# File 'lib/poolparty/master.rb', line 319

# Resolves the path of a named config file, trying the user's directory
# before the root directory.
#
# name - file name inside a "config" directory.
#
# Returns "#{user_dir}/config/#{name}" when that file exists, otherwise
# "#{root_dir}/config/#{name}" (whether or not it exists).
#
# Uses File.exist?; the File.exists? alias was removed in Ruby 3.2.
def get_config_file_for(name)
  user_file = "#{user_dir}/config/#{name}"
  File.exist?(user_file) ? user_file : "#{root_dir}/config/#{name}"
end

#get_next_node(node) ⇒ Object

Get the next node in sequence, so we can configure heartbeat to monitor the next node



364
365
366
367
368
# File 'lib/poolparty/master.rb', line 364

# Get the next node in sequence, so heartbeat can be configured to monitor
# it. The successor of the last node wraps around to index 0.
def get_next_node(node)
  successor = node.number + 1
  get_node(successor >= nodes.size ? 0 : successor)
end

#get_node(i = 0) ⇒ Object

Get the node at the specific index from the cached nodes



360
361
362
# File 'lib/poolparty/master.rb', line 360

# Get the node whose number equals the given index (converted with to_i)
# from the cached nodes; nil when no node matches.
def get_node(i=0)
  nodes.find { |candidate| candidate.number == i.to_i }
end

#grow_by(num = 1) ⇒ Object



156
157
158
159
160
161
162
163
# File 'lib/poolparty/master.rb', line 156

# Grows the cloud by num instances (default 1): requests the launches,
# waits for all instances to boot, clears the caches and reconfigures
# the cloud.
def grow_by(num=1)
  request_launch_new_instances(num)
  wait_for_all_instances_to_boot

  reset!
  configure_cloud
end

#install_cloud(bool = false) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/poolparty/master.rb', line 82

# Installs the software stack on the cloud.
#
# bool - forces the install even when Application.install_on_load? is
#        false (default: false).
#
# When neither condition holds nothing happens and nil is returned.
# Otherwise the apt sources are extended and refreshed over ssh,
# Provider.install_poolparty runs, and install is called on every node
# (for plugins).
def install_cloud(bool=false)
  return unless Application.install_on_load? || bool

  # Just in case, add the new ubuntu apt-sources as well as updating and
  # fixing the update packages.
  apt_setup = <<-EOE
      touch /etc/apt/sources.list
      echo 'deb http://mirrors.kernel.org/ubuntu hardy main universe' >> /etc/apt/sources.list
      apt-get update --fix-missing
  EOE
  ssh(apt_setup)

  Provider.install_poolparty

  # For plugins
  nodes.each { |node| node.install }
end

#launch_minimum_instancesObject

Launch the minimum number of instances.



107
108
109
110
# File 'lib/poolparty/master.rb', line 107

# Launch the minimum number of instances.
#
# Requests as many new instances as Application.minimum_instances exceeds
# the current number of pending and running instances, then returns nodes.
def launch_minimum_instances
  missing = Application.minimum_instances - number_of_pending_and_running_instances
  request_launch_new_instances(missing)
  nodes
end

#listObject

List the clouds



373
374
375
376
377
378
379
380
381
# File 'lib/poolparty/master.rb', line 373

# List the cloud.
#
# Returns "Cloud is not running" when there are no pending or running
# instances; otherwise a "-- CLOUD (N)--" header followed by each node's
# description on its own line.
def list
  return "Cloud is not running" unless number_of_pending_and_running_instances > 0

  header = "-- CLOUD (#{number_of_pending_and_running_instances})--\n"
  header + nodes.collect { |node| node.description }.join("\n")
end

#make_base_tmp_dir(c) ⇒ Object



174
175
176
# File 'lib/poolparty/master.rb', line 174

# Ensures base_tmp_dir exists. The argument is not used.
#
# Uses FileUtils.mkdir_p instead of shelling out to `mkdir`: this works
# for paths containing spaces, creates missing parent directories, is a
# no-op when the directory already exists, and raises on failure instead
# of failing silently.
def make_base_tmp_dir(c)
  FileUtils.mkdir_p(base_tmp_dir)
end

#nodesObject

Return a list of the nodes and cache them



342
343
344
345
346
# File 'lib/poolparty/master.rb', line 342

# Return a list of the nodes and cache them.
#
# Every non-terminated instance is wrapped in a RemoteInstance that is
# also given a :number equal to its index. The list is memoized in @nodes.
def nodes
  return @nodes if @nodes

  @nodes = list_of_nonterminated_instances.collect_with_index { |inst, i|
    RemoteInstance.new(inst.merge({:number => i}))
  }
end

#number_of_unconfigured_nodesObject



152
153
154
155
# File 'lib/poolparty/master.rb', line 152

# Counts the nodes for which stack_installed? is not true.
def number_of_unconfigured_nodes
  # TODO: Find a better way to tell if the nodes are configured.
  unconfigured = nodes.reject { |node| node.stack_installed? }
  unconfigured.length
end

#on_exitObject

On exit command



370
371
# File 'lib/poolparty/master.rb', line 370

# On exit command. Intentionally empty; it is called from the INT trap
# installed by start_monitor! and returns nil.
def on_exit
  nil
end

#reconfigure_cloud_when_necessaryObject

Tough method: we need to make sure that all the instances have the required software installed. This is a basic check against the local store of the instances that have the stack installed.



148
149
150
151
# File 'lib/poolparty/master.rb', line 148

# Reports the number of unconfigured nodes and reconfigures the cloud
# when that number is greater than zero.
def reconfigure_cloud_when_necessary
  PoolParty.message("#{number_of_unconfigured_nodes} unconfigured nodes")
  return unless number_of_unconfigured_nodes > 0

  configure_cloud
end

#remote_configure_instancesObject



232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/poolparty/master.rb', line 232

# Runs each node's configuration script remotely.
#
# For every node, builds one ssh command that makes
# "#{remote_base_tmp_dir}/#{node.name}-configuration" executable and runs
# it with /bin/sh, then hands all commands to run_array_of_tasks.
def remote_configure_instances
  commands = []
  Master.with_nodes do |node|
    script_file = "#{remote_base_tmp_dir}/#{node.name}-configuration"
    script = "chmod +x #{script_file}\n/bin/sh #{script_file}\n".strip
    commands << "#{self.class.ssh_string} #{node.ip} '#{script.runnable}'"
  end
  run_array_of_tasks(commands)
end

#remove_ssh_key(i) ⇒ Object



79
80
81
# File 'lib/poolparty/master.rb', line 79

# Runs `ssh-add -d` on Application.keypair_path, discarding the command's
# output. The argument is not used.
#
# This previously passed Application.keypair_name, while add_ssh_key adds
# the key by Application.keypair_path; the same path is now used for both.
def remove_ssh_key(i)
  Kernel.system("ssh-add -d #{Application.keypair_path} >/dev/null 2>/dev/null")
end

#reset!Object

Reset and clear the caches



400
401
402
403
404
# File 'lib/poolparty/master.rb', line 400

# Reset and clear the caches.
#
# Clears the memoized descriptions, nodes, cloud nodes and ips. @ips is
# the cache filled by cloud_ips from nodes; it was not cleared before, so
# the ip list stayed stale after the node list was refreshed.
def reset!
  @cached_descriptions = nil
  @nodes = nil
  @cloud_nodes = nil
  @ips = nil
end

#restart_running_instances_servicesObject

Restart the running instances services with monit on all the nodes



268
269
270
271
272
# File 'lib/poolparty/master.rb', line 268

# Calls restart_with_monit on every node.
def restart_running_instances_services
  nodes.each { |node| node.restart_with_monit }
end

#scale_cloud!Object Also known as: scale_cloud

Add an instance if the cloud needs one, or terminate one if necessary



139
140
141
142
# File 'lib/poolparty/master.rb', line 139

# Runs both scaling checks in order: first the expansion check, then the
# contraction check. Returns the result of the contraction check.
def scale_cloud!
  add_instance_if_load_is_high()
  terminate_instance_if_load_is_low()
end

#send_config_files_to_nodes(c) ⇒ Object

Send the files to the nodes



228
229
230
# File 'lib/poolparty/master.rb', line 228

# Send the files to the nodes.
#
# Builds rsync tasks from everything under base_tmp_dir to
# remote_base_tmp_dir and runs them. The argument is not used.
def send_config_files_to_nodes(c)
  local_glob = "#{base_tmp_dir}/*"
  tasks = rsync_tasks(local_glob, "#{remote_base_tmp_dir}")
  run_array_of_tasks(tasks)
end

#setup_cloudObject



34
35
36
37
# File 'lib/poolparty/master.rb', line 34

# Sets up the cloud: runs install_cloud, then configure_cloud, and
# returns the result of configure_cloud.
def setup_cloud
  install_cloud()
  configure_cloud()
end

#shrink_by(num = 1) ⇒ Object



164
165
166
167
168
169
170
171
172
173
# File 'lib/poolparty/master.rb', line 164

# Shrinks the cloud by num instances (default 1).
#
# On each pass the last node that is not the master is picked and its
# termination requested; a message reports whether the request succeeded
# ("Could not" also covers the case where no such node exists). Afterwards
# waits for the terminations to finish and reconfigures the cloud.
def shrink_by(num=1)
  num.times do
    # Get the last node that is not the master
    candidate = nodes.reject { |a| a.master? }.last
    terminated = candidate ? request_termination_of_instance(candidate.instance_id) : nil
    outcome = terminated ? "Could" : "Could not"
    PoolParty.message "#{outcome} shutdown instance"
  end
  wait_for_all_instances_to_terminate
  configure_cloud
end

#start!Object Also known as: start

Start the cloud, which launches the minimum_instances



25
26
27
28
29
30
31
32
33
# File 'lib/poolparty/master.rb', line 25

# Start the cloud: launches the minimum instances, waits for all
# instances to boot, then sets up the cloud.
def start!
  message("Launching minimum_instances")
  launch_minimum_instances

  message("Waiting for master to boot up")
  wait_for_all_instances_to_boot

  setup_cloud
end

#start_cloud!Object Also known as: start_cloud

Start the cloud



20
21
22
# File 'lib/poolparty/master.rb', line 20

# Start the cloud. Delegates to start! and returns its result.
def start_cloud!
  start!()
end

#start_monitor!Object Also known as: start_monitor

Start monitoring the cloud with the threaded loop



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/poolparty/master.rb', line 112

# Start monitoring the cloud with the threaded loop.
#
# Installs an INT trap that calls on_exit and exits, then runs the thread
# loop with five tasks: a status message, launching the minimum
# instances, reconfiguring when necessary, scaling, and checking stats.
# Any exception raised while doing so makes the process send itself HUP.
def start_monitor!
  trap("INT") do
    on_exit
    exit
  end

  # Daemonize only if we are not in the test environment
  daemonize = !Application.test?
  run_thread_loop(:daemonize => daemonize) do
    add_task { PoolParty.message "Checking cloud" }
    add_task { launch_minimum_instances }
    add_task { reconfigure_cloud_when_necessary }
    add_task { scale_cloud! }
    add_task { check_stats }
  end
rescue Exception
  Process.kill("HUP", Process.pid)
end

#terminate_instance_if_load_is_lowObject Also known as: terminate_instance

Teardown an instance if the load is pretty low



253
254
255
256
257
258
# File 'lib/poolparty/master.rb', line 253

# Shrinks the cloud by one instance when contract? is true; otherwise
# does nothing and returns nil.
def terminate_instance_if_load_is_low
  return unless contract?

  PoolParty.message("Cloud to shrink")
  shrink_by(1)
end

#user_tasksObject



131
132
# File 'lib/poolparty/master.rb', line 131

# Intentionally empty; returns nil.
def user_tasks
  nil
end

#wait_for_all_instances_to_bootObject



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/poolparty/master.rb', line 39

# Blocks until no instance is pending.
#
# The caches are reset before every check. Outside the test environment
# each polling round waits "2.seconds", and if at least one round was
# needed, an extra "15.seconds" is waited so ssh can come up.
def wait_for_all_instances_to_boot
  reset!
  waited = false
  until number_of_pending_instances.zero?
    wait "2.seconds" unless Application.test?
    waited = true
    reset!
  end
  return if Application.test? || !waited

  message "Give some time for the instance ssh to start up"
  wait "15.seconds"
end

#wait_for_all_instances_to_terminateObject



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/poolparty/master.rb', line 51

# Blocks until no instance is terminating.
#
# The caches are reset before every check and once more at the end.
# Outside the test environment each polling round waits "2.seconds", and
# if at least one round was needed, an extra "15.seconds" is waited.
#
# The status message used to be a copy of the boot-time text ("Give some
# time for the instance ssh to start up"), which is wrong for a shutdown;
# it now describes the termination wait.
def wait_for_all_instances_to_terminate
  reset!
  waited = false
  until list_of_terminating_instances.size.zero?
    wait "2.seconds" unless Application.test?
    waited = true
    reset!
  end
  unless Application.test? || !waited
    message "Give some time for the instances to finish shutting down"
    wait "15.seconds"
  end
  reset!
end