Class: Wakame::Action

Inherits:
Object
  • Object
show all
Includes:
AttributeHelper, ThreadImmutable
Defined in:
lib/wakame/action.rb

Constant Summary

Constants included from AttributeHelper

AttributeHelper::CONVERT_CLASSES, AttributeHelper::PRIMITIVE_CLASSES

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ThreadImmutable

#bind_thread, included, #target_thread, #target_thread?, #thread_check

Methods included from AttributeHelper

#dump_attrs

Instance Attribute Details

#trigger ⇒ Object (readonly)

Returns the value of attribute trigger.



12
13
14
# File 'lib/wakame/action.rb', line 12

# Reader for the trigger object stored in @trigger.
#
# @return [Object, nil] the value of @trigger, or nil when it was never set
def trigger
  instance_variable_get(:@trigger)
end

Instance Method Details

#actor_request(agent_id, path, *args, &blk) ⇒ Object



104
105
106
107
108
109
110
111
# File 'lib/wakame/action.rb', line 104

# Creates an actor request via master.actor_request. When a block is given,
# the request is fired with #request and then passed to the block.
#
# @param agent_id forwarded unchanged to master.actor_request
# @param path forwarded unchanged to master.actor_request
# @param args additional arguments forwarded to master.actor_request
# @yield [request] called after request.request when a block is supplied
# @return [Object] the object returned by master.actor_request
def actor_request(agent_id, path, *args, &blk)
  req = master.actor_request(agent_id, path, *args)
  # Without a block the request is handed back unsent.
  return req unless blk

  req.request
  blk.call(req)
  req
end

#agent_monitor ⇒ Object



18
19
20
# File 'lib/wakame/action.rb', line 18

# Delegates to the bound trigger.
#
# @return [Object] whatever trigger.agent_monitor returns
def agent_monitor
  bound_trigger = trigger
  bound_trigger.agent_monitor
end

#all_subactions_complete? ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
75
76
77
78
# File 'lib/wakame/action.rb', line 72

# Checks recursively whether every subaction has finished.
#
# A subaction counts as finished when its status is :complete and its own
# all_subactions_complete? is truthy. With no subactions the answer is true.
#
# @return [Boolean]
def all_subactions_complete?
  subactions.all? do |child|
    child.status == :complete && child.all_subactions_complete?
  end
end

#bind_triggered_rule(trigger) ⇒ Object



39
40
41
# File 'lib/wakame/action.rb', line 39

# Stores the given trigger in @trigger so the reader can return it.
#
# @param trigger the object to bind
# @return [Object] the trigger that was just stored
def bind_triggered_rule(trigger)
  @trigger = trigger
  @trigger
end

#flush_subactions(sec = nil) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/wakame/action.rb', line 55

# Blocks until all_subactions_complete? reports true, waking up each time
# something is delivered on notify_queue.
#
# Returns immediately when the rule engine's active_jobs has no entry for
# this action's job_id.
#
# @param sec [Numeric, nil] maximum number of seconds to wait; nil waits
#   without a time limit
# @return [nil]
# @raise [Exception] re-raises any Exception instance taken from notify_queue
# @raise [Timeout::Error] when sec elapses before the subactions complete
def flush_subactions(sec=nil)
  job_context = trigger.rule_engine.active_jobs[self.job_id]
  return if job_context.nil?

  # Use the module function: the bare Kernel-level `timeout` is deprecated
  # since Ruby 2.3 and is no longer defined by recent timeout library
  # releases. Timeout.timeout(nil) runs the block with no time limit, so sec
  # can be passed through as-is.
  Timeout.timeout(sec) do
    until all_subactions_complete?
      src = notify_queue.deq
      # Exit the current action when a subaction notified an exception.
      raise src if src.is_a?(Exception)
    end
  end
end

#master ⇒ Object



14
15
16
# File 'lib/wakame/action.rb', line 14

# Delegates to the bound trigger.
#
# @return [Object] whatever trigger.master returns
def master
  bound_trigger = trigger
  bound_trigger.master
end

#notes ⇒ Object



118
119
120
# File 'lib/wakame/action.rb', line 118

# Looks up this action's entry in the rule engine's active_jobs (keyed by
# job_id) and returns its :notes value.
#
# @return [Object] the value stored under :notes for this job
def notes
  job_context = trigger.rule_engine.active_jobs[self.job_id]
  job_context[:notes]
end

#notify(src = nil) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
# File 'lib/wakame/action.rb', line 84

# Delivers a notification for this action.
#
# When this action's status is :complete and it has a parent_action, the
# notification is escalated to the parent. Otherwise the local notify_queue
# is emptied (if it holds anything) and src is enqueued, so the queue holds
# only the latest notification.
#
# @param src the notification source; defaults to this action when nil
# @return [Object] the parent's notify result, or the result of enq
def notify(src=nil)
  src = self if src.nil?

  # Escalate the notification to parent if the action is finished.
  return parent_action.notify(src) if status == :complete && parent_action

  queue = notify_queue
  queue.clear unless queue.size.zero?
  queue.enq(src)
end

#notify_queue ⇒ Object



80
81
82
# File 'lib/wakame/action.rb', line 80

# Returns the queue held in @notify_queue, creating a new ::Queue on first
# use.
#
# @return [Queue] the same queue instance on every call
def notify_queue
  return @notify_queue if @notify_queue

  @notify_queue = ::Queue.new
end

#on_canceled ⇒ Object



129
130
# File 'lib/wakame/action.rb', line 129

# Does nothing here.
# NOTE(review): presumably a callback that subclasses override to react to
# cancellation — confirm against the callers.
#
# @return [nil]
def on_canceled
  nil
end

#on_failed ⇒ Object



126
127
# File 'lib/wakame/action.rb', line 126

# Does nothing here.
# NOTE(review): presumably a callback that subclasses override to react to
# failure — confirm against the callers.
#
# @return [nil]
def on_failed
  nil
end

#run ⇒ Object

Raises:

  • (NotImplementedError)


122
123
124
# File 'lib/wakame/action.rb', line 122

# Always raises; this implementation carries no behaviour of its own.
# NOTE(review): presumably concrete actions are expected to override this —
# confirm.
#
# @raise [NotImplementedError] on every call
def run
  raise(NotImplementedError)
end

#service_cluster ⇒ Object



22
23
24
# File 'lib/wakame/action.rb', line 22

# Delegates to the bound trigger.
#
# @return [Object] whatever trigger.service_cluster returns
def service_cluster
  bound_trigger = trigger
  bound_trigger.service_cluster
end

#status=(status) ⇒ Object



25
26
27
28
29
30
31
32
# File 'lib/wakame/action.rb', line 25

# Updates @status and calls notify, but only when the value really changes.
#
# @param status the new status value
# @return [Object] the value of @status after the call
def status=(status)
  # Assigning the current value again is a no-op: no notification is sent.
  return @status if @status == status

  @status = status
  # Notify observers only after the attribute holds the new value.
  notify
  @status
end

#subactions ⇒ Object



35
36
37
# File 'lib/wakame/action.rb', line 35

# Returns the list held in @subactions, creating an empty Array on first use.
#
# @return [Array] the same array instance on every call
def subactions
  return @subactions if @subactions

  @subactions = []
end

#sync_actor_request(agent_id, path, *args) ⇒ Object



113
114
115
116
# File 'lib/wakame/action.rb', line 113

# Builds an actor request with actor_request, fires it with #request and
# then calls #wait on the object that #request returned.
#
# @param agent_id forwarded to actor_request
# @param path forwarded to actor_request
# @param args additional arguments forwarded to actor_request
# @return [Object] whatever #wait returns
def sync_actor_request(agent_id, path, *args)
  actor_request(agent_id, path, *args).request.wait
end

#trigger_action(subaction, opts = {}) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/wakame/action.rb', line 43

# Registers subaction as a child of this action and passes it to
# async_trigger_action.
#
# @param subaction the action to attach; it is appended to subactions and
#   its parent_action is set to this action
# @param opts [Hash] optional callbacks: :success (or :succ as a fallback)
#   and :fail. A non-Hash value is treated as "no callbacks".
# @return [Object] whatever async_trigger_action returns
def trigger_action(subaction, opts={})
  callbacks = opts.is_a?(Hash) ? opts : {}
  on_success = callbacks[:success] || callbacks[:succ]
  on_failure = callbacks[:fail]

  subactions << subaction
  subaction.parent_action = self

  async_trigger_action(subaction, on_success, on_failure)
end

#walk_subactions(&blk) ⇒ Object



97
98
99
100
101
102
# File 'lib/wakame/action.rb', line 97

# Visits this action first and then each subaction in order, recursing via
# each subaction's own walk_subactions (pre-order traversal).
#
# @yield [action] called once per visited action
# @return [Object] the result of iterating subactions with each
def walk_subactions(&blk)
  blk.call(self)
  subactions.each do |child|
    child.walk_subactions(&blk)
  end
end