Class: Jetpants::Host

Inherits:
Object
Includes:
CallbackHandler
Defined in:
lib/jetpants/host.rb

Overview

Encapsulates a UNIX server that we can SSH to as root. Maintains a pool of SSH connections to the host as needed.

Class Variable Summary

# Registry of Host objects created through Host.new, keyed by IP address, so
# that asking for the same IP twice yields the same object.
@@all_hosts =
{}
# Serializes access to @@all_hosts (used by Host.new and Host.clear).
@@all_hosts_mutex =
Mutex.new

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from CallbackHandler

included

Constructor Details

#initialize(ip) ⇒ Host

Returns a new instance of Host.



31
32
33
34
35
36
37
38
# File 'lib/jetpants/host.rb', line 31

# Builds a Host for the given dotted-quad IPv4 address string.
# Raises RuntimeError if ip is not (entirely) an IPv4-looking address.
def initialize(ip)
  # Only supporting ipv4 for now. The pattern is anchored to the whole string
  # so values that merely *contain* an address (e.g. "x10.0.0.1y") are rejected.
  raise "Invalid IP address: #{ip}" unless ip =~ /\A\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\z/
  @ip = ip
  @connection_pool = [] # array of idle Net::SSH::Connection::Session objects
  @lock = Mutex.new     # guards @connection_pool and SSH connection setup
  @available = nil      # nil until an SSH attempt has been made; then true/false
end

Instance Attribute Details

#ip ⇒ Object (readonly)

IP address of the Host, as a string.



12
13
14
# File 'lib/jetpants/host.rb', line 12

# Reader for the host's IPv4 address string (assigned once in the constructor).
def ip; @ip; end

Class Method Details

.clear ⇒ Object



17
18
19
# File 'lib/jetpants/host.rb', line 17

# Forgets every cached Host so that subsequent Host.new calls construct fresh
# objects. Replaces the registry hash while holding the registry mutex.
def self.clear
  @@all_hosts_mutex.synchronize do
    @@all_hosts = {}
  end
end

.local(interface = false) ⇒ Object

Returns a Host object for the machine Jetpants is running on.



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/jetpants/host.rb', line 41

# Returns a Host object for the machine Jetpants is running on, determined by
# asking the kernel (Linux SIOCGIFADDR ioctl) for the IPv4 address of the
# given interface (defaults to Jetpants.private_interface).
def self.local(interface=false)
  interface ||= Jetpants.private_interface
  # This technique is adapted from Sergio Rubio Gracia's, described at
  # http://blog.frameos.org/2006/12/09/getting-network-interface-addresses-using-ioctl-pure-ruby-2/
  # buf mimics a Linux struct ifreq: 16-byte interface name followed by the
  # sockaddr the kernel fills in; bytes 20..23 hold the sockaddr_in address.
  buf = [interface, ""].pack('a16h16')
  sock = Socket.new(Socket::AF_INET, Socket::SOCK_DGRAM, 0)
  begin
    sock.ioctl(0x8915, buf) # SIOCGIFADDR
  ensure
    # Close the socket even when the ioctl fails (e.g. unknown interface).
    sock.close
  end
  ip_string = buf[20...24].unpack('C*').join '.'
  self.new(ip_string)
end

.new(ip) ⇒ Object

We override Host.new so that attempting to create a duplicate Host object (that is, one with the same IP as an existing Host object) returns the original object.



24
25
26
27
28
29
# File 'lib/jetpants/host.rb', line 24

# Returns the cached Host for ip when one of the requested class exists;
# otherwise constructs a new instance, caches it, and returns it.
def self.new(ip)
  @@all_hosts_mutex.synchronize do
    cached = @@all_hosts[ip]
    if cached.is_a?(self)
      cached
    else
      # Drop any entry of the wrong class before building its replacement.
      @@all_hosts[ip] = nil
      @@all_hosts[ip] = super
    end
  end
end

Instance Method Details

#available? ⇒ Boolean

Returns true if the host is accessible via SSH, false otherwise.

Returns:

  • (Boolean)


151
152
153
154
155
156
157
158
# File 'lib/jetpants/host.rb', line 151

# Reports whether the host is reachable over SSH. The first call probes with a
# no-op command (errors are swallowed on purpose); the SSH connection logic
# records the outcome in @available, which is returned as-is afterwards.
def available?
  if @available.nil?
    begin
      ssh_cmd 'echo ping'
    rescue
      nil
    end
  end
  @available
end

#compare_dir(base_dir, targets, options = {}) ⇒ Object

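Compares file existence and size between hosts. The param format is identical to the first three params of Host#fast_copy_chain, except that the only supported option is :files. Only entries present on the source are checked, so extra files on a target are not reported. Raises an exception if any source file is missing from a target or differs in size, otherwise returns true.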



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/jetpants/host.rb', line 276

# Compares file existence and size between self and one or more targets.
# base_dir - source directory on self (also the default target directory).
# targets  - a Host, an array of Hosts, or a hash of Host => directory.
# options  - only :files is honoured (String or Array of Strings; default '.').
# Walks subdirectories breadth-first. Raises RuntimeError on the first
# mismatch or when no targets are given. Returns true otherwise.
def compare_dir(base_dir, targets, options={})
  # Normalize the filenames param so it is an array
  filenames = options[:files] || ['.']
  filenames = [filenames] unless filenames.respond_to?(:each)

  # Normalize the targets param into a hash of host => destination dir, each
  # with a trailing slash. A fresh hash is built so the caller's is untouched.
  targets = [targets] unless targets.respond_to?(:each)
  base_dir += '/' unless base_dir[-1] == '/'
  destinations = {}
  if targets.is_a? Hash
    targets.each {|t, d| destinations[t] = (d[-1] == '/' ? d : d + '/')}
  else
    targets.each {|t| destinations[t] = base_dir}
  end
  raise "No target hosts supplied" if destinations.empty?

  # Each queue entry is a path relative to base_dir (or to a target's dir).
  queue = filenames.to_a.dup
  while (relpath = queue.shift)
    source_dirlist = dir_list(base_dir + relpath)
    destinations.each do |target, path|
      target_dirlist = target.dir_list(path + relpath)
      source_dirlist.each do |name, size|
        target_size = target_dirlist[name] || 'MISSING'
        raise "Directory listing mismatch when comparing #{self}:#{base_dir}#{relpath}/#{name} to #{target}:#{path}#{relpath}/#{name}  (size: #{size} vs #{target_size})" unless size == target_size
      end
    end
    # Descend into subdirectories, keeping the parent path so directories
    # nested under an explicit :files entry resolve to the right location.
    source_dirlist.each {|name, size| queue << "#{relpath}/#{name}" if size == '/'}
  end
  true
end

#confirm_installed(program_name) ⇒ Object

Confirms that the specified binary is installed and on the shell path.



358
359
360
361
362
# File 'lib/jetpants/host.rb', line 358

# Confirms that program_name resolves via `which` on the host.
# Returns true, or raises RuntimeError if it is not installed / not on PATH.
def confirm_installed(program_name)
  out = ssh_cmd "which #{program_name}"
  # GNU which prints "no <prog> in (<path>)"; other implementations may print
  # nothing at all. Treat either (or no output) as "not installed".
  missing = out.nil? || out.strip.empty? || out.include?("no #{program_name} in ")
  raise "#{program_name} not installed, or missing from path" if missing
  true
end

#confirm_listening_on_port(port, timeout = 10) ⇒ Object

Confirm that something is listening on the given port. The timeout param indicates how long to wait (in seconds) for a process to be listening.



144
145
146
147
148
# File 'lib/jetpants/host.rb', line 144

# Waits up to timeout seconds for some process on the host to listen on port.
# Returns true, or raises RuntimeError if nothing is listening in time.
def confirm_listening_on_port(port, timeout=10)
  # Require whitespace after the port so that, e.g., port 80 is not satisfied
  # by a process listening on 8080.
  checker_th = Thread.new { ssh_cmd "while [[ `netstat -ln | grep -E ':#{port}[[:space:]]' | wc -l` -lt 1 ]] ; do sleep 1; done" }
  unless checker_th.join(timeout)
    # Don't leave the local polling thread running forever.
    checker_th.kill
    raise "Nothing is listening on #{@ip}:#{port} after #{timeout} seconds"
  end
  true
end

#cores ⇒ Object

Returns the number of cores on the machine. This reflects virtual cores if hyperthreading is enabled, so it might be 2x the real value in that case. Not currently used by anything in Jetpants base, but might be useful for plugins that want to tailor the concurrency level to the machine’s capabilities.



381
382
383
384
385
# File 'lib/jetpants/host.rb', line 381

# Returns (and memoizes) the number of "processor" entries in /proc/cpuinfo.
# Falls back to 1 when the command yields no positive number, since a core
# count of 0 is never meaningful to callers sizing concurrency from it.
def cores
  return @cores if @cores
  count = ssh_cmd %q{cat /proc/cpuinfo|grep 'processor\s*:' | wc -l}
  n = count.to_i
  @cores = (n > 0 ? n : 1)
end

#dir_list(dir) ⇒ Object

Given the name of a directory or single file, returns a hash of filename => size of each file present. Subdirectories will be returned with a size of ‘/’, so you can process these differently as needed. WARNING: This is brittle. It parses output of “ls”. If anyone has a gem to do better remote file management via ssh, then please by all means send us a pull request!



260
261
262
263
264
265
266
267
268
269
270
# File 'lib/jetpants/host.rb', line 260

# Lists dir (a directory, or a single file) on the host by parsing `ls`.
# Returns a hash of base filename => size in bytes; subdirectories map to '/'.
# Lines that don't look like long-listing entries (totals, errors) are skipped.
def dir_list(dir)
  ls_out = ssh_cmd "ls --color=never -1AgGF #{dir}"  # disable color, 1 file per line, all but . and .., hide owner+group, include type suffix
  result = {}
  # The permissions column is matched with \S+ so SELinux/ACL markers such as
  # "-rw-r--r--." or "-rw-r--r--+" don't cause the entry to be skipped. The
  # lazy .*? stops at the first time/year column, so file names that
  # themselves contain e.g. "2014 " stay intact.
  ls_out.to_s.split("\n").each do |line|
    next unless matches = line.match(/^\S+\s+\d+\s+(?<size>\d+).*?(?:\d\d:\d\d|\d{4})\s+(?<name>.*)$/)
    file_name = matches[:name]
    # Strip the -F type indicator (dir, executable, socket, door, symlink, fifo).
    file_name = file_name[0...-1] if file_name =~ %r![*/=>@|]$!
    result[file_name.split('/')[-1]] = (matches[:name][-1] == '/' ? '/' : matches[:size].to_i)
  end
  result
end

#dir_size(dir) ⇒ Object

Recursively computes the total size of the files in dir, summing the sizes reported by #dir_list and descending into subdirectories.



311
312
313
314
315
316
317
# File 'lib/jetpants/host.rb', line 311

# Sums the sizes reported by dir_list for dir, recursing into any entry whose
# size is '/' (a subdirectory). Returns 0 for an empty listing.
def dir_size(dir)
  dir_list(dir).inject(0) do |total, (name, size)|
    total + (size == '/' ? dir_size(dir + '/' + name) : size.to_i)
  end
end

#fast_copy_chain(base_dir, targets, options = {}) ⇒ Object

Quickly and efficiently recursively copies a directory to one or more target hosts.

base_dir

is base directory to copy from the source (self). Also the default destination base directory on the targets, if not supplied via next param.

targets

is one of the following:

  • Host object, or any object that delegates method_missing to a Host (such as DB)

  • array of Host objects (or delegates)

  • hash mapping Host objects (or delegates) to destination base directory overrides (as string)

options

is a hash that can contain –

  • :files => only copy these filenames instead of entire base_dir. String, or Array of Strings.

  • :port => port number to use for netcat. defaults to 7000 if omitted.

  • :overwrite => if true, don’t raise an exception if the base_dir is non-empty or :files exist. default false.



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/jetpants/host.rb', line 174

# Copies base_dir (or options[:files] within it) from self to every target by
# streaming tar over netcat. When there are several targets, each receiver
# except the last tees the stream on to the next host. Ends by verifying the
# copy with compare_dir.
# base_dir - source directory (also the default destination directory).
# targets  - a Host, an array of Hosts, or a hash of Host => destination dir.
# options  - :files (String/Array), :port (default 7000), :overwrite (bool).
# Raises RuntimeError on bad input, misconfigured compression, non-empty
# destinations (unless :overwrite), or a failed verification.
def fast_copy_chain(base_dir, targets, options={})
  # Normalize the filenames param so it is an array
  filenames = options[:files] || ['.']
  filenames = [filenames] unless filenames.respond_to?(:each)

  # Normalize the targets param, so that targets is an array of Hosts and
  # destinations is a hash of hosts => dirs (each with a trailing slash).
  # A fresh hash is built so the caller's hash is never modified.
  targets = [targets] unless targets.respond_to?(:each)
  base_dir += '/' unless base_dir[-1] == '/'
  destinations = {}
  if targets.is_a? Hash
    targets.each {|t, d| destinations[t] = (d[-1] == '/' ? d : d + '/')}
    targets = targets.keys
  else
    targets.each {|t| destinations[t] = base_dir}
  end
  raise "No target hosts supplied" if targets.count < 1

  file_list = filenames.join ' '
  port = (options[:port] || 7000).to_i

  # A one-sided compression config can never produce a readable stream, and
  # previously crashed with NoMethodError on nil; fail early and clearly.
  compress_cmd = Jetpants.compress_with
  decompress_cmd = Jetpants.decompress_with
  if !compress_cmd != !decompress_cmd
    raise "Jetpants config must specify both compress_with and decompress_with, or neither"
  end

  if compress_cmd
    comp_bin = compress_cmd.split(' ')[0]
    confirm_installed comp_bin
    output "Using #{comp_bin} for compression"
  else
    output "Compression disabled -- no compression method specified in Jetpants config file"
  end
  decomp_bin = decompress_cmd && decompress_cmd.split(' ')[0]
  decompression_pipe = decompress_cmd ? "| #{decompress_cmd}" : ''

  # On each destination host, do any initial setup (and optional validation/erasing),
  # and then listen for new files.  If there are multiple destination hosts, all of them
  # except the last will use tee to "chain" the copy along to the next machine.
  # The chain is set up back-to-front so each listener exists before its sender.
  workers = []
  chain = targets.reverse
  chain.each_with_index do |t, i|
    dir = destinations[t]
    raise "Directory #{t}:#{dir} looks suspicious" if dir.include?('..') || dir.include?('./') || dir == '/' || dir == ''

    t.confirm_installed decomp_bin if decomp_bin
    t.ssh_cmd "mkdir -p #{dir}"

    # Check if contents already exist / non-empty.
    # Note: doesn't do recursive scan of subdirectories
    unless options[:overwrite]
      all_paths = filenames.map {|f| dir + f}.join ' '
      dirlist = t.dir_list(all_paths)
      dirlist.each {|name, size| raise "File #{name} exists on destination and has nonzero size!" if size.to_i > 0}
    end

    if i == 0
      workers << Thread.new { t.ssh_cmd "cd #{dir} && nc -l #{port} #{decompression_pipe} | tar xv" }
      t.confirm_listening_on_port port
      t.output "Listening with netcat."
    else
      tt = chain[i-1]
      fifo = "fifo#{port}"
      workers << Thread.new { t.ssh_cmd "cd #{dir} && mkfifo #{fifo} && nc #{tt.ip} #{port} <#{fifo} && rm #{fifo}" }
      checker_th = Thread.new { t.ssh_cmd "while [ ! -p #{dir}/#{fifo} ] ; do sleep 1; done" }
      unless checker_th.join(10)
        checker_th.kill # don't leave the polling thread running forever
        raise "FIFO not found on #{t} after 10 seconds"
      end
      workers << Thread.new { t.ssh_cmd "cd #{dir} && nc -l #{port} | tee #{fifo} #{decompression_pipe} | tar xv" }
      t.confirm_listening_on_port port
      t.output "Listening with netcat, and chaining to #{tt}."
    end
  end

  # Start the copy chain.
  output "Sending files over to #{targets[0]}: #{file_list}"
  compression_pipe = compress_cmd ? "| #{compress_cmd}" : ''
  ssh_cmd "cd #{base_dir} && tar vc #{file_list} #{compression_pipe} | nc #{targets[0].ip} #{port}"
  workers.each {|th| th.join}
  output "File copy complete."

  # Verify
  output "Verifying file sizes and types on all destinations."
  compare_dir base_dir, destinations, options
  output "Verification successful."
end

#get_ssh_connection ⇒ Object

Returns a Net::SSH::Connection::Session for the host. Verifies that the connection is working before returning it.



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/jetpants/host.rb', line 55

# Returns a working Net::SSH::Connection::Session to root@@ip, reusing an idle
# pooled session when available. Every candidate is verified with
# `echo ping`. Broken sessions are closed and discarded. Sets @available to
# true on success. After 5 failed attempts, sets @available to false and
# raises RuntimeError.
def get_ssh_connection
  conn = nil
  5.times do |attempt|
    @lock.synchronize do
      if @connection_pool.count > 0
        conn = @connection_pool.shift
      end
    end
    unless conn
      params = {
        :paranoid => false,
        :user_known_hosts_file => '/dev/null',
        :timeout => 5,
      }
      params[:keys] = Jetpants.ssh_keys if Jetpants.ssh_keys
      begin
        @lock.synchronize do
          conn = Net::SSH.start(@ip, 'root', params)
        end
      rescue => ex
        output "Unable to SSH on attempt #{attempt + 1}: #{ex.to_s}"
        conn = nil
        next
      end
    end

    # Confirm that the connection works
    if conn
      begin
        result = conn.exec!('echo ping').strip
        raise "Unexpected result" unless result == 'ping'
        @available = true
        return conn
      rescue
        output "Discarding nonfunctional SSH connection"
        # Close the broken session so its socket isn't leaked.
        begin
          conn.close
        rescue
          # Already unusable; nothing more to do.
        end
        conn = nil
      end
    end
  end
  @available = false
  raise "Unable to obtain working SSH connection to #{self} after 5 attempts"
end

#hostname ⇒ Object

Returns the machine’s hostname, or 'unknown' if the host is not reachable via SSH.



399
400
401
402
# File 'lib/jetpants/host.rb', line 399

# Returns the host's hostname, memoized after the first successful lookup.
# Returns 'unknown' (without caching) whenever the host isn't reachable.
def hostname
  if available?
    @hostname ||= ssh_cmd('hostname').chomp
  else
    'unknown'
  end
end

#memory(in_gb = false) ⇒ Object

Returns the amount of memory on machine, either in bytes (default) or in GB. Linux-specific.



389
390
391
392
393
394
395
396
# File 'lib/jetpants/host.rb', line 389

# Reads MemTotal from /proc/meminfo (Linux) and returns it in bytes, or in
# whole GiB (integer division) when in_gb is truthy.
def memory(in_gb=false)
  meminfo = ssh_cmd 'cat /proc/meminfo | grep MemTotal'
  parsed = meminfo.match(/(?<size>\d+)\s+(?<unit>kB|mB|gB|B)/)
  unit_bytes = {kB: 1024, mB: 1024**2, gB: 1024**3, B: 1}[parsed[:unit].to_sym]
  bytes = parsed[:size].to_i * unit_bytes
  return bytes unless in_gb
  bytes / 1024**3
end

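#mount_stats(mount) ⇒ Object

Returns a hash with 'total', 'used' and 'available' byte counts for the filesystem holding mount (taken from df -k and converted from KiB to bytes), or false if no usable df output was obtained.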



319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/jetpants/host.rb', line 319

# Returns {'total' => bytes, 'used' => bytes, 'available' => bytes} for the
# filesystem holding mount, based on `df -k` (KiB values times 1024).
# Returns false when df yields no output or anything other than three
# comma-separated integers (e.g. an error message for a bad mount point).
def mount_stats(mount)
  # Named df_out so it doesn't shadow the #output method.
  df_out = ssh_cmd "df -k " + mount + "|tail -1| awk '{print $2\",\"$3\",\"$4}'"
  fields = df_out.to_s.strip.split(',')
  return false unless fields.length == 3 && fields.all? {|f| f =~ /\A\d+\z/}

  total, used, available = fields.map {|f| f.to_i * 1024}
  {'total' => total, 'used' => used, 'available' => available}
end

#output(str) ⇒ Object

Displays the provided output, along with information about the current time, and self (the IP of this Host)



406
407
408
409
410
411
412
413
414
# File 'lib/jetpants/host.rb', line 406

# Prints "HH:MM:SS [<self>] <message>" to stdout and returns that line
# (without the trailing newline). A nil or blank message is replaced by
# "Completed (no output)".
def output(str)
  message = str.to_s.strip
  message = "Completed (no output)" if message.empty?
  line = "#{Time.now.strftime("%H:%M:%S")} [#{self}] #{message}"
  print line + "\n"
  line
end

#pid_running?(pid, matching_string = false) ⇒ Boolean

Checks if there’s a process with the given process ID running on this host. Optionally also checks if matching_string is contained in the process name. Returns true if so, false if not. Warning: this implementation assumes Linux-style “ps” command; will not work on BSD hosts.

Returns:

  • (Boolean)


369
370
371
372
373
374
375
# File 'lib/jetpants/host.rb', line 369

# True when `ps` (Linux procps syntax) reports a process with this pid. If
# matching_string is given, its command line must also contain that string.
def pid_running?(pid, matching_string=false)
  command =
    if matching_string
      "ps --no-headers -o command #{pid} | grep '#{matching_string}' | wc -l"
    else
      "ps --no-headers #{pid} | wc -l"
    end
  ssh_cmd(command).chomp.to_i > 0
end

#save_ssh_connection(conn) ⇒ Object

Adds a Net::SSH::Connection::Session to a pool of idle persistent connections.



100
101
102
103
104
105
106
107
# File 'lib/jetpants/host.rb', line 100

# Returns conn to the idle pool after checking it still responds (`cd ~`).
# A session that fails the check is closed (so its socket isn't leaked) and
# dropped, and a message is printed.
def save_ssh_connection(conn)
  conn.exec! 'cd ~'
  @lock.synchronize do
    @connection_pool << conn
  end
rescue
  begin
    conn.close
  rescue
    # The connection is already unusable; nothing more to do.
  end
  output "Discarding nonfunctional SSH connection"
end

#service(operation, name, options = '') ⇒ Object

Performs the given operation (:start, :stop, :restart, :status) for the specified service (e.g. “mysql”). Requires that the “service” bin is in root’s PATH. Please be aware that the output format and exit codes for the service binary vary between Linux distros! You may find that you need to override methods that call Host#service with :status operation (such as DB#probe_running) in a custom plugin, to parse the output properly on your chosen Linux distro.



346
347
348
# File 'lib/jetpants/host.rb', line 346

# Runs `service <name> <operation> [options]` on the host and returns its
# output. Trailing whitespace is trimmed so an empty options adds nothing.
def service(operation, name, options='')
  command = "service #{name} #{operation} #{options}"
  ssh_cmd command.rstrip
end

#set_io_scheduler(name, device = 'sda') ⇒ Object

Changes the I/O scheduler to name (such as ‘deadline’, ‘noop’, ‘cfq’) for the specified device.



352
353
354
355
# File 'lib/jetpants/host.rb', line 352

# Writes name into /sys/block/<device>/queue/scheduler after announcing it,
# returning the output of the remote command.
def set_io_scheduler(name, device='sda')
  sysfs_path = "/sys/block/#{device}/queue/scheduler"
  output "Setting I/O scheduler for #{device} to #{name}."
  ssh_cmd "echo '#{name}' >#{sysfs_path}"
end

#ssh_cmd(cmd, attempts = 3) ⇒ Object

Execute the given UNIX command string (or array of strings) as root via SSH. By default, if something is wrong with the SSH connection, the command will be attempted up to 3 times before an exception is thrown. Be sure to set this to 1 or false for commands that are not idempotent. Returns the result of the command executed. If cmd was an array of strings, returns the result of the LAST command executed.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/jetpants/host.rb', line 115

# Runs cmd (a String, or an Array of Strings run in order) as root over SSH
# and returns the output of the last command.
# attempts - how many times each command may be tried; false/nil mean 1.
# When a command raises, the session is closed (it is most likely broken),
# and the retry uses a freshly obtained connection rather than the same one.
# Re-raises the original error once attempts are exhausted.
def ssh_cmd(cmd, attempts=3)
  attempts ||= 1
  conn = get_ssh_connection
  cmd = [cmd] unless cmd.is_a? Array
  result = nil
  cmd.each do |c|
    failures = 0
    begin
      result = conn.exec! c
    rescue => ex
      failures += 1
      begin
        conn.close
      rescue
        # Already unusable; ignore.
      end
      conn = nil
      raise ex if failures >= attempts
      output "Command \"#{c}\" failed, re-trying after delay"
      sleep(failures)
      conn = get_ssh_connection
      retry
    end
  end
  save_ssh_connection conn
  result
end

#ssh_cmd!(cmd) ⇒ Object

Shortcut for use when a command is not idempotent and therefore isn’t safe to retry if something goes wonky with the SSH connection.



138
139
140
# File 'lib/jetpants/host.rb', line 138

# Single-attempt variant of ssh_cmd for commands that must not be retried
# (passing false makes ssh_cmd treat attempts as 1).
def ssh_cmd!(cmd)
  ssh_cmd(cmd, false)
end

#to_host ⇒ Object

Returns self, since this object is already a Host.



422
423
424
# File 'lib/jetpants/host.rb', line 422

# A Host is already a Host, so conversion is the identity.
def to_host; self; end

#to_s ⇒ Object

Returns the host’s IP address as a string.



417
418
419
# File 'lib/jetpants/host.rb', line 417

# String form of a Host is simply its IP address.
def to_s
  @ip
end