Class: ClusterBomb::Cluster

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/cluster_bomb/cluster.rb

Defined Under Namespace

Classes: Host

Constant Summary

Constants included from Logging

Logging::DEFAULT_LOGFILENAME

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

log, log_disable, log_enable, log_init, puts, #puts

Constructor Details

#initialize(user, options = {}) ⇒ Cluster

Returns a new instance of Cluster.



61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/cluster_bomb/cluster.rb', line 61

# Build a cluster for +user+. +options+ is kept as the SSH options hash; it
# supplies :timeout to ensure_connected! and is merged with :port for each
# Net::SSH.start call.
def initialize(user, options={})
  # ||= keeps a user name that was already assigned before this runs.
  @user_name ||= user
  @ssh_options = options

  # Connection bookkeeping. @connection_mutex guards @connections, which
  # ensure_connected! fills from per-host threads.
  @connection_mutex = Mutex.new
  @connections = []
  @connection_cache = {}
  @connected = false

  # Session hosts and the nickname table.
  @hosts = []
  @nicknames = {}

  # Optional per-run time limit, armed by set_run_timer.
  @start_time = nil
  @max_time = nil
end

Instance Attribute Details

#hosts ⇒ Object

Returns the value of attribute hosts.



60
61
62
# File 'lib/cluster_bomb/cluster.rb', line 60

# Host objects of the current session; connect! rebuilds this list and
# disconnect! empties it.
def hosts; @hosts; end

#nicknames ⇒ Object

Returns the value of attribute nicknames.



60
61
62
# File 'lib/cluster_bomb/cluster.rb', line 60

# Nickname table, initialized to an empty Hash in the constructor.
# NOTE(review): nothing in this class appears to populate it; presumably
# callers do — confirm.
def nicknames; @nicknames; end

Instance Method Details

#clear! ⇒ Object



294
295
296
# File 'lib/cluster_bomb/cluster.rb', line 294

# Call clear! on every host of the session; returns @hosts.
def clear!
  @hosts.each(&:clear!)
end

#connect!(host_list) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/cluster_bomb/cluster.rb', line 74

# Replace the session's hosts with one Host per name in +host_list+ and
# connect them. Does nothing (returns nil) for an empty list.
#
# @param host_list [Array<String>] host names, optionally "name:port"
def connect!(host_list)
  return if host_list.empty?

  # Start from a fresh host list; each Host collects that host's results.
  @hosts = []
  host_list.each { |name| @hosts.push(Host.new(name, self)) }

  # The per-host connection threads are started by ensure_connected!.
  puts "Connecting to #{hosts.length} hosts"
  ensure_connected!
  @connected = true
  puts "Connected to #{hosts.length} hosts"
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


268
269
270
# File 'lib/cluster_bomb/cluster.rb', line 268

# Value of the @connected flag, set by connect! and cleared by reset!/disconnect!.
def connected?; @connected; end

#credentials(user, ssh_opts) ⇒ Object

Sets the credentials (user name and SSH options) used for the next connection attempt.



90
91
92
93
# File 'lib/cluster_bomb/cluster.rb', line 90

# Replace the user name and SSH options used by later connection attempts.
# Existing sessions are not touched. Returns +user+.
def credentials(user, ssh_opts)
  @ssh_options = ssh_opts
  @user_name = user
end

#disconnect! ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/cluster_bomb/cluster.rb', line 272

# Close every cached SSH session and reset the cluster to an empty,
# disconnected state (no hosts, no connections, empty cache).
#
# A failure closing one session is reported and does not stop the others
# from being closed. Cache slots holding nil (a slot that was cleared but
# not removed) are skipped instead of producing a spurious
# "undefined method close" error.
#
# @return [Array] the new, empty @hosts
def disconnect!
  (@connection_cache || {}).each_value do |conn|
    next if conn.nil?
    begin
      conn.close
    rescue StandardError => e
      # StandardError, not Exception: Interrupt/SystemExit must still propagate.
      puts "Non-fatal EXCEPTION closing connection: #{e.message}"
    end
  end
  @hosts.each { |h| h.connected = false }
  @connections = []
  @connection_cache = {}
  @connected = false
  @hosts = []
end

#download(remote, local, options = {}, &task) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
# File 'lib/cluster_bomb/cluster.rb', line 162

# Start an SCP download of +remote+ to +local+ over every live session,
# then wait for all transfers in event_loop.
#
# Only +options+[:max_run_time] is used here (via set_run_timer); the
# options are not passed on to event_loop, so :echo has no effect.
# NOTE(review): every host downloads to the same +local+ path, so with
# several hosts one transfer presumably overwrites another — confirm intent.
#
# @return [Array<Host>] @hosts
def download(remote, local, options={}, &task)
  ensure_connected!
  set_run_timer(options)
  @connections.each do |entry|
    session = entry[:connection]
    next if session.nil?
    entry[:completed] = false
    session.scp.download(remote, local)
  end
  event_loop(task)
  @hosts
end

#ensure_connected! ⇒ Object

Ensures that every host that has not previously failed to connect is connected.



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/cluster_bomb/cluster.rb', line 96

# Make sure every host in @hosts that has not previously failed to connect
# has an SSH session, then rebuild @connections with one
# {:connection => session_or_nil, :host => host} entry per host attempted.
#
# Hosts that already have a cached session, or are flagged connected, reuse
# @connection_cache. The rest are connected concurrently, one Thread per
# host, with Net::SSH.start. A "name:port" host name selects the port
# (default "22").
#
# The caller then waits until the expected number of entries has arrived or
# the overall deadline passes. The deadline is twice @ssh_options[:timeout]
# when that is set, otherwise 30 seconds. Hosts still not connected at the
# deadline are printed and marked connect_failed.
#
# NOTE(review): threads still running at the deadline are not stopped. They
# may later write @connection_cache, append to @connections and set
# host.connected even though the host was already marked connect_failed.
def ensure_connected!      
  if @ssh_options[:timeout]
    total_timeout = @ssh_options[:timeout] * 2
  else
    total_timeout = 30
  end
  # puts "Total timeout: #{total_timeout}"
  @connections=[]
  # Number of @connections entries to wait for: one per host not already
  # marked connect_failed.
  # NOTE(review): the wait loop compares with ==. A host that is marked
  # connect_failed but still has a cached session (its thread finished after
  # an earlier deadline) takes the first branch below and adds an uncounted
  # entry, so the loop then runs until the full timeout. >= may be intended.
  hosts_to_connect = @hosts.inject(0) {|sum,h| sum += (h.connect_failed ? 0:1)}
  # puts "#{hosts_to_connect} to connect"
  @hosts.each do |host|
    if @connection_cache[host.name] || host.connected
      # Reuse the cached session (may be nil if only host.connected is set).
      @connection_mutex.synchronize { @connections << {:connection=>@connection_cache[host.name], :host=>host} }
      host.connected=true
    elsif !host.connect_failed
      # Connect in the background so all hosts are attempted concurrently.
      # Each attempt appends an entry to @connections (the session, or nil on
      # failure); the wait loop below counts those entries.
      Thread.new {
        begin
          #puts "Connecting #{host.name}"
          name, port = host.name.split(':')
          port ||= "22"
          c = Net::SSH.start(name, @user_name, @ssh_options.merge({:port=>port.to_i}))
          # NOTE(review): this cache write happens outside @connection_mutex.
          @connection_cache[host.name] = c
          @connection_mutex.synchronize { @connections << {:connection=>c, :host=>host} }
          host.connected=true
          #puts "Connected #{host.name}"
        rescue Exception => e
          # Rescuing Exception (not only StandardError) means every failure
          # still appends an entry. An escaped error would kill the thread
          # silently and leave the wait loop running until the deadline.
          host.connect_failed = true
          host.connected=false
          error "Unable to connect to #{host.name}\n#{e.message}"
          @connection_mutex.synchronize {@connections << {:connection=>nil, :host=>host} }
          host.exception=e
        end        
      }
    end
  end
  # Poll every 0.1 s until all expected entries are in or the deadline passes.
  s = Time.now
  loop do
    l=0
    @connection_mutex.synchronize { l = @connections.length }
    break if l == hosts_to_connect
    sleep(0.1)
    if Time.now - s > total_timeout
      puts "Warning -- total connection time expired"
      puts "Failed to connect:"
      hosts.each do |h|
        unless h.connected
          puts "    #{h.name}" 
          h.connect_failed=true
          # TODO: Need to handle this situations much better. Attempt to kill thread and/or mark connection in cache as unreachable
        end
      end
      break
    end
  end      
end

#error(msg) ⇒ Object



297
298
299
# File 'lib/cluster_bomb/cluster.rb', line 297

# Report an error message; it is simply written through puts.
def error(msg); puts(msg); end

#event_loop(task, options = {}) ⇒ Object



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/cluster_bomb/cluster.rb', line 220

# Drive every pending session in @connections until each entry is finished.
#
# Entries with a nil :connection (hosts that failed to connect) are skipped.
# Each remaining session is polled with process(0.1) while it reports
# busy?(true). An entry finishes when any of these happens:
# - it stops being busy;
# - the run timer armed by set_run_timer expires;
# - processing raises.
# As each entry finishes, +task+ is called with its host. With no task and
# options[:echo] set, the host's name and console output are printed
# instead.
#
# @param task [Proc, nil] per-host completion callback
# @param options [Hash] :echo - print each host's console when no task given
# @return [nil] the run timer is cleared before returning
def event_loop(task, options={})
  condition = Proc.new { |s| s.busy?(true) }
  # Live sessions still running; the outer loop ends when this reaches zero.
  count = @connections.count { |c| c[:connection] }
  loop do
    @connections.each do |conn|
      next if conn[:connection].nil? || conn[:completed]
      busy = true
      begin
        busy = conn[:connection].process(0.1, &condition)
        if @start_time && Time.now - @start_time > @max_time
          # Soft failure: record it on the host but keep the session open.
          conn[:host].exception = Exception.new("Execution time exceeded: #{@max_time}")
          puts "Execution time exceeded: #{@max_time}"
          busy = false
        end
      rescue Exception => e
        # A session that raised here is treated as unusable. Upload/download
        # errors, even a bad filename, typically end up here. Close it and
        # drop it from the cache so a later ensure_connected! can reconnect
        # this host.
        # NOTE(review): rescuing Exception also swallows Interrupt; kept
        # because callers may rely on Ctrl-C not escaping this loop.
        puts "#{e.message}"
        host = conn[:host]
        session = @connection_cache.delete(host.name) || conn[:connection]
        begin
          session.close
        rescue StandardError => close_error
          # Closing an already-broken session can fail too; that must not
          # abort the loop for the remaining hosts.
          puts "Non-fatal EXCEPTION closing connection: #{close_error.message}"
        end
        host.connected = false
        busy = false
      end
      next if busy

      conn[:completed] = true
      count -= 1
      h = conn[:host]
      if task
        task.call(h)
      elsif options[:echo]
        puts "#{h.name}\n#{h.console}\n"
      end
    end
    break if count <= 0
  end
  # Disarm the run timer armed by set_run_timer.
  @start_time = nil
  @max_time = nil
end

#mksudo(command) ⇒ Object

Builds a sudo-wrapped command that runs through `sudo /bin/bash -c`. As far as the author knows, this only works for bash.



188
189
190
# File 'lib/cluster_bomb/cluster.rb', line 188

# Wrap +command+ so it runs as root through bash. The command is
# shell-escaped into a single argument for `bash -c`.
def mksudo(command)
  quoted = Shellwords.escape(command)
  ["sudo", "/bin/bash", "-c", quoted].join(" ")
end

#reset! ⇒ Object



289
290
291
292
# File 'lib/cluster_bomb/cluster.rb', line 289

# Forget the current connection list and clear the connected flag.
# Unlike disconnect!, cached sessions stay open and @hosts is kept.
def reset!
  @connections = []
  @connected = false
end

#run(command, options = {}, &task) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/cluster_bomb/cluster.rb', line 192

# Execute +command+ on every live session and wait for all hosts to finish.
#
# Every output chunk is appended to the host's buffer_console. It is also
# appended to buffer_stderr or buffer_stdout, depending on the stream it
# arrived on.
#
# @param command [String] shell command to run remotely
# @param options [Hash]
#   :sudo         - wrap the command with mksudo
#   :echo         - clear each host before starting; also passed to event_loop
#   :debug        - print every chunk prefixed with the host name
#   :dotty        - print "." per chunk as a progress indicator
#   :max_run_time - forwarded to set_run_timer
# @yield [host] used by event_loop as the per-host completion task
# @return [Array<Host>] @hosts
def run(command, options={}, &task)
  ensure_connected!
  command = mksudo(command) if options[:sudo]
  set_run_timer(options)
  @connections.each do |c|
    next if c[:connection].nil?
    host = c[:host]
    c[:completed] = false
    host.clear! if options[:echo] # Clear out before starting
    c[:connection].exec(command) do |_channel, stream, data|
      host.buffer_console << data
      if stream == :stderr
        host.buffer_stderr << data
      else
        host.buffer_stdout << data
      end
      puts "#{host.name}::#{data}" if options[:debug]
      print "." if options[:dotty]
    end
  end
  event_loop(task, options)
  @hosts
end

#set_run_timer(options) ⇒ Object



152
153
154
155
156
157
158
159
160
# File 'lib/cluster_bomb/cluster.rb', line 152

# Arm (or disarm) the per-run time limit checked by event_loop.
# A truthy options[:max_run_time] sets @max_time to its to_i value and starts
# the clock. Otherwise both @max_time and @start_time are cleared.
# Returns the new @start_time.
def set_run_timer(options)
  limit = options[:max_run_time]
  @max_time = limit ? limit.to_i : nil
  @start_time = limit ? Time.now : nil
end

#upload(local, remote, options = {}, &task) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/cluster_bomb/cluster.rb', line 174

# Start an SCP upload of +local+ to +remote+ over every live session, then
# wait for all transfers in event_loop.
#
# +options+ is merged over a default :chunk_size of 16384 and handed to
# scp.upload. It is also used by set_run_timer (:max_run_time), but it is not
# passed on to event_loop.
#
# @return [Array<Host>] @hosts
def upload(local, remote, options={}, &task)
  scp_options = { chunk_size: 16_384 }.merge(options)
  ensure_connected!
  set_run_timer(options)
  @connections.each do |entry|
    session = entry[:connection]
    next if session.nil?
    entry[:completed] = false
    session.scp.upload(local, remote, scp_options)
  end
  event_loop(task)
  @hosts
end