Class: OFlow::Task
- Inherits: Object
  - Object
    - OFlow::Task
- Includes: HasErrorHandler, HasLog
- Defined in: lib/oflow/task.rb
Overview
The Task class provides the asynchronous functionality for the system. Each Task has its own thread and receives requests, each consisting of an operation and a Box of data, through the receive() method. A request is put on a queue, and requests are popped off one at a time and handed to the Actor associated with the Task.
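The queue-and-thread arrangement described above can be sketched in plain Ruby. This is a simplified illustration and not the OFlow implementation: `MiniTask` and `EchoActor` are hypothetical names, and the sketch uses Ruby's built-in `Queue` where the real Task manages an Array with a Mutex.

```ruby
# Hypothetical stand-in for an Actor: anything that responds to perform(op, box).
class EchoActor
  attr_reader :seen

  def initialize
    @seen = []
  end

  def perform(op, box)
    @seen << [op, box]
  end
end

# Simplified illustration of the Task idea (assumed names, not OFlow code).
class MiniTask
  def initialize(actor)
    @actor = actor
    @queue = Queue.new # thread-safe FIFO from the Ruby core library
    @loop = Thread.new do
      # Queue#pop blocks until a request arrives; nil is used here as a stop signal.
      while (req = @queue.pop)
        op, box = req
        @actor.perform(op, box)
      end
    end
  end

  # Queues a request and returns immediately.
  def receive(op, box)
    @queue << [op, box]
  end

  # Lets the thread drain everything already queued, then waits for it to exit.
  def shutdown
    @queue << nil
    @loop.join
  end
end

actor = EchoActor.new
task = MiniTask.new(actor)
task.receive(:greet, 'hello')
task.receive(:greet, 'world')
task.shutdown
p actor.seen
```

Requests are handled one at a time on the worker thread, in the order they were received.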
Defined Under Namespace
Classes: Request
Constant Summary
- STARTING = 0
  Value of @state that indicates the Task is being created.
- STOPPED = 1
  Value of @state that indicates the Task is not currently processing requests.
- RUNNING = 2
  Value of @state that indicates the Task is currently ready to process requests.
- CLOSING = 3
  Value of @state that indicates the Task is shutting down.
- BLOCKED = 4
  Value of @state that indicates the Task is not receiving new requests.
- STEP = 5
  Value of @state that indicates the Task is processing one request and will stop after that processing is complete.
Instance Attribute Summary
- #actor ⇒ Object (readonly)
  The Actor.
- #bounds ⇒ Object
  Returns the value of attribute bounds.
- #color ⇒ Object
  Returns the value of attribute color.
- #name ⇒ Object (readonly)
  The name.
- #shape ⇒ Object
  Returns the value of attribute shape.
- #state ⇒ Object
  The current processing state of the Task.
Instance Method Summary
- #_check_link(lnk, errors) ⇒ Object
- #_validation_errors ⇒ Object
- #backed_up ⇒ Fixnum
  Returns a score indicating how backed up the queue is.
- #busy? ⇒ true|false
  Returns true if any requests are queued or a request is being processed.
- #describe(detail = 0, indent = 0) ⇒ Object
  Returns a String that describes the Task.
- #error_handler ⇒ Task
  Returns an error handler Task by looking for that Task as an attribute and then in the parent Flow.
- #find_link(label) ⇒ Link
  Attempts to find the Link identified by the label.
- #flush ⇒ Object
  Waits for all processing to complete before returning.
- #full_name ⇒ String
  Similar to a full file path.
- #has_input(op) ⇒ Object
- #has_links? ⇒ Boolean
- #initialize(flow, name, actor_class, options = {}) ⇒ Task (constructor)
  A Task is initialized by specifying a class to create an instance of.
- #inputs ⇒ Object
  The expected inputs the Task supports or nil if not implemented.
- #link(label, target, op, flow_name = nil) ⇒ Object
- #links ⇒ Hash
  Returns the Links.
- #log ⇒ Task
  Returns a log Task by looking for that Task as an attribute and then in the parent Flow.
- #max_queue_count ⇒ NilClass|Fixnum
  Returns the maximum number of requests allowed on the normal processing queue.
- #outputs ⇒ Object
  The expected outputs the Task supports or nil if not implemented.
- #proc_count ⇒ Fixnum
  Returns the total number of requests processed.
- #queue_count ⇒ Fixnum
  Returns the number of requests on the queue.
- #receive(op, box) ⇒ Object
  Creates a Request and adds it to the queue.
- #request_timeout ⇒ Float
  Returns the default timeout for the time to wait for the Task to be ready to accept a request using the receive() method.
- #resolve_all_links ⇒ Object
  Resolves all links.
- #resolve_link(label) ⇒ Link
  Attempts to find and resolve the Link identified by the label.
- #set_link_target(lnk) ⇒ Object
  Sets the target Task for a Link.
- #set_option(key, value) ⇒ Object
- #set_options(options) ⇒ Object
  Processes the initialize() options.
- #ship(dest, box) ⇒ Object
  Sends a message to another Task or Flow.
- #shutdown(flush_first = false) ⇒ Object
  Closes the Task by exiting the processing thread.
- #start ⇒ Object
  Restarts the Task’s processing thread.
- #state_string ⇒ String
  Returns the state of the Task as a String.
- #step(max_wait = 5) ⇒ Object
  Causes the Task to process one request and then stop.
- #stop ⇒ Object
  Causes the Actor to stop processing any more requests after the current request has finished.
- #wakeup ⇒ Object
  Wakes up the Task if it has been stopped or if Env.shutdown() has been called.
Methods included from HasLog
#debug, #debug?, #error, #error?, #fatal, #info, #info?, #log=, #log_msg, #warn, #warn?
Methods included from HasErrorHandler
#error_handler=, #handle_error
Constructor Details
#initialize(flow, name, actor_class, options = {}) ⇒ Task
A Task is initialized by specifying a class to create an instance of.
# File 'lib/oflow/task.rb', line 44
def initialize(flow, name, actor_class, options={})
  @flow = flow
  @name = name.to_sym
  @state = STARTING
  @bounds = nil # [x, y, w, h] as fixnums
  @color = nil # [r, g, b] as floats
  @shape = nil # Rectangle is the default
  @queue = []
  @req_mutex = Mutex.new()
  @req_thread = nil
  @step_thread = nil
  @waiting_thread = nil
  @req_timeout = 0.0
  @max_queue_count = nil
  @current_req = nil
  @proc_cnt = 0
  @loop = nil
  @links = {}
  @log = nil
  @error_handler = nil

  set_options(options)

  info("Creating actor #{actor_class} with options #{options}.")
  @actor = actor_class.new(self, options)
  raise Exception.new("#{actor} does not respond to the perform() method.") unless @actor.respond_to?(:perform)

  @state = options.fetch(:state, RUNNING)

  return unless @actor.with_own_thread()

  @loop = Thread.start(self) do |me|
    Thread.current[:name] = me.full_name()
    while CLOSING != @state
      begin
        if RUNNING == @state || STEP == @state || BLOCKED == @state
          req = nil
          if @queue.empty?
            @waiting_thread.wakeup() unless @waiting_thread.nil?
            sleep(1.0)
            next
          end
          @req_mutex.synchronize {
            req = @queue.pop()
          }
          @req_thread.wakeup() unless @req_thread.nil?

          @current_req = req
          begin
            if debug?
              debug("perform(#{req.op}, #{req.box.nil? ? '<nil>' : req.box})")
            else
              info("perform(#{req.op})")
            end
            @actor.perform(req.op, req.box) unless req.nil?
          rescue Exception => e
            handle_error(e)
          end
          @proc_cnt += 1
          @current_req = nil
          if STEP == @state
            @step_thread.wakeup() unless @step_thread.nil?
            @state = STOPPED
          end
        elsif STOPPED == @state
          sleep(1.0)
        end
      rescue Exception => e
        puts "*** #{full_name} #{e.class}: #{e.message}"
        @current_req = nil
        # TBD Env.rescue(e)
      end
    end
  end
end
Instance Attribute Details
#actor ⇒ Object (readonly)
The Actor.
# File 'lib/oflow/task.rb', line 37
def actor
  @actor
end
#bounds ⇒ Object
Returns the value of attribute bounds.
# File 'lib/oflow/task.rb', line 16
def bounds
  @bounds
end
#color ⇒ Object
Returns the value of attribute color.
# File 'lib/oflow/task.rb', line 18
def color
  @color
end
#name ⇒ Object (readonly)
The name.
# File 'lib/oflow/task.rb', line 14
def name
  @name
end
#shape ⇒ Object
Returns the value of attribute shape.
# File 'lib/oflow/task.rb', line 17
def shape
  @shape
end
#state ⇒ Object
The current processing state of the Task.
# File 'lib/oflow/task.rb', line 35
def state
  @state
end
Instance Method Details
#_check_link(lnk, errors) ⇒ Object
# File 'lib/oflow/task.rb', line 443
def _check_link(lnk, errors)
  if lnk.target.nil?
    errors << ValidateError::Problem.new(full_name, ValidateError::Problem::LINK_ERROR, "Failed to find task '#{lnk.target_name}'.")
    return
  end
  unless lnk.target.has_input(lnk.op)
    errors << ValidateError::Problem.new(full_name, ValidateError::Problem::INPUT_ERROR, "'#{lnk.op}' not allowed on '#{lnk.target.full_name}'.")
    return
  end

  # TBD
  # Verify target has link.op as input if input is specified.
end
#_validation_errors ⇒ Object
# File 'lib/oflow/task.rb', line 421
def _validation_errors()
  errors = []
  @links.each_value { |lnk| _check_link(lnk, errors) }
  unless (outs = @actor.outputs()).nil?
    outs.each do |spec|
      if find_link(spec.dest).nil?
        errors << ValidateError::Problem.new(full_name, ValidateError::Problem::MISSING_ERROR, "Missing link for '#{spec.dest}'.")
      end
    end
  end
  errors
end
#backed_up ⇒ Fixnum
Returns a score indicating how backed up the queue is. This is used for selecting an Actor when stepping from the Inspector.
# File 'lib/oflow/task.rb', line 230
def backed_up()
  cnt = @queue.size() + (@current_req.nil? ? 0 : 1)
  return 0 if 0 == cnt
  if @max_queue_count.nil? || 0 == @max_queue_count
    cnt = 80 if 80 < cnt
    cnt
  else
    cnt * 100 / @max_queue_count
  end
end
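To make the scoring rule concrete, here is a stand-alone restatement of the same arithmetic with a few worked values. The function name `backed_up_score` and its parameters are illustrative assumptions, not part of OFlow.

```ruby
# Stand-alone restatement of the backed_up arithmetic (hypothetical helper).
# queued: number of requests waiting; processing: true if a request is in flight.
def backed_up_score(queued, processing, max_queue_count)
  cnt = queued + (processing ? 1 : 0)
  return 0 if cnt.zero?
  if max_queue_count.nil? || max_queue_count.zero?
    # No limit configured: the raw count is used, capped at 80.
    [cnt, 80].min
  else
    # Limit configured: integer percentage of the limit.
    cnt * 100 / max_queue_count
  end
end

p backed_up_score(0, false, nil)   # idle Task
p backed_up_score(3, true, nil)    # unbounded queue, 4 requests in total
p backed_up_score(200, false, nil) # unbounded queue, capped
p backed_up_score(4, true, 10)     # 5 of 10
p backed_up_score(10, true, 10)    # full queue plus one in flight
```

Because the request being processed is counted alongside the queued ones, the score can exceed 100 when a bounded queue is full.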
#busy? ⇒ true|false
Returns true if any requests are queued or a request is being processed.
# File 'lib/oflow/task.rb', line 243
def busy?()
  !@current_req.nil? || !@queue.empty? || @actor.busy?
end
#describe(detail = 0, indent = 0) ⇒ Object
Returns a String that describes the Task.
# File 'lib/oflow/task.rb', line 186
def describe(detail=0, indent=0)
  i = ' ' * indent
  lines = ["#{i}#{name} (#{actor.class}) {"]
  @links.each { |local,link|
    if link.flow_name.nil?
      lines << " #{i}#{local} => :#{link.target_name}:#{link.op}"
    else
      lines << " #{i}#{local} => #{link.flow_name}:#{link.target_name}:#{link.op}"
    end
  }
  if 1 <= detail
    lines << " #{i}queued: #{queue_count()} (#{busy? ? 'busy' : 'idle'})"
    lines << " #{i}state: #{state_string()}"
    @actor.options.each do |key,value|
      lines << " #{i}#{key} = #{value} (#{value.class})"
    end
    if 2 <= detail
      lines << " #{i}processing: #{@current_req.describe(detail)}" unless @current_req.nil?
      # Copy queue so it doesn't change while working with it and don't
      # block the queue. It is okay for the requests to be stale as it is
      # for a display that will be out of date as soon as it is displayed.
      reqs = []
      @req_mutex.synchronize {
        reqs = Array.new(@queue)
      }
      lines << " #{i}queue:"
      reqs.each do |req|
        lines << " #{i}#{req.describe(detail)}"
      end
    end
  end
  lines << i + "}"
  lines.join("\n")
end
#error_handler ⇒ Task
Returns an error handler Task by looking for that Task as an attribute and then in the parent Flow.
# File 'lib/oflow/task.rb', line 157
def error_handler()
  return @error_handler unless @error_handler.nil?
  @flow.error_handler()
end
#find_link(label) ⇒ Link
Attempts to find the Link identified by the label.
# File 'lib/oflow/task.rb', line 490
def find_link(label)
  label = label.to_sym unless label.nil?
  @links[label] || @links[nil]
end
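The lookup in find_link falls back to the entry stored under the nil key when the label has no entry of its own. A small sketch with a plain Hash shows that behavior; the string values stand in for Link objects and `find` is a hypothetical lambda mirroring the method body.

```ruby
# Plain Hash standing in for @links; a nil key acts as the default link.
links = { done: 'link for :done', nil => 'default link' }

# Mirrors the body of find_link against the local Hash.
find = lambda do |label|
  label = label.to_sym unless label.nil?
  links[label] || links[nil]
end

p find.call('done')   # String labels are converted to Symbols first
p find.call(:missing) # unknown label falls back to the nil-keyed entry
p find.call(nil)      # nil label selects the nil-keyed entry directly
```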
#flush ⇒ Object
Waits for all processing to complete before returning.
# File 'lib/oflow/task.rb', line 318
def flush()
  return if @loop.nil?
  @waiting_thread = Thread.current
  begin
    @loop.wakeup()
  rescue
    # ignore
  end
  while busy?
    sleep(2.0)
  end
  @waiting_thread = nil
end
#full_name ⇒ String
Similar to a full file path. The full_name describes the containment of the named item.
# File 'lib/oflow/task.rb', line 138
def full_name()
  if @flow.nil?
    ':' + @name.to_s
  else
    @flow.full_name() + ':' + @name.to_s
  end
end
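The path-like composition can be illustrated with a small stand-in. This sketch assumes the parent builds its own full_name the same way the Task does, which the listing above does not confirm for Flow; `Node` is a hypothetical Struct used only for illustration.

```ruby
# Hypothetical container node; parent plays the role of @flow.
Node = Struct.new(:parent, :name) do
  def full_name
    if parent.nil?
      ':' + name.to_s
    else
      parent.full_name + ':' + name.to_s
    end
  end
end

flow = Node.new(nil, :billing)
task = Node.new(flow, :invoice)

puts flow.full_name
puts task.full_name
```

Each level contributes one colon-separated segment, much like directories in a file path.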
#has_input(op) ⇒ Object
# File 'lib/oflow/task.rb', line 435
def has_input(op)
  ins = @actor.inputs()
  return true if ins.nil?
  op = op.to_sym unless op.nil?
  ins.each { |spec| return true if spec.op.nil? || spec.op == op }
  false
end
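The acceptance rule in has_input has three cases: no declared inputs accepts everything, a spec with a nil op accepts everything, and otherwise the op must match a declared spec. The sketch below restates that rule; `Spec` and `accepts?` are hypothetical names, and the only thing assumed about a real input spec is that it responds to `op`.

```ruby
# Hypothetical input spec with just the one field the check needs.
Spec = Struct.new(:op)

# Restatement of the has_input rule against an explicit list of specs.
def accepts?(specs, op)
  return true if specs.nil?        # Actor declares no inputs: anything goes
  op = op.to_sym unless op.nil?
  specs.any? { |spec| spec.op.nil? || spec.op == op }
end

declared = [Spec.new(:start), Spec.new(:stop)]

p accepts?(nil, :anything)
p accepts?(declared, 'start')
p accepts?(declared, :pause)
p accepts?([Spec.new(nil)], :pause)
```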
#has_links? ⇒ Boolean
# File 'lib/oflow/task.rb', line 501
def has_links?()
  !@links.nil? && !@links.empty?
end
#inputs ⇒ Object
The expected inputs the Task supports or nil if not implemented.
# File 'lib/oflow/task.rb', line 338
def inputs()
  @actor.inputs()
end
#link(label, target, op, flow_name = nil) ⇒ Object
# File 'lib/oflow/task.rb', line 127
def link(label, target, op, flow_name=nil)
  label = label.to_sym unless label.nil?
  op = op.to_sym unless op.nil?
  flow_name = flow_name.to_sym unless flow_name.nil?
  raise ConfigError.new("Link #{label} already exists.") unless @links[label].nil?
  @links[label] = Link.new(flow_name, target.to_sym, op)
end
#links ⇒ Hash
Returns the Links.
# File 'lib/oflow/task.rb', line 497
def links()
  @links
end
#log ⇒ Task
Returns a log Task by looking for that Task as an attribute and then in the parent Flow.
# File 'lib/oflow/task.rb', line 149
def log()
  return @log unless @log.nil?
  @flow.log()
end
#max_queue_count ⇒ NilClass|Fixnum
Returns the maximum number of requests allowed on the normal processing queue. A value of nil indicates there is no limit.
# File 'lib/oflow/task.rb', line 257
def max_queue_count()
  @max_queue_count
end
#outputs ⇒ Object
The expected outputs the Task supports or nil if not implemented.
# File 'lib/oflow/task.rb', line 343
def outputs()
  @actor.outputs()
end
#proc_count ⇒ Fixnum
Returns the total number of requests processed.
# File 'lib/oflow/task.rb', line 263
def proc_count()
  @proc_cnt
end
#queue_count ⇒ Fixnum
Returns the number of requests on the queue.
# File 'lib/oflow/task.rb', line 223
def queue_count()
  @queue.length
end
#receive(op, box) ⇒ Object
Creates a Request and adds it to the queue.
# File 'lib/oflow/task.rb', line 350
def receive(op, box)
  if debug?
    debug("receive(#{op}, #{box.nil? ? '<nil>' : box}) #{state_string}")
  else
    info("receive(#{op})")
  end
  return if CLOSING == @state

  raise BlockedError.new() if BLOCKED == @state

  box = box.receive(full_name, op) unless box.nil?
  # Special case for starting state so that an Actor can place an item on
  # the queue before the loop is started.
  if @loop.nil? && STARTING != @state # no thread task
    begin
      @actor.perform(op, box)
    rescue Exception => e
      handle_error(e)
    end
    return
  end
  unless @max_queue_count.nil? || 0 == @max_queue_count || @queue.size() < @max_queue_count
    @req_thread = Thread.current
    sleep(timeout) unless @req_timeout.nil? || 0 == @req_timeout
    @req_thread = nil
    raise BusyError.new() unless @queue.size() < @max_queue_count
  end
  @req_mutex.synchronize {
    @queue.insert(0, Request.new(op, box))
  }
  @loop.wakeup() if RUNNING == @state
end
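The listing inserts new requests at index 0 while the constructor's processing loop removes them with pop(), which together give first-in, first-out ordering. A short sketch with a plain Array and Symbols in place of Request objects demonstrates the ordering:

```ruby
queue = []

# receive() side: each new request goes to the front of the Array.
[:first, :second, :third].each { |req| queue.insert(0, req) }

# Processing side: pop() takes from the end, so the oldest request comes out first.
order = []
order << queue.pop until queue.empty?

p order
```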
#request_timeout ⇒ Float
Returns the default timeout for the time to wait for the Task to be ready to accept a request using the receive() method.
# File 'lib/oflow/task.rb', line 250
def request_timeout()
  @req_timeout
end
#resolve_all_links ⇒ Object
Resolves all links. Called by the system.
# File 'lib/oflow/task.rb', line 459
def resolve_all_links()
  @links.each_value { |lnk|
    set_link_target(lnk) if lnk.target.nil?
  }
end
#resolve_link(label) ⇒ Link
Attempts to find and resolve the Link identified by the label. Resolving a Link uses the target identifier to find the target Task and save that in the Link.
# File 'lib/oflow/task.rb', line 470
def resolve_link(label)
  label = label.to_sym unless label.nil?
  lnk = @links[label] || @links[nil]
  return nil if lnk.nil?
  set_link_target(lnk) if lnk.target.nil?
  lnk
end
#set_link_target(lnk) ⇒ Object
Sets the target Task for a Link.
# File 'lib/oflow/task.rb', line 480
def set_link_target(lnk)
  f = @flow
  f = @flow.env.find_flow(lnk.flow_name) unless lnk.flow_name.nil?
  raise ConfigError.new("Flow '#{lnk.flow_name}' not found.") if f.nil?
  lnk.instance_variable_set(:@target, f.find_task(lnk.target_name))
end
#set_option(key, value) ⇒ Object
# File 'lib/oflow/task.rb', line 410
def set_option(key, value)
  case key.to_sym()
  when :max_queue_count
    @max_queue_count = value.to_i
  when :request_timeout
    @req_timeout = value.to_f
  else
    @actor.set_option(key, value)
  end
end
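Since set_option coerces values with to_i and to_f, string input is accepted but non-numeric strings silently become zero. The sketch below applies the same coercions outside the class; `coerce_option` is a hypothetical helper, and unlike the real method it simply returns unrecognized values rather than forwarding them to the Actor.

```ruby
# Hypothetical helper applying the same coercions as set_option.
def coerce_option(key, value)
  case key.to_sym
  when :max_queue_count
    value.to_i
  when :request_timeout
    value.to_f
  else
    value # the real method forwards these to the Actor instead
  end
end

p coerce_option('max_queue_count', '25')
p coerce_option(:max_queue_count, 'many') # non-numeric String becomes 0
p coerce_option(:request_timeout, '1.5')
p coerce_option(:request_timeout, 3)
```

A max_queue_count of 0 is treated the same as nil by the checks in receive and backed_up, so a mistyped value ends up meaning no limit.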
#set_options(options) ⇒ Object
Processes the initialize() options. Subclasses should call super.
# File 'lib/oflow/task.rb', line 405
def set_options(options)
  @max_queue_count = options.fetch(:max_queue_count, @max_queue_count)
  @req_timeout = options.fetch(:request_timeout, @req_timeout).to_f
end
#ship(dest, box) ⇒ Object
Sends a message to another Task or Flow.
# File 'lib/oflow/task.rb', line 386
def ship(dest, box)
  box.freeze() unless box.nil?
  link = resolve_link(dest)
  raise LinkError.new(dest) if link.nil? || link.target.nil?
  if debug?
    debug("shipping #{box} to #{link.target_name}:#{link.op}")
  else
    info("shipping to #{link.target_name}:#{link.op}")
  end
  link.target.receive(link.op, box)
  link
end
#shutdown(flush_first = false) ⇒ Object
Closes the Task by exiting the processing thread. If flush_first is true, all requests in the queue are processed first.
# File 'lib/oflow/task.rb', line 301
def shutdown(flush_first=false)
  return if @loop.nil?
  if flush_first
    @state = BLOCKED
    flush()
  end
  @state = CLOSING
  begin
    # if the loop has already exited this will raise an Exception that can be ignored
    @loop.wakeup()
  rescue
    # ignore
  end
  @loop.join()
end
#start ⇒ Object
Restarts the Task’s processing thread.
# File 'lib/oflow/task.rb', line 294
def start()
  @state = RUNNING
  @loop.wakeup() unless @loop.nil?
end
#state_string ⇒ String
Returns the state of the Task as a String.
# File 'lib/oflow/task.rb', line 164
def state_string()
  ss = 'UNKNOWN'
  case @state
  when STARTING
    ss = 'STARTING'
  when STOPPED
    ss = 'STOPPED'
  when RUNNING
    ss = 'RUNNING'
  when CLOSING
    ss = 'CLOSING'
  when BLOCKED
    ss = 'BLOCKED'
  when STEP
    ss = 'STEP'
  end
  ss
end
#step(max_wait = 5) ⇒ Object
Causes the Task to process one request and then stop. The max_wait is used to avoid getting stuck if the processing takes too long.
# File 'lib/oflow/task.rb', line 276
def step(max_wait=5)
  return nil if @loop.nil?
  return nil if STOPPED != @state || @queue.empty?
  @state = STEP
  @step_thread = Thread.current
  @loop.wakeup()
  sleep(max_wait)
  @step_thread = nil
  self
end
#stop ⇒ Object
Causes the Actor to stop processing any more requests after the current request has finished.
# File 'lib/oflow/task.rb', line 269
def stop()
  @state = STOPPED
end
#wakeup ⇒ Object
Wakes up the Task if it has been stopped or if Env.shutdown() has been called.
# File 'lib/oflow/task.rb', line 288
def wakeup()
  # don't wake if the task is currently processing
  @loop.wakeup() unless @loop.nil?
end