Class: Wakame::RuleEngine
- Inherits: Object
- Inheritance chain: Object → Wakame::RuleEngine
- Defined in:
- lib/wakame/rule_engine.rb
Constant Summary
# Names of the engine accessors listed for forwarding (see the reader
# methods of the same names on RuleEngine). Frozen so the shared list
# cannot be mutated by accident at runtime.
FORWARD_ATTRS = [:command_queue, :agent_monitor, :service_cluster, :master].freeze
Instance Attribute Summary
-
#active_jobs ⇒ Object
readonly
Returns the value of attribute active_jobs.
-
#triggers ⇒ Object
readonly
Returns the value of attribute triggers.
Instance Method Summary
- #agent_monitor ⇒ Object
- #cancel_action(job_id) ⇒ Object
- #command_queue ⇒ Object
- #create_job_context(trigger, root_action) ⇒ Object
-
#initialize(service_cluster, &blk) ⇒ RuleEngine
constructor
A new instance of RuleEngine.
- #master ⇒ Object
- #register_trigger(trigger) ⇒ Object
- #run_action(action) ⇒ Object
- #service_cluster ⇒ Object
Constructor Details
#initialize(service_cluster, &blk) ⇒ RuleEngine
Returns a new instance of RuleEngine.
31 32 33 34 35 36 37 38 39 |
# File 'lib/wakame/rule_engine.rb', line 31
#
# Sets up an engine bound to +service_cluster+ with empty bookkeeping:
# no triggers, no active jobs, no job history and no global lock holder.
# When a block is given it is evaluated in the context of the new
# instance (via instance_eval).
def initialize(service_cluster, &blk)
  @service_cluster = service_cluster
  @global_lock = nil
  @triggers = []
  @job_history = []
  @active_jobs = {}
  instance_eval(&blk) unless blk.nil?
end
Instance Attribute Details
#active_jobs ⇒ Object (readonly)
Returns the value of attribute active_jobs.
13 14 15 |
# File 'lib/wakame/rule_engine.rb', line 13
#
# Read-only accessor for the table of job contexts currently tracked by
# the engine, keyed by job id (the same Hash object, not a copy).
def active_jobs
  @active_jobs
end
#triggers ⇒ Object (readonly)
Returns the value of attribute triggers.
13 14 15 |
# File 'lib/wakame/rule_engine.rb', line 13
#
# Read-only accessor for the list of triggers registered on this engine
# (the same Array object, not a copy).
def triggers
  @triggers
end
Instance Method Details
#agent_monitor ⇒ Object
23 24 25 |
# File 'lib/wakame/rule_engine.rb', line 23
#
# Convenience reader: the agent monitor owned by this engine's master.
def agent_monitor
  owner = master
  owner.agent_monitor
end
#cancel_action(job_id) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/wakame/rule_engine.rb', line 62
#
# Cancels the running job identified by +job_id+ by raising
# CancelBroadcast inside every worker thread that is currently running
# one of the job's actions.
#
# Logs a warning and returns when the job id is not in @active_jobs, and
# returns silently when the job context already has a :complete_at value.
#
# job_id:: key of the job context in @active_jobs.
def cancel_action(job_id)
  job_context = @active_jobs[job_id]
  if job_context.nil?
    Wakame.log.warn("JOB ID #{job_id} was not running.")
    return
  end

  return if job_context[:complete_at]

  root_act = job_context[:root_action]

  # Depth-first walk over the action tree. An action is signalled only
  # when it is running on a live thread other than the caller's, so the
  # thread performing the cancel never raises into itself.
  walk_subactions = proc { |a|
    if a.status == :running && (a.target_thread && a.target_thread.alive?) && a.target_thread != Thread.current
      Wakame.log.debug "Raising CancelBroadcast exception: #{a.class} #{a.target_thread}(#{a.target_thread.status}), current=#{Thread.current}"
      # Broadcast the special exception to all
      a.target_thread.raise(CancelBroadcast, "It's broadcasted from #{a.class}")
    end

    a.subactions.each { |n| walk_subactions.call(n) }
  }

  # Thread.critical= only exists on Ruby 1.8. On later interpreters the
  # unguarded call raised NoMethodError (in the body and again in the
  # ensure clause), so no action was ever cancelled. Use the critical
  # section where available and skip it elsewhere.
  use_critical = Thread.respond_to?(:critical=)
  begin
    Thread.critical = true if use_critical
    walk_subactions.call(root_act)
  ensure
    Thread.critical = false if use_critical
    # IMPORTANT: Ensure the worker thread to handle the exception.
    Thread.pass
  end
end
#command_queue ⇒ Object
19 20 21 |
# File 'lib/wakame/rule_engine.rb', line 19
#
# Convenience reader: the command queue owned by this engine's master.
def command_queue
  owner = master
  owner.command_queue
end
#create_job_context(trigger, root_action) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/wakame/rule_engine.rb', line 48
#
# Allocates a fresh job id (Wakame.gen_id), stamps it onto +root_action+
# and records a new job context under that id in @active_jobs.
#
# trigger::     stored in the context as :src_trigger.
# root_action:: stored as :root_action; receives the new job id.
#
# Returns the newly created context Hash.
def create_job_context(trigger, root_action)
  job_id = Wakame.gen_id
  root_action.job_id = job_id

  context = {
    :job_id=>job_id,
    :src_trigger=>trigger,
    :create_at=>Time.now,
    :start_at=>nil,
    :complete_at=>nil,
    :root_action=>root_action,
    :notes=>{}
  }
  @active_jobs[job_id] = context
end
#master ⇒ Object
15 16 17 |
# File 'lib/wakame/rule_engine.rb', line 15
#
# Convenience reader: the master object of the service cluster this
# engine was created with.
def master
  cluster = service_cluster
  cluster.master
end
#register_trigger(trigger) ⇒ Object
41 42 43 44 45 46 |
# File 'lib/wakame/rule_engine.rb', line 41
#
# Attaches +trigger+ to this engine: logs the registration, binds the
# trigger to the engine, asks it to register its hooks, and finally
# appends it to the engine's trigger list.
#
# Returns the engine's trigger list.
def register_trigger(trigger)
  Wakame.log.debug("Registering trigger #{trigger.class}")

  trigger.bind_engine(self)
  trigger.register_hooks

  @triggers.push(trigger)
end
#run_action(action) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/wakame/rule_engine.rb', line 96
#
# Schedules +action+ for execution as part of the job it belongs to.
#
# The action body runs in the operation proc handed to EM.defer; the
# completion proc handed to EM.defer then checks whether the whole job
# has finished and, if so, moves it from @active_jobs to @job_history.
#
# action:: the action to run; its job_id must be a key of @active_jobs.
#
# Raises RuntimeError when no job context exists for action.job_id.
# Raises GlobalLockError when the action wants the global lock and a
# different job already holds it.
def run_action(action)
  job_context = @active_jobs[action.job_id]
  raise "The job session is killed.: job_id=#{action.job_id}" if job_context.nil?

  if action.acquire_lock
    if @global_lock.nil?
      @global_lock = action.job_id
    elsif @global_lock != action.job_id
      raise GlobalLockError, "Global Lock is already acquired by the JobID: #{@global_lock}"
    end
  end

  # Runs the action and returns the exception that ended it, or nil when
  # it succeeded.
  worker = proc {
    res = nil
    begin
      action.bind_thread(Thread.current)
      action.status = :running
      Wakame.log.debug("Start action : #{action.class.to_s} triggered by [#{action.trigger.class}]")
      ED.fire_event(Event::ActionStart.new(action))

      action.run
      action.completion_status = :succeeded
      Wakame.log.debug("Complete action : #{action.class.to_s}")
      ED.fire_event(Event::ActionComplete.new(action))
    rescue CancelBroadcast => e
      Wakame.log.info("Received cancel signal: #{e}")
      action.completion_status = :canceled
      begin
        action.on_canceled
      rescue => hook_error
        # Bound to its own name: reusing `e` here replaced the cancel
        # signal with the hook's error in everything reported below.
        Wakame.log.error(hook_error)
      end
      ED.fire_event(Event::ActionFailed.new(action, e))
      res = e
    rescue => e
      Wakame.log.debug("Failed action : #{action.class.to_s} due to #{e}")
      Wakame.log.error(e)
      action.completion_status = :failed
      begin
        action.on_failed
      rescue => hook_error
        # Same reason as above: keep the original failure in `e`.
        Wakame.log.error(hook_error)
      end
      ED.fire_event(Event::ActionFailed.new(action, e))
      # Escalate the cancelation event to parents.
      unless action.parent_action.nil?
        action.parent_action.notify(e)
      end
      # Force to cancel the current job when the root action ignored the elevated exception.
      if action === job_context[:root_action]
        Wakame.log.warn("The escalated exception (#{e.class}) has reached to the root action (#{action.class}). Forcing to cancel the current job #{job_context[:job_id]}")
        cancel_action(job_context[:job_id])
      end
      res = e
    ensure
      action.status = :complete
      action.bind_thread(nil)
    end
    res
  }

  # Receives the worker's result and finalizes the job once every action
  # in the tree has completed.
  on_finished = proc { |res|
    # The job context has already been removed; nothing left to do.
    next unless @active_jobs.has_key?(job_context[:job_id])

    # NOTE(review): the name of this local was lost in the extracted
    # listing; `actions` is a reconstruction.
    actions = []
    job_context[:root_action].walk_subactions { |a| actions << a }
    Wakame.log.debug(actions.collect { |a| {a.class.to_s=>a.status} }.inspect)

    if res.is_a?(Exception)
      job_context[:exception]=res
    end

    if actions.all? { |act| act.status == :complete }
      if actions.all? { |act| act.completion_status == :succeeded }
        ED.fire_event(Event::JobComplete.new(action.job_id))
      else
        ED.fire_event(Event::JobFailed.new(action.job_id, res))
      end
      job_context[:complete_at]=Time.now
      @job_history << job_context
      @active_jobs.delete(job_context[:job_id])
      # Release the lock only when this job holds it; clearing it
      # unconditionally dropped a lock owned by another running job.
      @global_lock = nil if @global_lock == job_context[:job_id]
    end
  }

  EM.next_tick {
    begin
      if job_context[:start_at].nil?
        job_context[:start_at] = Time.now
        ED.fire_event(Event::JobStart.new(action.job_id))
      end

      EM.defer(worker, on_finished)
    rescue => e
      Wakame.log.error(e)
    end
  }
end
#service_cluster ⇒ Object
27 28 29 |
# File 'lib/wakame/rule_engine.rb', line 27
#
# Reader for the service cluster this engine was constructed with.
def service_cluster
  @service_cluster
end