Class: Amazon

Inherits:
Infrastructure show all
Defined in:
lib/cluster/infrastructures/amazon.rb

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Infrastructure

cluster_name, connect, #credentials, #credentials?, current, dns, in_cluster?, #initialize, #machines, #names, #services, sizes, #to_credentials

Methods included from Cluster::Logging

#logger

Constructor Details

This class inherits a constructor from Infrastructure

Class Method Details

.from_sdb_results(res) ⇒ Object



550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
# File 'lib/cluster/infrastructures/amazon.rb', line 550

# Flattens a SimpleDB select result into an array of plain hashes.
#
# Every entry of res[:items] is read as a hash whose first key is the item
# name and whose value maps attribute names to lists of values.  The item
# name is stored under :aws_id; each attribute is copied under its own name:
# an empty list becomes nil, a one-element list becomes that element, and a
# longer list is kept as is.
#
# @param res [Hash] result carrying the rows under :items
# @return [Array<Hash>] one hash per item, in the original order
def from_sdb_results(res)
  res[:items].map do |item|
    item_name = item.keys.first
    values_by_attr = item[item_name]

    values_by_attr.keys.each_with_object({:aws_id => item_name}) do |attr_name, record|
      values = values_by_attr[attr_name]
      record[attr_name] =
        if values.empty?
          nil
        elsif values.length == 1
          values.first
        else
          values
        end
    end
  end
end

.to_sdb_attributes(args) ⇒ Object



541
542
543
544
545
546
547
548
# File 'lib/cluster/infrastructures/amazon.rb', line 541

# Builds an attribute hash in which every kept value is wrapped by Array().
#
# Entries whose value is nil or false are left out entirely.
# FIXME this needs a way to deal with arrays that are empty
#
# @param args [Hash] attribute name => value (scalar or list)
# @return [Hash] attribute name => Array of values
def to_sdb_attributes(args)
  args.each_with_object({}) do |(name, value), attrs|
    attrs[name] = Array(value) if value
  end
end

Instance Method Details

#alter_instances!(*iss) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/cluster/infrastructures/amazon.rb', line 181

# Writes instance attributes back to the SimpleDB domain.
#
# Works on the given instances (nested arrays are flattened) or, when none
# are given, on everything returned by #instances.  Each instance is first
# yielded to the optional block.  Attributes whose value is empty are removed
# from the attribute hash and deleted from the domain; afterwards the
# instance's attributes are stored with :replace.
#
# @return [Array] the flattened list that was processed
def alter_instances!(*iss)
  targets = iss.empty? ? instances : iss

  targets.flatten.each do |instance|
    yield instance if block_given?

    attrs = instance.attributes
    blank = attrs.keys.select { |name| attrs[name].empty? }
    blank.each { |name| attrs.delete(name) }

    sdb.delete_attributes(domain, instance.aws_id, blank) unless blank.empty?
    sdb.put_attributes(domain, instance.aws_id, instance.attributes, :replace)
  end
end

#authorize(ips) ⇒ Object



524
525
526
527
528
# File 'lib/cluster/infrastructures/amazon.rb', line 524

# Opens TCP port 22 in the 'access' security group for each given address.
#
# @param ips [Array] addresses passed one by one to the EC2 interface
# @return [Array] for every input, the address itself when the EC2 call
#   returned a truthy value, otherwise that falsy return value
def authorize(ips)
  ips.map do |ip|
    granted = ecc.authorize_security_group_IP_ingress('access', 22, 22, 'tcp', ip)
    granted && ip
  end
end

#bucket ⇒ Object



472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
# File 'lib/cluster/infrastructures/amazon.rb', line 472

# Returns the bucket named by @options.cluster_bucket, memoized in @bucket.
#
# The bucket is requested through sss.bucket(name, true).  When no bucket
# name is configured, or the lookup yields nothing, a message is printed and
# the process exits with status 2.
def bucket
  name = @options.cluster_bucket
  unless name
    puts "#{Cluster::NAME} has not been configured with a bucket for client materials."
    exit 2
  end

  @bucket ||= sss.bucket(name, true)
  return @bucket if @bucket

  puts "#{Cluster::NAME} bucket named #{@options.cluster_bucket} cannot be created or accessed."
  exit 2
end

#configure ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/cluster/infrastructures/amazon.rb', line 5

# Populates @options for the Amazon infrastructure.
#
# Order of precedence, lowest to highest:
#   1. values under the 'amazon' key of the loaded credentials (when present),
#   2. environment variables / built-in defaults for anything still unset,
#   3. command-line switches parsed from @arguments.
#
# Exits with status 2 when the key/secret pair or the cluster domain is
# missing once all sources have been consulted.
def configure
  super
  @@in_cluster = in_cluster?
  @@instances = nil

  if credentials? && credentials.include?('amazon')
    creds = credentials['amazon']
    @options.key = creds['key']
    @options.secret = creds['secret']
    @options.owner = creds['owner']
    @options.cluster_bucket = creds['cluster_bucket']
    @options.bucket_key = creds['bucket_key']
    @options.cluster_domain = creds['cluster_domain']
    @options.zone = creds['zone']
    @options.volumes = creds['volumes']
  end

  # Fall back to the environment (or fixed defaults) for anything not supplied above.
  @options.key ||= ENV['AMAZON_ACCESS_KEY_ID']
  @options.secret ||= ENV['AMAZON_SECRET_ACCESS_KEY']
  @options.owner ||= ENV['AMAZON_OWNER_ID']
  @options.cluster_bucket ||= ENV['CLUSTER_BUCKET']
  @options.bucket_key ||= 'cluster_credentials.yml'
  @options.zone ||= ENV['AMAZON_ZONE']
  @options.cluster_domain ||= ENV['CLUSTER_DOMAIN'] || self.class.cluster_name
  @options.volumes ||= {}

  @options.role = ENV['CLUSTER_ROLE'] || ENV['RAILS_ENV'] || 'production'

  @options.cluster_image_key = 'cluster_images.yml'
  @options.spot_instances = false
  @options.price = false

  OptionParser.new {|o|
    o.banner = "Amazon Infrastructure Options"

    o.on('-k', '--key VAL', "Amazon Access Key ID") do |v|
      @options.key = v
    end

    o.on('-s', '--secret VAL', 'Amazon Access Secret') do |v|
      @options.secret = v
    end

    # '--owenr' was the original (misspelled) switch; it stays as an alias so
    # existing invocations keep working now that '--owner' is accepted.
    o.on('-o', '--owner VAL', '--owenr VAL', 'Amazon User Code') do |v|
      @options.owner = v
    end

    o.on('-d', '--domain VAL', 'Amazon Domain to use') do |v|
      @options.cluster_domain = v
    end

    o.on('-b', '--bucket VAL', 'Cluster Bucket') do |v|
      @options.cluster_bucket = v
    end

    o.on('-f', '--bucket-credentials-file VAL', 'Cluster credentials file location on the bucket.') do |v|
      @options.bucket_key = v
    end

    # NOTE(review): the switch is accepted but its value is discarded here.
    o.on('--source-bucket VAL', 'Bucket that has the host data.') do |v|
    end

    o.on('-r', '--role VAL', 'Role in which to operate.') do |v|
      @options.role = v
    end

    o.on('-z', '--zone VAL', "Availability Zone") do |v|
      @options.zone = v
    end

    o.on('--spot', "Use Spot Instances") do |v|
      @options.spot_instances = true
    end

    o.on('--price=VAL', 'Maximum price for the spot instance') do |v|
      @options.price = v
    end

  }.parse(@arguments)

  unless @options.key && @options.secret
    $stderr.puts "Amazon Infrastructure cannot communicate without secret and key."
    exit 2
  end

  unless @options.cluster_domain
    $stderr.puts "Amazon Infrastructure needs to know what domain to connect to."
    exit 2
  end
end

#connect_to_active_sdb(params = {}) ⇒ Object



331
332
333
# File 'lib/cluster/infrastructures/amazon.rb', line 331

# Establishes the RightAws::ActiveSdb connection using this object's key and
# secret; params is passed through #connection_params first.
def connect_to_active_sdb(params = {})
  RightAws::ActiveSdb.establish_connection key, secret, connection_params(params)
end

#connection_params(params = {}) ⇒ Object



402
403
404
405
406
407
408
409
# File 'lib/cluster/infrastructures/amazon.rb', line 402

# Normalizes the option hash handed to the RightAws interface constructors.
#
# An empty hash is replaced by {:multi_thread => true}.  When no :logger
# entry is present, a copy with :logger => logger is returned; otherwise the
# hash is returned untouched.
def connection_params(params = {})
  settings = params.empty? ? {:multi_thread => true} : params
  return settings if settings.include?(:logger)

  settings.merge(:logger => logger)
end

#cost(sizes) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/cluster/infrastructures/amazon.rb', line 100

# Reports the average spot price over the last seven days for each size.
#
# Only available when spot instances were requested (@options.spot_instances);
# otherwise a message is printed and the process exits with status 3.
#
# @param sizes [Array] machine sizes, each translated by #size_to_instance_type
# @return [Array<Array>] pairs of [size, price]; price is the average
#   formatted with "%0.3f", or nil when no price history was returned for
#   that size (previously this case raised ZeroDivisionError)
def cost(sizes)
  unless @options.spot_instances
    puts "Cost only works for spot instances currently (ie. supply infrastructure argument of --spot"
    exit 3
  end

  now = Time.now
  window = {:start_time => now - (7 * 24 * 60 * 60),
            :end_time => now,
            :product_description => "Linux/UNIX"}

  sizes.map do |size|
    types = [size_to_instance_type(size)]
    history = ecc.describe_spot_price_history(window.merge(:instance_types => types))

    if history.empty?
      [size, nil]
    else
      total = history.inject(0.0) { |sum, entry| sum + entry[:spot_price] }
      [size, "%0.3f" % (total / history.length)]
    end
  end
end

#create_data_store(name) ⇒ Object



120
121
122
# File 'lib/cluster/infrastructures/amazon.rb', line 120

# Creates a SimpleDB domain called +name+ through the #sdb interface.
def create_data_store(name)
  sdb.create_domain(name)
end

#create_file_store(name) ⇒ Object



116
117
118
# File 'lib/cluster/infrastructures/amazon.rb', line 116

# Asks the #sss (S3) interface for the bucket called +name+, passing true as
# the second argument (the same call #bucket uses for the cluster bucket).
def create_file_store(name)
  sss.bucket(name, true)
end

#credentials_url(seconds = 1200) ⇒ Object



429
430
431
432
# File 'lib/cluster/infrastructures/amazon.rb', line 429

# Builds a RightAws::S3Generator::Key for the credentials object
# (@options.bucket_key inside #bucket) and returns the result of its #get.
#
# @param seconds [Integer] value handed to Key#get (default 1200)
def credentials_url(seconds = 1200)
  RightAws::S3Generator::Key.new(bucket, @options.bucket_key).get(seconds)
end

#current_instance ⇒ Object



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/cluster/infrastructures/amazon.rb', line 267

# Finds the instance record for the machine this process is running on.
#
# The EC2 id is read from the metadata address and matched against
# #instances.  When @options.current_instance_id names a different cluster
# record, that record's services, disabled services, spot price and friendly
# name are folded into the matched instance, the other record is deleted from
# the domain and the merged attributes are stored.
#
# Exits with status 3 when #in_cluster? is false and with status 2 when no
# instance matches the EC2 id.
def current_instance
  unless in_cluster?
    puts "Are we in the cluster?"
    exit 3
  end

  require 'open-uri'
  metadata_url = 'http://169.254.169.254/latest/meta-data/instance-id'
  # Kernel#open stopped handling URLs via open-uri in Ruby 3.0; URI.open is
  # the supported entry point where it exists.  The plain open call is kept
  # as a fallback for older interpreters where URI.open is not public.
  ec2_id = if URI.respond_to?(:open)
             URI.open(metadata_url) {|f| f.read }
           else
             open(metadata_url) {|f| f.read }
           end
  ins = self.instances.detect {|i| i.ec2_id.eql? ec2_id }

  unless ins
    puts "#{Cluster::NAME} cannot determine the current instance."
    exit 2
  end

  if @options.current_instance_id
    aws = self.instances.detect {|i| i.aws_id.eql? @options.current_instance_id }
    if aws && aws.aws_id != ins.aws_id
      # Merge the separately registered record into the one matched by EC2 id.
      ins.services = (ins.services + aws.services).uniq
      ins.disabled_services = (ins.disabled_services + aws.disabled_services).uniq
      ins.spot_price = aws.spot_price unless ins.spot_price
      ins.friendly_name = aws.friendly_name unless ins.friendly_name
      sdb.delete_attributes domain, aws.aws_id
      sdb.put_attributes domain, ins.aws_id, ins.attributes, :replace
    end
  end

  ins
end

#domain ⇒ Object



317
318
319
# File 'lib/cluster/infrastructures/amazon.rb', line 317

# Returns @options.cluster_domain, the domain name used in the SimpleDB
# calls and select statements of this class.
def domain
  @options.cluster_domain
end

#domains(reload = false) ⇒ Object



375
376
377
378
379
380
381
# File 'lib/cluster/infrastructures/amazon.rb', line 375

# Returns the :domains entry of sdb.list_domains, cached in @sdb_domains.
#
# @param reload [Boolean] when true the list is fetched again even if a
#   cached copy exists
def domains(reload = false)
  @sdb_domains = sdb.list_domains[:domains] if reload || !@sdb_domains
  @sdb_domains
end

#ecc(params = {}) ⇒ Object



365
366
367
368
369
370
371
372
373
# File 'lib/cluster/infrastructures/amazon.rb', line 365

# Returns the RightAws::Ec2 interface, created on first use and cached in
# @ec2.  +params+ only matters for the call that creates the interface; it is
# passed through #connection_params.  If no interface object results, a
# message is printed and the process exits with status 3.
def ecc(params = {})
  @ec2 ||= RightAws::Ec2.new(key, secret, connection_params(params))
  return @ec2 if @ec2

  puts "Amazon cannot connect to EC2"
  exit 3
end

#elb(params = {}) ⇒ Object



355
356
357
358
359
360
361
362
363
# File 'lib/cluster/infrastructures/amazon.rb', line 355

# Returns the RightAws::ElbInterface, created on first use and cached in
# @elb.  +params+ only matters for the call that creates the interface; it is
# passed through #connection_params.  If no interface object results, a
# message is printed and the process exits with status 3.
def elb(params = {})
  @elb ||= RightAws::ElbInterface.new(key, secret, connection_params(params))
  return @elb if @elb

  puts "Amazon cannot connect to elb"
  exit 3
end

#fetch_monitor ⇒ Object



260
261
262
263
264
265
# File 'lib/cluster/infrastructures/amazon.rb', line 260

# Looks up the 'monitor' entry in the domain and fetches the bucket object
# stored under that entry's 'key' attribute.
#
# @return [Object, nil] whatever bucket.get returns, or nil when the domain
#   holds no monitor entry
def fetch_monitor
  found = sdb.select("select * from #{domain} where entry = 'monitor'")
  if found[:items].empty?
    nil
  else
    record = self.class.from_sdb_results(found).first
    bucket.get(record['key'])
  end
end

#get_image(bits) ⇒ Object



493
494
495
496
497
498
499
500
501
# File 'lib/cluster/infrastructures/amazon.rb', line 493

# Returns the image recorded for the given word size.
#
# The image file is loaded once through #get_image_file and cached in
# @image_file; 32 reads its 'thirtytwo' entry, 64 its 'sixtyfour' entry, and
# any other value yields nil.
def get_image(bits)
  @image_file ||= get_image_file
  field = if 32 == bits
            'thirtytwo'
          elsif 64 == bits
            'sixtyfour'
          end
  field && @image_file[field]
end

#get_image_file ⇒ Object



488
489
490
491
# File 'lib/cluster/infrastructures/amazon.rb', line 488

# Opens Cluster::IMAGES and parses its contents as YAML.
#
# Kernel#open stopped handling URLs via open-uri in Ruby 3.0, so URI.open is
# used where it is available; the plain open call is kept as a fallback for
# older interpreters where URI.open is not public.
#
# NOTE(review): YAML.load is applied to whatever Cluster::IMAGES points at;
# if that location is not fully trusted, a safe loader should be considered.
def get_image_file
  require 'open-uri'
  if URI.respond_to?(:open)
    URI.open(Cluster::IMAGES) {|f| YAML::load(f) }
  else
    open(Cluster::IMAGES) {|f| YAML::load(f) }
  end
end

#in_cluster? ⇒ Boolean

Returns:

  • (Boolean)


195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/cluster/infrastructures/amazon.rb', line 195

# Reports whether 169.254.169.254:80 (the address #current_instance reads
# instance metadata from) accepts a TCP connection within one second.
#
# A positive answer is remembered in @cluster_check so later calls skip the
# probe; the original never assigned that variable, so every call probed
# again.  A negative answer is not cached and is re-checked on the next call.
# The result is also written to the class variable @@in_cluster.
#
# @return [Boolean]
def in_cluster?
  return @cluster_check if @cluster_check

  check = false
  begin
    Timeout::timeout(1) do
      begin
        s = TCPSocket.new('169.254.169.254', 80)
        s.close
        check = true
      rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ENETUNREACH
        # Unreachable or refused simply means we are not inside the cluster.
      end
    end
  rescue Timeout::Error
    # No answer within the time limit counts as "not in the cluster" too.
  end

  @cluster_check = @@in_cluster = check
end

#instances ⇒ Object



96
97
98
# File 'lib/cluster/infrastructures/amazon.rb', line 96

# Returns the instance list, calling #load_instances on first use and keeping
# the result in the class variable @@instances for later calls.
def instances
  @@instances ||= load_instances
end

#key ⇒ Object

The methods below here are typically only used internally, but could also be called by anything that would like to access the Amazon tools directly.



305
306
307
# File 'lib/cluster/infrastructures/amazon.rb', line 305

# Returns @options.key (described as the "Amazon Access Key ID" by the
# command-line option that sets it).
def key
  @options.key
end

#load_instances ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/cluster/infrastructures/amazon.rb', line 124

# Builds the list of cluster instances by reconciling EC2 with SimpleDB.
#
# 1. Every instance reported by ecc.describe_instances becomes an
#    AmazonInstance, except those in state 'terminated' or 'shutting-down',
#    whose ids are only remembered.
# 2. The 'machine' entries of the domain are selected.  If that fails with
#    RightAws::AwsError and the domain is not listed, the domain is created
#    and the select retried once; any other failure is re-raised (it used to
#    be swallowed, leaving the result nil and crashing further down).
# 3. Each stored record is attached to its instance, kept as a pending
#    instance (no EC2 id yet and started less than 12 hours ago), or removed
#    (stale or terminated).
# 4. Instances without a stored record are registered in the domain.
#
# @return [Array] the AmazonInstance objects
def load_instances
  terminated = []
  iss = ecc.describe_instances.map {|ins|
    if %w(terminated shutting-down).include? ins[:aws_state]
      terminated << ins[:aws_instance_id]
      nil
    else
      AmazonInstance.new(ins)
    end
  }.compact

  domain_created = false
  begin
    res = sdb.select "select * from #{domain} where entry='machine'"
  rescue RightAws::AwsError
    # Only a missing domain is recoverable, and only once.
    raise if domain_created || domains.include?(domain)
    sdb.create_domain domain
    domain_created = true
    retry
  end
  sdbs = self.class.from_sdb_results res

  sdbs.each do |sd|
    aid = sd[:aws_id]
    iid = sd['ec2_id']
    if !iid
      # Record without an EC2 id: keep it only while it is recent.
      started = sd['start_time_sorted'] && Time.parse(sd['start_time_sorted'])
      diff = started && (Time.now - started)
      if diff && diff < (12 * 3600)
        ins = AmazonInstance.new
        ins.set_sdb_attributes sd
        iss.push ins
      else
        $stderr.puts "Cannot find machine #{aid} -- old entry being removed."
        sdb.delete_attributes domain, aid
      end
    else
      if ins = iss.detect {|i| i.id.eql? iid }
        ins.set_sdb_attributes sd
      elsif terminated.include? iid
        $stderr.puts "Removing terminated entry #{iid}"
        sdb.delete_attributes domain, aid
      else
        $stderr.puts "Orphaned cluster record of #{aid}.  (Just started?) [#{sd.inspect}]"
      end
    end
  end

  iss.each do |ins|
    if ins.no_sdb?
      puts "Cannot find cluster registration for #{ins.ec2_id} -- creating."
      sdb.put_attributes domain, ins.aws_id, ins.attributes, :replace
    end
  end

  iss
end

#new_instance(size, services) ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/cluster/infrastructures/amazon.rb', line 223

# Creates an AmazonInstance for the given size and services.
#
# With @options.spot_instances set, @options.price is required (otherwise a
# message is printed and the process exits with status 3) and is passed along
# as :spot_price.  The created instance is printed with #inspect and returned.
def new_instance(size, services)
  spot = @options.spot_instances
  if spot && !@options.price
    puts "Amazon Spot instances need a '--price=??' argument"
    exit 3
  end

  args = { :services => services, :size => size }
  args[:spot_price] = @options.price if spot

  created = AmazonInstance.create(args)
  puts created.inspect
  created
end

#options ⇒ Object



503
504
505
# File 'lib/cluster/infrastructures/amazon.rb', line 503

# Returns the @options object that #configure fills in.
def options
  @options
end

#owner ⇒ Object



313
314
315
# File 'lib/cluster/infrastructures/amazon.rb', line 313

# Returns @options.owner (described as the "Amazon User Code" by the
# command-line option that sets it).
def owner
  @options.owner
end

#period(args) ⇒ Object



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
# File 'lib/cluster/infrastructures/amazon.rb', line 434

# Prunes old EBS snapshots, keeping only the newest few per volume.
#
# Considers the snapshots returned by ecc.describe_snapshots that belong to
# @options.owner and have status 'completed', grouped by volume id.  For each
# volume the number to keep is the 'snaps' setting of the matching entry in
# @options.volumes['stores'] (merged over @options.volumes['defaults']); it
# falls back to 2 and is never allowed below 2.  Everything older than the
# newest +snaps+ snapshots is deleted.
#
# Volumes with fewer snapshots than the keep count are left alone (this used
# to raise NoMethodError because Array#slice returned nil).
#
# @param args unused
# @return [Array] the volume ids that were examined
def period(args)
  shots = {}
  ecc.describe_snapshots.each do |shot|
    next unless shot[:aws_owner].to_s == @options.owner.to_s
    next unless shot[:aws_status] == 'completed'
    shot[:started_at] = Time.parse shot[:aws_started_at]
    (shots[shot[:aws_volume_id]] ||= []) << shot
  end

  stores = @options.volumes['stores'] || []
  defaults = @options.volumes['defaults'] || {}

  shots.keys.each do |volume_id|
    store = stores.detect { |candidate| candidate['aws_id'] == volume_id }
    opts = store ? defaults.merge(store) : defaults
    snap_count = opts['snaps'] || 2
    snap_count = 2 unless snap_count > 1

    newest_first = shots[volume_id].sort_by { |shot| shot[:started_at] }.reverse
    # drop returns [] when there are fewer snapshots than snap_count.
    newest_first.drop(snap_count).each do |shot|
      ecc.delete_snapshot shot[:aws_id]
    end
  end
end

#query(qry) ⇒ Object



530
531
532
533
534
535
536
537
# File 'lib/cluster/infrastructures/amazon.rb', line 530

# Runs a select statement against SimpleDB and converts the rows with
# from_sdb_results.
#
# @param qry [String] the select statement
# @return [Array<Hash>, nil] the converted rows, or nil when the select
#   returned nothing truthy
def query(qry)
  found = sdb.select(qry)
  return nil unless found

  self.class.from_sdb_results(found)
end

#release_class ⇒ Object



297
298
299
# File 'lib/cluster/infrastructures/amazon.rb', line 297

# Returns the AmazonRelease class.
def release_class
  AmazonRelease
end

#retrieve(key) ⇒ Object



415
416
417
# File 'lib/cluster/infrastructures/amazon.rb', line 415

# Fetches the object stored under +key+ from the cluster #bucket.
# Note that the +key+ parameter shadows the #key method inside this method.
def retrieve(key)
  bucket.get key
end

#revoke(ips) ⇒ Object



518
519
520
521
522
# File 'lib/cluster/infrastructures/amazon.rb', line 518

# Closes TCP port 22 in the 'access' security group for each given address.
#
# @param ips [Array] addresses passed one by one to the EC2 interface
# @return [Array] for every input, the address itself when the EC2 call
#   returned a truthy value, otherwise that falsy return value
def revoke(ips)
  ips.map do |ip|
    revoked = ecc.revoke_security_group_IP_ingress('access', 22, 22, 'tcp', ip)
    revoked && ip
  end
end

#save_credentials(creds, key = nil) ⇒ Object



419
420
421
422
# File 'lib/cluster/infrastructures/amazon.rb', line 419

# Stores +creds+ in the cluster #bucket.
#
# @param creds the content to store
# @param key [String, nil] object name; @options.bucket_key when omitted
# @return whatever bucket.put returns
def save_credentials(creds, key = nil)
  bucket.put(key || @options.bucket_key, creds)
end

#save_images(input) ⇒ Object



424
425
426
427
# File 'lib/cluster/infrastructures/amazon.rb', line 424

# Stores the image listing in the cluster #bucket with the 'public-read'
# permission.
#
# The original body ran `key ||= ...` on a local that was always nil, so the
# object name could never be chosen by the caller.  An optional +key+
# parameter (mirroring #save_credentials) makes that possible while keeping
# one-argument calls unchanged.
#
# @param input the content to store
# @param key [String, nil] object name; @options.cluster_image_key when omitted
# @return whatever bucket.put returns
def save_images(input, key = nil)
  key ||= @options.cluster_image_key
  bucket.put(key, input, {}, 'public-read')
end

#save_monitor(io, key) ⇒ Object



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/cluster/infrastructures/amazon.rb', line 243

# Uploads the monitor to the cluster #bucket and records it in the domain.
#
# After a successful bucket.put, the existing 'monitor' entry is reused or a
# new one (with a timestamp UUID as id) is started; 'updated_at', 'key' and
# 'bucket' are then set and the record is stored with :replace.
#
# Two defects of the original are fixed here:
# * 'bucket' read the never-assigned @cluster_bucket (always nil) instead of
#   @options.cluster_bucket;
# * an existing record, as produced by from_sdb_results, carries its id under
#   the symbol :aws_id, but the id was looked up with the string 'aws_id', so
#   the item name passed to put_attributes was nil.
#
# @param io the content to upload
# @param key [String] object name inside the bucket
# @return [Hash, nil] the stored record, or nil when the upload returned a
#   falsy value
def save_monitor(io, key)
  return unless bucket.put(key, io)

  old = sdb.select "select * from #{domain} where entry = 'monitor'"
  monitor = if old[:items].empty?
              { 'aws_id' => UUIDTools::UUID.timestamp_create.to_s,
                'entry' => 'monitor' }
            else
              self.class.from_sdb_results(old).first
            end
  monitor.merge! 'updated_at' => Time.now.xmlschema(3),
    'key' => key,
    'bucket' => @options.cluster_bucket

  # Existing records use :aws_id, freshly built ones 'aws_id'.
  item_name = monitor[:aws_id] || monitor['aws_id']
  sdb.put_attributes domain, item_name, self.class.to_sdb_attributes(monitor), :replace
  monitor
end

#sdb(params = {}) ⇒ Object



321
322
323
324
325
326
327
328
329
# File 'lib/cluster/infrastructures/amazon.rb', line 321

# Returns the RightAws::SdbInterface, created on first use and cached in
# @sdb.  +params+ only matters for the call that creates the interface; it is
# passed through #connection_params.  If no interface object results, a
# message is printed and the process exits with status 3.
def sdb(params = {})
  @sdb ||= RightAws::SdbInterface.new(key, secret, connection_params(params))
  return @sdb if @sdb

  puts "Amazon cannot connect to SimpleDB"
  exit 3
end

#secret ⇒ Object



309
310
311
# File 'lib/cluster/infrastructures/amazon.rb', line 309

# Returns @options.secret (described as the "Amazon Access Secret" by the
# command-line option that sets it).
def secret
  @options.secret
end

#security(groups) ⇒ Object



507
508
509
510
511
512
513
514
515
516
# File 'lib/cluster/infrastructures/amazon.rb', line 507

# Summarizes the address-based permissions of the given security groups.
#
# Permissions without a :cidr_ips entry are skipped.
#
# @param groups passed straight to ecc.describe_security_groups
# @return [Hash] group name => list of [cidr_ips, from_port..to_port] pairs
def security(groups)
  ecc.describe_security_groups(groups).each_with_object({}) do |group, summary|
    with_cidr = group[:aws_perms].select { |perm| perm[:cidr_ips] }
    summary[group[:aws_group_name]] = with_cidr.map do |perm|
      [perm[:cidr_ips], perm[:from_port]..perm[:to_port]]
    end
  end
end

#size_to_instance_type(size) ⇒ Object



215
216
217
218
219
220
221
# File 'lib/cluster/infrastructures/amazon.rb', line 215

# Translates a machine size into an instance type via
# AmazonInstance.size_to_type.
#
# The lower-cased size must be one of self.class.sizes; otherwise the
# available sizes are printed and the process exits with status 2.
def size_to_instance_type(size)
  known = self.class.sizes
  return AmazonInstance.size_to_type(size) if known.include?(size.downcase)

  puts "#{Cluster::NAME} does not have a machine size of #{size}\n\tAvailable Sizes: (#{known.join(', ')})"
  exit 2
end

#sqs(params = {}) ⇒ Object



345
346
347
348
349
350
351
352
353
# File 'lib/cluster/infrastructures/amazon.rb', line 345

# Returns the RightAws::SqsGen2 interface, created on first use and cached in
# @sqs.  +params+ only matters for the call that creates the interface; it is
# passed through #connection_params.  If no interface object results, a
# message is printed and the process exits with status 3.
#
# The failure message used to name S3 (copied from #sss); it now names SQS.
def sqs(params = {})
  return @sqs if @sqs
  @sqs = RightAws::SqsGen2.new(key, secret, connection_params(params))
  unless @sqs
    puts "Amazon cannot connect to SQS"
    exit 3
  end
  @sqs
end

#sss(params = {}) ⇒ Object



335
336
337
338
339
340
341
342
343
# File 'lib/cluster/infrastructures/amazon.rb', line 335

# Returns the RightAws::S3 interface, created on first use and cached in
# @s3.  +params+ only matters for the call that creates the interface; it is
# passed through #connection_params.  If no interface object results, a
# message is printed and the process exits with status 3.
def sss(params = {})
  @s3 ||= RightAws::S3.new(key, secret, connection_params(params))
  return @s3 if @s3

  puts "Amazon cannot connect to S3"
  exit 3
end

#store(key, io) ⇒ Object



411
412
413
# File 'lib/cluster/infrastructures/amazon.rb', line 411

# Stores +io+ under +key+ in the cluster #bucket and returns whatever
# bucket.put returns.  The +key+ parameter shadows the #key method here.
def store(key, io)
  bucket.put(key, io)
end