Class: Wakame::Action

Inherits:
Object
  • Object
show all
Includes:
AttributeHelper, ThreadImmutable
Defined in:
lib/wakame/action.rb

Defined Under Namespace

Classes: ProcAction

Constant Summary

Constants included from AttributeHelper

AttributeHelper::CLASS_TYPE_KEY, AttributeHelper::CONVERT_CLASSES, AttributeHelper::PRIMITIVE_CLASSES

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ThreadImmutable

#bind_thread, included, #target_thread, #target_thread?, #thread_check

Methods included from AttributeHelper

#dump_attrs, #retrieve_attr_attribute

Instance Attribute Details

#action_managerObject

Returns the value of attribute action_manager.



11
12
13
# File 'lib/wakame/action.rb', line 11

def action_manager
  @action_manager
end

Instance Method Details

#acquire_lock(&blk) ⇒ Object

Set the lock flags to resources



128
129
130
131
132
133
134
135
136
137
# File 'lib/wakame/action.rb', line 128

def acquire_lock(&blk)
  StatusDB.barrier {
    reslist = []
    blk.call(reslist)
    reslist.flatten!
    reslist.each {|r| action_manager.lock_queue.set(r.to_s, self.job_id) }
  }
  
  action_manager.lock_queue.wait(self.job_id)
end

#actor_request(agent_id, path, *args, &blk) ⇒ Object



109
110
111
112
113
114
115
116
# File 'lib/wakame/action.rb', line 109

def actor_request(agent_id, path, *args, &blk)
  request = master.actor_request(agent_id, path, *args)
  if blk
    request.request
    blk.call(request)
  end
  request
end

#agent_monitorObject



17
18
19
# File 'lib/wakame/action.rb', line 17

def agent_monitor
  master.agent_monitor
end

#all_subactions_complete?Boolean

Returns:

  • (Boolean)


76
77
78
79
80
81
82
# File 'lib/wakame/action.rb', line 76

def all_subactions_complete?
  subactions.each { |a|
    #Wakame.log.debug("#{a.class}.status=#{a.status}")
    return false unless a.status == :complete && a.all_subactions_complete?
  }
  true
end

#flush_subactions(sec = 60*30) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/wakame/action.rb', line 57

def flush_subactions(sec=60*30)
  job_context = action_manager.active_jobs[self.job_id]
  return if job_context.nil?
  
  timeout(sec) {
    until all_subactions_complete?
      #Wakame.log.debug "#{self.class} all_subactions_complete?=#{all_subactions_complete?}"
      Thread.pass
      src = notify_queue.deq
      # Exit the current action when a subaction notified exception.
      if src.is_a?(Exception)
        raise src
      end
      #Wakame.log.debug "#{self.class} notified by #{src.class}, all_subactions_complete?=#{all_subactions_complete?}"
    end
  }
end

#masterObject



13
14
15
# File 'lib/wakame/action.rb', line 13

def master
  action_manager.master
end

#notesObject



123
124
125
# File 'lib/wakame/action.rb', line 123

def notes
  action_manager.active_jobs[self.job_id][:notes]
end

#notify(src = nil) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
# File 'lib/wakame/action.rb', line 89

def notify(src=nil)
  #Wakame.log.debug("#{self.class}.notify() has been called")
  src = self if src.nil?
  if status == :complete && parent_action
    # Escalate the notification to parent if the action is finished.
    parent_action.notify(src)
  else
    notify_queue.clear if notify_queue.size > 0
    notify_queue.enq(src) #if notify_queue.num_waiting > 0
  end
end

#on_canceledObject



147
148
# File 'lib/wakame/action.rb', line 147

def on_canceled
end

#on_failedObject



144
145
# File 'lib/wakame/action.rb', line 144

def on_failed
end

#runObject

Raises:

  • (NotImplementedError)


140
141
142
# File 'lib/wakame/action.rb', line 140

def run
  raise NotImplementedError
end

#service_clusterObject Also known as: cluster

Tentative utility method for



22
23
24
25
26
27
# File 'lib/wakame/action.rb', line 22

def service_cluster
  cluster_id = master.cluster_manager.clusters.first
  raise "There is no cluster loaded" if cluster_id.nil?

  Service::ServiceCluster.find(cluster_id)
end

#status=(status) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/wakame/action.rb', line 30

def status=(status)
  if @status != status
    @status = status
    # Notify to observers after updating the attribute
    notify
  end
  @status
end

#subactionsObject



40
41
42
# File 'lib/wakame/action.rb', line 40

def subactions
  @subactions ||= []
end

#sync_actor_request(agent_id, path, *args) ⇒ Object



118
119
120
121
# File 'lib/wakame/action.rb', line 118

def sync_actor_request(agent_id, path, *args)
  request = actor_request(agent_id, path, *args).request
  request.wait
end

#trigger_action(subaction = nil, &blk) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/wakame/action.rb', line 44

def trigger_action(subaction=nil, &blk)
  if blk 
    subaction = ProcAction.new(blk)
  end

  subactions << subaction
  subaction.parent_action = self
  subaction.job_id = self.job_id
  subaction.action_manager = self.action_manager
  
  action_manager.run_action(subaction)
end

#walk_subactions(&blk) ⇒ Object

Recursively iterate the sub action descendants.



102
103
104
105
106
107
# File 'lib/wakame/action.rb', line 102

def walk_subactions(&blk)
  blk.call(self)
  self.subactions.each{ |a|
    a.walk_subactions(&blk)
  }
end