Class: OFlow::Task
- Inherits:
- Object
  - Object
    - OFlow::Task
- Includes:
- HasErrorHandler, HasLinks, HasLog, HasName
- Defined in:
- lib/oflow/task.rb
Overview
The Task class provides the asynchronous functionality for the system. Each Task has its own thread and receives requests, each consisting of an operation and a Box of data, through the receive() method. Each request is put on a queue, then popped off one at a time and handed to the Actor associated with the Task.
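The queue-and-thread pattern described above can be sketched in a few lines. This is only an illustration under simplifying assumptions: MiniTask and Collector are hypothetical names, Ruby's built-in Queue stands in for the Array and Mutex that Task uses, and states, links, and error handling are left out.

```ruby
# Hypothetical stand-in for illustration; not the OFlow::Task implementation.
class MiniTask
  Request = Struct.new(:op, :box)

  attr_reader :proc_count

  def initialize(actor)
    @actor = actor
    @queue = Queue.new # thread-safe FIFO from Ruby's core library
    @proc_count = 0
    @loop = Thread.new do
      # Pop requests one at a time; a nil request ends the loop.
      while (req = @queue.pop)
        @actor.perform(req.op, req.box)
        @proc_count += 1
      end
    end
  end

  # Queue an operation and its data for the worker thread.
  def receive(op, box)
    @queue.push(Request.new(op, box))
  end

  # Let the queued requests drain, then stop the worker thread.
  def shutdown
    @queue.push(nil)
    @loop.join
  end
end

# Hypothetical actor: anything that responds to perform(op, box).
class Collector
  attr_reader :seen

  def initialize
    @seen = []
  end

  def perform(op, box)
    @seen << [op, box]
  end
end

collector = Collector.new
task = MiniTask.new(collector)
task.receive(:add, 1)
task.receive(:add, 2)
task.shutdown
```

After shutdown returns, collector.seen holds the two requests in the order they were received, because the worker thread handles them one at a time.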
Defined Under Namespace
Constant Summary
- STARTING = 0
  value of @state that indicates the Task is being created.
- STOPPED = 1
  value of @state that indicates the Task is not currently processing requests.
- RUNNING = 2
  value of @state that indicates the Task is currently ready to process requests.
- CLOSING = 3
  value of @state that indicates the Task is shutting down.
- BLOCKED = 4
  value of @state that indicates the Task is not receiving new requests.
- STEP = 5
  value of @state that indicates the Task is processing one request and will stop after that processing is complete.
Instance Attribute Summary
-
#actor ⇒ Object
readonly
the Actor.
-
#state ⇒ Object
The current processing state of the Task.
Attributes included from HasName
Instance Method Summary
- #_check_link(lnk, errors) ⇒ Object
- #_validation_errors ⇒ Object
-
#backed_up ⇒ Fixnum
Returns a score indicating how backed up the queue is.
-
#busy? ⇒ true|false
Returns true if any requests are queued or a request is being processed.
-
#describe(detail = 0, indent = 0) ⇒ Object
Returns a String that describes the Task.
-
#flush ⇒ Object
Waits for all processing to complete before returning.
- #has_input(op) ⇒ Object
-
#initialize(flow, name, actor_class, options = {}) ⇒ Task
constructor
A Task is initialized by specifying a class to create an instance of.
-
#inputs ⇒ Object
The expected inputs the Task supports or nil if not implemented.
-
#max_queue_count ⇒ NilClass|Fixnum
Returns the maximum number of requests allowed on the normal processing queue.
-
#outputs ⇒ Object
The expected outputs the Task supports or nil if not implemented.
-
#proc_count ⇒ Fixnum
Returns the total number of requests processed.
-
#queue_count ⇒ Fixnum
Returns the number of requests on the queue.
-
#receive(op, box) ⇒ Object
Creates a Request and adds it to the queue.
-
#request_timeout ⇒ Float
Returns the default timeout for the time to wait for the Task to be ready to accept a request using the receive() method.
-
#resolve_all_links ⇒ Object
Resolves all links.
-
#set_options(options) ⇒ Object
Processes the initialize() options.
-
#ship(dest, box) ⇒ Object
Sends a message to another Task or Flow.
-
#shutdown(flush_first = false) ⇒ Object
Closes the Task by exiting the processing thread.
-
#start ⇒ Object
Restarts the Task’s processing thread.
-
#state_string ⇒ String
Returns the state of the Task as a String.
-
#step(max_wait = 5) ⇒ Object
Causes the Task to process one request and then stop.
-
#stop ⇒ Object
Causes the Actor to stop processing any more requests after the current request has finished.
-
#wakeup ⇒ Object
Wakes up the Task if it has been stopped or if Env.shutdown() has been called.
Methods included from HasLog
#debug, #error, #fatal, #info, #log, #log=, #log_msg, #warn
Methods included from HasErrorHandler
#error_handler, #error_handler=, #handle_error
Methods included from HasLinks
#find_link, #init_links, #link, #links, #resolve_link, #set_link_target
Methods included from HasName
Constructor Details
#initialize(flow, name, actor_class, options = {}) ⇒ Task
A Task is initialized by specifying the Actor class to create an instance of. The Actor instance must respond to perform().
# File 'lib/oflow/task.rb', line 38
def initialize(flow, name, actor_class, options={})
  @state = STARTING
  @queue = []
  @req_mutex = Mutex.new()
  @req_thread = nil
  @step_thread = nil
  @waiting_thread = nil
  @req_timeout = 0.0
  @max_queue_count = nil
  @current_req = nil
  @proc_cnt = 0
  @loop = nil
  init_name(flow, name)
  init_links()
  set_options(options)
  @actor = actor_class.new(self, options)
  raise Exception.new("#{actor} does not respond to the perform() method.") unless @actor.respond_to?(:perform)
  @state = options.fetch(:state, RUNNING)
  return unless @actor.with_own_thread()
  @loop = Thread.start(self) do |me|
    Thread.current[:name] = me.full_name()
    while CLOSING != @state
      begin
        if RUNNING == @state || STEP == @state || BLOCKED == @state
          req = nil
          if @queue.empty?
            @waiting_thread.wakeup() unless @waiting_thread.nil?
            sleep(1.0)
            next
          end
          @req_mutex.synchronize {
            req = @queue.pop()
          }
          @req_thread.wakeup() unless @req_thread.nil?
          @current_req = req
          begin
            @actor.perform(req.op, req.box) unless req.nil?
          rescue Exception => e
            handle_error(e)
          end
          @proc_cnt += 1
          @current_req = nil
          if STEP == @state
            @step_thread.wakeup() unless @step_thread.nil?
            @state = STOPPED
          end
        elsif STOPPED == @state
          sleep(1.0)
        end
      rescue Exception => e
        puts "*** #{full_name} #{e.class}: #{e.message}"
        @current_req = nil
        # TBD Env.rescue(e)
      end
    end
  end
end
Instance Attribute Details
#actor ⇒ Object (readonly)
the Actor
# File 'lib/oflow/task.rb', line 31
def actor
  @actor
end
#state ⇒ Object
The current processing state of the Task
# File 'lib/oflow/task.rb', line 29
def state
  @state
end
Instance Method Details
#_check_link(lnk, errors) ⇒ Object
# File 'lib/oflow/task.rb', line 353
def _check_link(lnk, errors)
  if lnk.target.nil?
    errors << ValidateError::Problem.new(full_name, ValidateError::Problem::LINK_ERROR, "Failed to find task '#{lnk.target_name}'.")
    return
  end
  unless lnk.target.has_input(lnk.op)
    errors << ValidateError::Problem.new(full_name, ValidateError::Problem::INPUT_ERROR, "'#{lnk.op}' not allowed on '#{lnk.target.full_name}'.")
    return
  end
  # TBD
  # Verify target has link.op as input if input is specified.
end
#_validation_errors ⇒ Object
# File 'lib/oflow/task.rb', line 331
def _validation_errors()
  errors = []
  @links.each_value { |lnk| _check_link(lnk, errors) }
  unless (outs = @actor.outputs()).nil?
    outs.each do |spec|
      if find_link(spec.dest).nil?
        errors << ValidateError::Problem.new(full_name, ValidateError::Problem::MISSING_ERROR, "Missing link for '#{spec.dest}'.")
      end
    end
  end
  errors
end
#backed_up ⇒ Fixnum
Returns a score indicating how backed up the queue is. This is used for selecting an Actor when stepping from the Inspector.
# File 'lib/oflow/task.rb', line 161
def backed_up()
  cnt = @queue.size() + (@current_req.nil? ? 0 : 1)
  return 0 if 0 == cnt
  if @max_queue_count.nil? || 0 == @max_queue_count
    cnt = 80 if 80 < cnt
    cnt
  else
    cnt * 100 / @max_queue_count
  end
end
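The scoring arithmetic in backed_up() can be restated as a stand-alone function to make a few cases concrete. The function name and its arguments are hypothetical; only the arithmetic is taken from the listing above.

```ruby
# Stand-alone restatement of the backed_up() arithmetic.
# queued: number of requests on the queue
# processing: true if a request is currently being processed
# max_queue_count: queue limit, or nil / 0 for no limit
def backed_up_score(queued, processing, max_queue_count)
  cnt = queued + (processing ? 1 : 0)
  return 0 if cnt.zero?
  if max_queue_count.nil? || max_queue_count.zero?
    # No limit configured: the raw count, capped at 80.
    [cnt, 80].min
  else
    # Limit configured: the count as an integer percentage of the limit.
    cnt * 100 / max_queue_count
  end
end

backed_up_score(0, false, nil)   # => 0
backed_up_score(3, true, nil)    # => 4
backed_up_score(200, false, nil) # => 80
backed_up_score(4, true, 10)     # => 50
```

Without a queue limit the score is the request count capped at 80; with a limit it is the count expressed as a percentage of that limit, using integer division.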
#busy? ⇒ true|false
Returns true if any requests are queued or a request is being processed.
# File 'lib/oflow/task.rb', line 174
def busy?()
  !@current_req.nil? || !@queue.empty?
end
#describe(detail = 0, indent = 0) ⇒ Object
Returns a String that describes the Task.
# File 'lib/oflow/task.rb', line 121
def describe(detail=0, indent=0)
  i = ' ' * indent
  lines = ["#{i}#{name} (#{actor.class}) {"]
  @links.each { |local,link|
    lines << " #{i}#{local} => #{link.target_name}:#{link.op}"
  }
  if 1 <= detail
    lines << " #{i}queued: #{queue_count()} (#{busy? ? 'busy' : 'idle'})"
    lines << " #{i}state: #{state_string()}"
    @actor.options.each do |key,value|
      lines << " #{i}#{key} = #{value} (#{value.class})"
    end
    if 2 <= detail
      lines << " #{i}processing: #{@current_req.describe(detail)}" unless @current_req.nil?
      # Copy queue so it doesn't change while working with it and don't
      # block the queue. It is okay for the requests to be stale as it is
      # for a display that will be out of date as soon as it is displayed.
      reqs = []
      @req_mutex.synchronize {
        reqs = Array.new(@queue)
      }
      lines << " #{i}queue:"
      reqs.each do |req|
        lines << " #{i}#{req.describe(detail)}"
      end
    end
  end
  lines << i + "}"
  lines.join("\n")
end
#flush ⇒ Object
Waits for all processing to complete before returning.
# File 'lib/oflow/task.rb', line 249
def flush()
  return if @loop.nil?
  @waiting_thread = Thread.current
  begin
    @loop.wakeup()
  rescue
    # ignore
  end
  while busy?
    sleep(2.0)
  end
  @waiting_thread = nil
end
#has_input(op) ⇒ Object
# File 'lib/oflow/task.rb', line 345
def has_input(op)
  ins = @actor.inputs()
  return true if ins.nil?
  op = op.to_sym unless op.nil?
  ins.each { |spec| return true if spec.op.nil? || spec.op == op }
  false
end
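The matching rules in has_input() can be tried in isolation with a small sketch. Spec here is a hypothetical Struct standing in for whatever objects an Actor's inputs() returns; the only assumption is that each one responds to op. The function name accepts_op? is also hypothetical.

```ruby
# Hypothetical stand-in for an input specification; only #op is assumed.
Spec = Struct.new(:op)

# Mirrors the has_input() rules:
# - no declared inputs (nil) means every operation is accepted
# - a spec whose op is nil accepts any operation
# - otherwise the operation, converted to a Symbol, must match a spec
def accepts_op?(specs, op)
  return true if specs.nil?
  op = op.to_sym unless op.nil?
  specs.any? { |spec| spec.op.nil? || spec.op == op }
end

accepts_op?(nil, :anything)            # => true
accepts_op?([Spec.new(:add)], 'add')   # => true
accepts_op?([Spec.new(:add)], :remove) # => false
accepts_op?([Spec.new(nil)], :remove)  # => true
```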
#inputs ⇒ Object
The expected inputs the Task supports or nil if not implemented.
# File 'lib/oflow/task.rb', line 269
def inputs()
  @actor.inputs()
end
#max_queue_count ⇒ NilClass|Fixnum
Returns the maximum number of requests allowed on the normal processing queue. A value of nil indicates there is no limit.
# File 'lib/oflow/task.rb', line 188
def max_queue_count()
  @max_queue_count
end
#outputs ⇒ Object
The expected outputs the Task supports or nil if not implemented.
# File 'lib/oflow/task.rb', line 274
def outputs()
  @actor.outputs()
end
#proc_count ⇒ Fixnum
Returns the total number of requests processed.
# File 'lib/oflow/task.rb', line 194
def proc_count()
  @proc_cnt
end
#queue_count ⇒ Fixnum
Returns the number of requests on the queue.
# File 'lib/oflow/task.rb', line 154
def queue_count()
  @queue.length
end
#receive(op, box) ⇒ Object
Creates a Request and adds it to the queue.
# File 'lib/oflow/task.rb', line 281
def receive(op, box)
  return if CLOSING == @state
  raise BlockedError.new() if BLOCKED == @state
  box = box.receive(full_name, op) unless box.nil?
  # Special case for starting state so that an Actor can place an item on
  # the queue before the loop is started.
  if @loop.nil? && STARTING != @state # no thread task
    begin
      @actor.perform(op, box)
    rescue Exception => e
      handle_error(e)
    end
    return
  end
  unless @max_queue_count.nil? || 0 == @max_queue_count || @queue.size() < @max_queue_count
    @req_thread = Thread.current
    # Wait up to the request timeout for the processing loop to free a slot.
    sleep(@req_timeout) unless @req_timeout.nil? || 0 == @req_timeout
    @req_thread = nil
    raise BusyError.new() unless @queue.size() < @max_queue_count
  end
  @req_mutex.synchronize {
    @queue.insert(0, Request.new(op, box))
  }
  @loop.wakeup() if RUNNING == @state
end
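Two details of receive() are easy to miss: new requests are inserted at the front of the Array while the processing loop pops from the back, which gives first-in, first-out order, and a full queue results in an error rather than unbounded growth. The sketch below isolates just those two behaviors. BoundedInbox and InboxFullError are hypothetical names, and the timeout wait that receive() performs before giving up is omitted.

```ruby
# Hypothetical error class for this sketch only.
class InboxFullError < StandardError; end

# Hypothetical bounded inbox; a nil or 0 limit means unbounded.
class BoundedInbox
  def initialize(max_queue_count = nil)
    @max_queue_count = max_queue_count
    @queue = []
    @mutex = Mutex.new
  end

  # Insert at the front, refusing the request when the limit is reached.
  def receive(op, box)
    @mutex.synchronize do
      unless @max_queue_count.nil? || @max_queue_count.zero? || @queue.size < @max_queue_count
        raise InboxFullError, 'queue is full'
      end
      @queue.insert(0, [op, box])
    end
  end

  # Pop from the back, so the oldest request comes out first.
  def pop
    @mutex.synchronize { @queue.pop }
  end

  def size
    @mutex.synchronize { @queue.size }
  end
end

inbox = BoundedInbox.new(2)
inbox.receive(:first, 1)
inbox.receive(:second, 2)
rejected = begin
  inbox.receive(:third, 3)
  false
rescue InboxFullError
  true
end
oldest = inbox.pop
```

Here the third request is rejected because the limit of two has been reached, and the first request received is the first one popped.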
#request_timeout ⇒ Float
Returns the default timeout, in seconds, that receive() waits for the Task to be ready to accept a request.
# File 'lib/oflow/task.rb', line 181
def request_timeout()
  @req_timeout
end
#resolve_all_links ⇒ Object
Resolves all links. Called by the system.
# File 'lib/oflow/task.rb', line 369
def resolve_all_links()
  @links.each_value { |lnk|
    set_link_target(lnk) if lnk.target.nil?
  }
end
#set_options(options) ⇒ Object
Processes the initialize() options. Subclasses should call super.
# File 'lib/oflow/task.rb', line 326
def set_options(options)
  @max_queue_count = options.fetch(:max_queue_count, @max_queue_count)
  @req_timeout = options.fetch(:request_timeout, @req_timeout).to_f
end
#ship(dest, box) ⇒ Object
Sends a message to another Task or Flow.
# File 'lib/oflow/task.rb', line 312
def ship(dest, box)
  box.freeze() unless box.nil?
  link = resolve_link(dest)
  raise LinkError.new(dest) if link.nil? || link.target.nil?
  link.target.receive(link.op, box)
  link
end
#shutdown(flush_first = false) ⇒ Object
Closes the Task by exiting the processing thread. If flush_first is true then all requests in the queue are processed first.
# File 'lib/oflow/task.rb', line 232
def shutdown(flush_first=false)
  return if @loop.nil?
  if flush_first
    @state = BLOCKED
    flush()
  end
  @state = CLOSING
  begin
    # if the loop has already exited this will raise an Exception that can be ignored
    @loop.wakeup()
  rescue
    # ignore
  end
  @loop.join()
end
#start ⇒ Object
Restarts the Task’s processing thread.
# File 'lib/oflow/task.rb', line 225
def start()
  @state = RUNNING
  @loop.wakeup() unless @loop.nil?
end
#state_string ⇒ String
Returns the state of the Task as a String.
# File 'lib/oflow/task.rb', line 103
def state_string()
  ss = 'UNKNOWN'
  case @state
  when STOPPED
    ss = 'STOPPED'
  when RUNNING
    ss = 'RUNNING'
  when CLOSING
    ss = 'CLOSING'
  when STEP
    ss = 'STEP'
  end
  ss
end
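As listed, state_string() has no branch for STARTING or BLOCKED, so both fall through to 'UNKNOWN'. A table-driven lookup is one possible way to cover all six states. The sketch below is not part of OFlow: STATE_NAMES and state_name are hypothetical names, and the numeric values are copied from the Constant Summary.

```ruby
# Hypothetical lookup table; values taken from the Constant Summary.
STATE_NAMES = {
  0 => 'STARTING',
  1 => 'STOPPED',
  2 => 'RUNNING',
  3 => 'CLOSING',
  4 => 'BLOCKED',
  5 => 'STEP'
}.freeze

# Returns the name for a state value, or 'UNKNOWN' for anything else.
def state_name(state)
  STATE_NAMES.fetch(state, 'UNKNOWN')
end

state_name(2) # => "RUNNING"
state_name(4) # => "BLOCKED"
state_name(9) # => "UNKNOWN"
```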
#step(max_wait = 5) ⇒ Object
Causes the Task to process one request and then stop. The max_wait is used to avoid getting stuck if the processing takes too long.
# File 'lib/oflow/task.rb', line 207
def step(max_wait=5)
  return nil if @loop.nil?
  return nil if STOPPED != @state || @queue.empty?
  @state = STEP
  @step_thread = Thread.current
  @loop.wakeup()
  sleep(max_wait)
  @step_thread = nil
  self
end
#stop ⇒ Object
Causes the Actor to stop processing any more requests after the current request has finished.
# File 'lib/oflow/task.rb', line 200
def stop()
  @state = STOPPED
end
#wakeup ⇒ Object
Wakes up the Task if it has been stopped or if Env.shutdown() has been called.
# File 'lib/oflow/task.rb', line 219
def wakeup()
  # don't wake if the task is currently processing
  @loop.wakeup() unless @loop.nil?
end