Class: Wakame::Service::ServiceCluster

Inherits:
Wakame::StatusDB::Model show all
Includes:
ThreadImmutable
Defined in:
lib/wakame/service.rb

Constant Summary collapse

STATUS_OFFLINE =
0
STATUS_ONLINE =
1
STATUS_PARTIAL_ONLINE =
2
STATUS_FROZEN =
3
STATUS_UNFROZEN =
4

Constants included from AttributeHelper

AttributeHelper::CLASS_TYPE_KEY, AttributeHelper::CONVERT_CLASSES, AttributeHelper::PRIMITIVE_CLASSES

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ThreadImmutable

#bind_thread, included, #target_thread, #target_thread?, #thread_check

Methods inherited from Wakame::StatusDB::Model

#delete, #dirty?, inherited, #new_record?, #on_after_delete, #on_after_load, #on_before_delete, #on_before_load, #reload, #save

Methods included from AttributeHelper

#dump_attrs, #retrieve_attr_attribute

Instance Attribute Details

#trigger_setObject (readonly)

Returns the value of attribute trigger_set.



301
302
303
# File 'lib/wakame/service.rb', line 301

# Read-only accessor for the trigger set.
#
# @return [Object, nil] the current value of @trigger_set (nil while unassigned)
def trigger_set
  @trigger_set
end

Class Method Details

.id(name) ⇒ Object



303
304
305
306
# File 'lib/wakame/service.rb', line 303

# Derives an ID string from a cluster name.
#
# @param name [String] the cluster name
# @return [String] the hex-encoded SHA-1 digest of +name+
def self.id(name)
  require 'digest/sha1' # loaded lazily, on first use
  Digest::SHA1.new.update(name).hexdigest
end

Instance Method Details

#add_cloud_host(&blk) ⇒ Object



523
524
525
526
527
528
529
530
531
532
533
534
535
# File 'lib/wakame/service.rb', line 523

# Creates a new CloudHost, registers it with this cluster and persists both.
#
# check_freeze runs first (defined elsewhere; presumably it rejects changes
# while the cluster is frozen -- confirm).
#
# @yield [host] optional block invoked with the new CloudHost after it has
#   been registered in cloud_hosts but before anything is saved
# @return [CloudHost] the newly created and saved host
def add_cloud_host(&blk)
  check_freeze

  host = CloudHost.new
  host.cluster_id = id
  cloud_hosts[host.id] = 1

  # Let the caller customise the host before it is persisted.
  blk.call(host) unless blk.nil?

  host.save
  save

  host
end

#add_resource(resource, &blk) ⇒ Object



364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
# File 'lib/wakame/service.rb', line 364

# Registers a resource with this cluster and adds it to the dependency graph.
#
# @param resource [Class, Resource] a Resource subclass (instantiated with
#   +new+) or an existing Resource instance
# @yield [resource] optional block invoked with the resource instance after
#   the duplicate check and before it is registered
# @return [Resource] the registered resource instance
# @raise [ArgumentError] if +resource+ is neither a Resource class nor instance
# @raise [RuntimeError] if a resource with the same id is already registered
def add_resource(resource, &blk)
  check_freeze

  if resource.is_a?(Class) && resource <= Resource
    # A class was given: work with a fresh instance of it.
    resource = resource.new
  elsif !resource.is_a?(Resource)
    raise ArgumentError
  end

  if self.resources.has_key?(resource.id)
    raise "Duplicate resource registration: #{resource.class}"
  end

  blk.call(resource) if blk

  resources[resource.id] = 1
  self.dg.add_object(resource)

  # Persist the resource, the cluster and the dependency graph, in that order.
  resource.save
  self.save
  self.dg.save
  resource
end

#agentsObject



342
343
344
345
346
347
348
349
# File 'lib/wakame/service.rb', line 342

# Builds a map of the agents attached to this cluster's cloud hosts.
#
# Every id in cloud_hosts is looked up with CloudHost.find; hosts that do not
# report mapped? are left out.
#
# @return [Hash] cloud host id => agent id
def agents
  mapping = {}
  cloud_hosts.keys.each do |host_id|
    host = CloudHost.find(host_id)
    mapping[host_id] = host.agent_id if host.mapped?
  end
  mapping
end

#define_triggers(&blk) ⇒ Object



313
314
315
316
# File 'lib/wakame/service.rb', line 313

# Yields this cluster's trigger set to the given block, creating the set
# (keyed by the cluster id) on first use.
#
# @yield [trigger_set] the TriggerSet held in @trigger_set
# @return [Object] whatever the block returns
def define_triggers(&blk)
  @trigger_set = TriggerSet.new(id) unless @trigger_set
  blk.call(@trigger_set)
end

#destroy(svc_id) ⇒ Object

Create service instance objects until every resource has at least its min_instances number of instances. The agents are not assigned at this point. def launch

self.resources.keys.each { |res_id|
  res = Resource.find(res_id)
  count = instance_count(res.class)
  if res.min_instances > count
    (res.min_instances - count).times {
      propagate(res.class)
    }
  end
}

end thread_immutable_methods :launch



429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
# File 'lib/wakame/service.rb', line 429

# Removes a service instance from this cluster and deletes it.
#
# The instance is unbound from the cluster and from its cloud host, dropped
# from services, deleted, and the cluster is saved. A ServiceDestroied event
# is fired last.
#
# @param svc_id [Object] id of a service instance registered in services
# @return [Object] the result of ED.fire_event
# @raise [RuntimeError] if +svc_id+ is not registered in this cluster
def destroy(svc_id)
  check_freeze
  unless self.services.has_key?(svc_id)
    raise("Unknown service instance : #{svc_id}")
  end

  instance = ServiceInstance.find(svc_id)
  instance.unbind_cluster
  self.services.delete(instance.id)
  previous_host = instance.unbind_cloud_host

  if previous_host
    Wakame.log.debug("#{instance.resource.class}(#{instance.id}) has been destroied from Host #{previous_host.inspect}")
  else
    Wakame.log.debug("#{instance.resource.class}(#{instance.id}) has been destroied.")
  end

  instance.delete
  self.save

  ED.fire_event(Event::ServiceDestroied.new(svc_id))
end

#dgObject



351
352
353
354
355
356
357
358
359
360
361
# File 'lib/wakame/service.rb', line 351

# Returns the dependency graph of this cluster.
#
# When dg_id is set, the graph is loaded with DependencyGraph.find. Otherwise
# a new graph is created and saved, its id is stored in dg_id, and the cluster
# itself is saved.
#
# @return [DependencyGraph] the loaded or newly created graph
def dg
  return DependencyGraph.find(self.dg_id) unless self.dg_id.nil?

  new_graph = DependencyGraph.new
  new_graph.save
  self.dg_id = new_graph.id
  self.save
  new_graph
end

#each_instance(filter_resource = nil, &blk) ⇒ Object

Iterate the service instances in this cluster

The first argument is used for filtering only specified resource instances. Iterated instance objects are passed to the block when it is given. The return value is an array containing the registered service instance objects (filtered).



572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
# File 'lib/wakame/service.rb', line 572

# Iterates over the service instances registered in this cluster.
#
# @param filter_resource [nil, Resource, String, Module] optional filter:
#   a Resource instance (its class is used), a constant name resolved with
#   Util.build_const, or a class/module. Only instances whose resource is
#   kind_of? the filter are kept; nil keeps every instance.
# @yield [svc] each selected ServiceInstance, when a block is given
# @return [Array] the selected service instances (empty when no registered
#   resource matches the filter)
# @raise [ArgumentError] if +filter_resource+ has an unsupported type
def each_instance(filter_resource=nil, &blk)
  resource_class =
    case filter_resource
    when Resource then filter_resource.class
    when String then Util.build_const(filter_resource)
    when Module, NilClass then filter_resource
    else
      raise ArgumentError, "The first argument has to be in form of NilClass, Resource, String or Module: #{filter_resource.class}"
    end

  matching_ids = []
  unless resource_class.nil?
    # Resolve the filter to the ids of the registered resources it matches.
    matching_ids = self.resources.keys.select do |res_id|
      Resource.find(res_id).kind_of?(resource_class)
    end
    return [] if matching_ids.empty?
  end

  selected = self.services.keys.map { |svc_id| ServiceInstance.find(svc_id) }
  unless resource_class.nil?
    selected = selected.select { |svc| matching_ids.member?(svc.resource.id) }
  end

  selected.each { |svc| blk.call(svc) } if block_given?
  selected
end

#find_service(svc_id) ⇒ Object



553
554
555
556
# File 'lib/wakame/service.rb', line 553

# Looks up a service instance that belongs to this cluster.
#
# @param svc_id [Object] the service instance id
# @return [ServiceInstance] the instance returned by ServiceInstance.find
# @raise [RuntimeError] if the id is not registered in services, or if it is
#   registered but ServiceInstance.find returns nothing
def find_service(svc_id)
  unless self.services.has_key?(svc_id)
    raise "The service ID #{svc_id} is not registered to this cluster \"#{self.name}\""
  end

  found = ServiceInstance.find(svc_id)
  raise("The service ID #{svc_id} is registered. but not in the database.") unless found
  found
end

#has_instance?(svc_id) ⇒ Boolean

Returns:

  • (Boolean)


406
407
408
# File 'lib/wakame/service.rb', line 406

# Tells whether a service instance id is registered in this cluster.
#
# @param svc_id [Object] the service instance id to look for
# @return [Boolean] true when services has +svc_id+ as a key
def has_instance?(svc_id)
  services.has_key?(svc_id)
end

#idObject



308
309
310
311
# File 'lib/wakame/service.rb', line 308

# Returns the ID of this cluster, derived from its name through the
# class-level .id method.
#
# @return [Object] the value of self.class.id(name)
# @raise [RuntimeError] if the cluster name has not been set
def id
  raise "Cluster name is not set yet" if self.name.nil?
  self.class.id(self.name)
end

#instance_count(resource = nil) ⇒ Object



558
559
560
561
562
563
564
565
566
# File 'lib/wakame/service.rb', line 558

# Counts the service instances in this cluster.
#
# @param resource [Object, nil] optional filter passed on to each_instance;
#   when nil, every registered service is counted
# @return [Integer] services.size without a filter, otherwise the number of
#   instances each_instance yields for +resource+
def instance_count(resource=nil)
  return self.services.size if resource.nil?

  matched = 0
  each_instance(resource) { |_svc| matched += 1 }
  matched
end

#mapped_agent?(agent_id) ⇒ Boolean

Returns:

  • (Boolean)


335
336
337
338
339
340
# File 'lib/wakame/service.rb', line 335

# Tells whether the given agent is mapped to any cloud host of this cluster.
#
# @param agent_id [Object] the agent id to look for
# @return [Boolean] true when some host found through cloud_hosts reports
#   mapped? and carries this agent id
def mapped_agent?(agent_id)
  cloud_hosts.keys.any? do |host_id|
    host = CloudHost.find(host_id)
    host.mapped? && host.agent_id == agent_id
  end
end

#propagate_resource(resource, cloud_host_id = nil, force = false) ⇒ Object Also known as: propagate

def propagate(resource, force=false)

Raises:

  • (ArgumentError)


452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
# File 'lib/wakame/service.rb', line 452

# Creates a new service instance for a resource registered in this cluster.
#
# @param resource [Object] the resource to propagate; it is turned into a
#   resource id with Resource.id
# @param cloud_host_id [Object, nil] id of the CloudHost to bind the new
#   instance to; required when the resource reports require_agent
# @param force [Boolean] the max_instances limit is checked only when this
#   is exactly false
# @return [ServiceInstance] the newly created and saved service instance
# @raise [RuntimeError] if the resource is not registered in this cluster or
#   cannot be found, or if the cloud host id is unknown
# @raise [ArgumentError] if the resource requires an agent and
#   +cloud_host_id+ is nil
# @raise [ServicePropagationError] if the instance limit is already reached
#   and +force+ is false
def propagate_resource(resource, cloud_host_id=nil, force=false)
  check_freeze
  res_id = Resource.id(resource)
  # The resource must be registered here AND be loadable through Resource.find.
  res_obj = (self.resources.has_key?(res_id) && Resource.find(res_id)) || raise("Unregistered resource: #{resource.to_s}")
  raise ArgumentError if res_obj.require_agent && cloud_host_id.nil?

  # Enforce the per-resource instance limit unless the caller overrides it.
  if force == false
    instnum = instance_count(res_obj)
    if instnum >= res_obj.max_instances
      raise ServicePropagationError, "#{res_obj.class} has been reached to max_instance limit: max=#{res_obj.max_instances}" 
    end
  end
  
  svc = ServiceInstance.new
  svc.bind_cluster(self)
  svc.bind_resource(res_obj)

  # cloud_host_id must be set when the resource is placed on agent.
  if res_obj.require_agent
    host = CloudHost.find(cloud_host_id) || raise("#{self.class}: Unknown CloudHost ID: #{cloud_host_id}")
    svc.bind_cloud_host(host)
  end

  # Register the instance in the cluster before persisting both objects.
  self.services[svc.id]=1

  svc.save
  self.save

  # Fire the event only after the instance and the cluster have been saved.
  ED.fire_event(Event::ServicePropagated.new(svc.id))
  svc
end

#propagate_service(svc_id, cloud_host_id = nil, force = false) ⇒ Object



486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
# File 'lib/wakame/service.rb', line 486

# Creates a new service instance using the same resource as an existing one.
#
# @param svc_id [Object] id of an existing service instance of this cluster
#   that serves as the source
# @param cloud_host_id [Object, nil] id of the CloudHost to bind to; when nil
#   and the resource requires an agent, a new cloud host is added with a copy
#   of the source instance's vm_attr
# @param force [Boolean] the max_instances limit is checked only when this
#   is exactly false
# @return [ServiceInstance] the newly created and saved service instance
# @raise [RuntimeError] if the source service is not registered in this
#   cluster or cannot be found, or if the given cloud host id is unknown
# @raise [ServicePropagationError] if the instance limit is already reached
#   and +force+ is false
def propagate_service(svc_id, cloud_host_id=nil, force=false)
  check_freeze
  # The source must be registered here AND be loadable through ServiceInstance.find.
  src_svc = (self.services.has_key?(svc_id) && ServiceInstance.find(svc_id)) || raise("Unregistered service: #{svc_id.to_s}")
  res_obj = src_svc.resource

  # Enforce the per-resource instance limit unless the caller overrides it.
  if force == false
    instnum = instance_count(res_obj)
    if instnum >= res_obj.max_instances
      raise ServicePropagationError, "#{res_obj.class} has been reached to max_instance limit: max=#{res_obj.max_instances}" 
    end
  end
  
  svc = ServiceInstance.new
  svc.bind_cluster(self)
  svc.bind_resource(res_obj)

  if res_obj.require_agent
    if cloud_host_id
      host = CloudHost.find(cloud_host_id) || raise("#{self.class}: Unknown Host ID: #{cloud_host_id}")
    else
      # No host given: add a new cloud host that copies the source's VM attributes.
      host = add_cloud_host { |h|
        h.vm_attr = src_svc.cloud_host.vm_attr.dup
      }
    end
    svc.bind_cloud_host(host)
  end

  # Register the instance in the cluster before persisting both objects.
  self.services[svc.id]=1

  svc.save
  self.save

  # Fire the event only after the instance and the cluster have been saved.
  ED.fire_event(Event::ServicePropagated.new(svc.id))
  svc
end

#propertiesObject



619
620
621
# File 'lib/wakame/service.rb', line 619

# Returns the resources registered in this cluster.
#
# @return [Object] the same object that #resources returns
def properties
  resources
end

#remove_cloud_host(cloud_host_id) ⇒ Object



537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
# File 'lib/wakame/service.rb', line 537

# Unregisters a cloud host from this cluster and deletes the host record.
#
# The cluster is saved only when the id was actually registered. Cleaning up
# the CloudHost record (unmapping its agent and deleting it) is best effort:
# a failure there is logged and does not propagate to the caller.
#
# @param cloud_host_id [Object] id of the cloud host to remove
# @return [Object, nil] the result of CloudHost#delete, or nil when the
#   record cleanup failed
def remove_cloud_host(cloud_host_id)
  check_freeze
  if self.cloud_hosts.has_key?(cloud_host_id)
    self.cloud_hosts.delete(cloud_host_id)

    self.save
  end

  begin
    ch = CloudHost.find(cloud_host_id)
    ch.unmap_agent
    ch.delete
  rescue => e
    # Best effort: keep going, but leave a trace instead of hiding the failure.
    Wakame.log.warn("#{self.class}: Failed to clean up CloudHost #{cloud_host_id}: #{e.class}: #{e.message}")
    nil
  end
end

#resetObject



324
325
326
327
328
329
330
331
332
333
# File 'lib/wakame/service.rb', line 324

# Clears the state held by this cluster.
#
# Empties services, resources, cloud_hosts and template_vm_attr, clears the
# advertised AMQP servers, sets the status back to its declared default and
# refreshes the status timestamp. Nothing is saved here.
#
# @return [Time] the new value of @status_changed_at
def reset
  check_freeze
  services.clear
  resources.clear
  cloud_hosts.clear
  template_vm_attr.clear
  # An explicit receiver is required: without it Ruby assigns a new local
  # variable and the attribute keeps its old value.
  # NOTE(review): assumes an advertised_amqp_servers= writer exists -- confirm.
  self.advertised_amqp_servers = nil
  @status = self.class.attr_attributes[:status][:default]
  @status_changed_at = Time.now
end

#set_dependency(res_name1, res_name2) ⇒ Object

Set dependency between two resources.



387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# File 'lib/wakame/service.rb', line 387

# Declares a dependency between two resources of this cluster.
#
# Each argument is a Resource subclass or a constant name resolved with
# Util.build_const. Both must be registered in resources and known to
# Resource.exists?. Nothing happens when both resolve to the same resource id.
#
# @param res_name1 [Class, String] first resource class or its constant name
# @param res_name2 [Class, String] second resource class or its constant name
# @return [Object, nil] the result of the graph's set_dependency, or nil when
#   both arguments identify the same resource
# @raise [ArgumentError] if an argument does not resolve to a Resource class
# @raise [RuntimeError] if a resource is not registered here or does not exist
def set_dependency(res_name1, res_name2)
  check_freeze

  # Normalises one argument to a validated Resource class.
  to_resource_class = lambda do |candidate|
    candidate = Util.build_const(candidate) if candidate.is_a?(String)
    unless candidate.is_a?(Class) && candidate <= Resource
      raise ArgumentError
    end
    unless resources.member?(candidate.id)
      raise "This is not a member of this cluster \"#{self.class}\": #{candidate}"
    end
    raise "Unknown resource object: #{candidate}" unless Resource.exists?(candidate.id)
    candidate
  end

  first = to_resource_class.call(res_name1)
  second = to_resource_class.call(res_name2)

  return if first.id == second.id

  self.dg.set_dependency(first, second)
end

#shutdownObject



410
411
# File 'lib/wakame/service.rb', line 410

# Does nothing: this method has no body.
#
# @return [nil]
def shutdown
  nil
end

#sizeObject



615
616
617
# File 'lib/wakame/service.rb', line 615

# Returns the size reported by this cluster's dependency graph.
#
# @return [Object] the value of dg.size
def size
  dg.size
end

#template_vm_specObject



318
319
320
321
322
# File 'lib/wakame/service.rb', line 318

# Returns the current VmSpec with its table set to this cluster's
# template VM attributes.
#
# @return [Object] the object returned by VmSpec.current
def template_vm_spec
  vm_spec = VmSpec.current
  vm_spec.table = template_vm_attr
  vm_spec
end

#update_cluster_statusObject



625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
# File 'lib/wakame/service.rb', line 625

# Recomputes the cluster status from the monitor status of its instances.
#
# * OFFLINE        - there are no instances, or every instance is offline
# * ONLINE         - every instance is online
# * PARTIAL_ONLINE - at least one instance, but not all, is online
#
# In any other case the status is left untouched.
#
# @return [Object, nil] the result of update_status, or nil when no rule applies
def update_cluster_status
  # Load the instances once; both checks below look at the same set.
  members = self.each_instance
  online_count = members.select { |i|
    i.monitor_status == Service::STATUS_ONLINE
  }.size
  all_offline = members.all? { |i|
    i.monitor_status == Service::STATUS_OFFLINE
  }

  # NOTE(review): `instances` is not defined in the visible part of this
  # class; confirm that it refers to the same set as `services`.
  total = self.instances.size
  if total == 0 || all_offline
    self.update_status(Service::ServiceCluster::STATUS_OFFLINE)
  elsif online_count == total
    self.update_status(Service::ServiceCluster::STATUS_ONLINE)
  elsif online_count > 0
    self.update_status(Service::ServiceCluster::STATUS_PARTIAL_ONLINE)
  end
end

#update_freeze_status(new_status) ⇒ Object



650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
# File 'lib/wakame/service.rb', line 650

# Changes the freeze status of this cluster.
#
# When the status actually changes, the cluster is saved and a ClusterFrozen
# or ClusterUnfrozen event carrying the cluster id is fired.
#
# @param new_status [Integer] STATUS_FROZEN or STATUS_UNFROZEN
# @return [Object, nil] the result of ED.fire_event, or nil when the status
#   is unchanged
# @raise [RuntimeError] if +new_status+ is neither of the two freeze statuses
def update_freeze_status(new_status)
  unless [STATUS_FROZEN, STATUS_UNFROZEN].member?(new_status)
    raise "Invalid freeze status: #{new_status}"
  end
  return if @freeze_status == new_status

  @freeze_status = new_status

  # Persist the new status before notifying listeners.
  self.save

  case @freeze_status
  when STATUS_FROZEN then ED.fire_event(Event::ClusterFrozen.new(id))
  when STATUS_UNFROZEN then ED.fire_event(Event::ClusterUnfrozen.new(id))
  end
end

#update_status(new_status) ⇒ Object



603
604
605
606
607
608
609
610
611
612
# File 'lib/wakame/service.rb', line 603

# Changes the cluster status.
#
# When the status actually changes, the change time is recorded, the cluster
# is saved and a ClusterStatusChanged event is fired with the cluster id and
# the new status.
#
# @param new_status [Object] the status to switch to
# @return [Object, nil] the result of ED.fire_event, or nil when the status
#   is unchanged
def update_status(new_status)
  return if @status == new_status

  @status = new_status
  @status_changed_at = Time.now

  # Persist the new status before notifying listeners.
  self.save

  ED.fire_event(Event::ClusterStatusChanged.new(id, new_status))
end