Class: OFlow::Task
- Inherits:
-
Object
- Object
- OFlow::Task
- Includes:
- HasErrorHandler, HasLinks, HasLog, HasName
- Defined in:
- lib/oflow/task.rb
Overview
The Task class provides the asynchronous functionality for the system. Each Task has it’s own thread and receives requests as an operation and Box of data by the receive() method. The request is put on a queue and popped off one at a time and handed to an Actor associatesd with the Task.
Defined Under Namespace
Constant Summary collapse
- STARTING =
value of @state that indicates the Task is being created.
0
- STOPPED =
value of @state that indicates the Task is not currently processing requests
1
- RUNNING =
value of @state that indicates the Task is currently ready to process requests
2
- CLOSING =
value of @state that indicates the Task is shutting down
3
- BLOCKED =
value of @state that indicates the Task is not receiving new requests.
4
- STEP =
value of @state that indicates the Task is processing one request and will stop after that processing is complete
5
Instance Attribute Summary collapse
-
#actor ⇒ Object
readonly
the Actor.
-
#state ⇒ Object
The current processing state of the Task.
Attributes included from HasName
Instance Method Summary collapse
- #_check_link(lnk, errors) ⇒ Object
- #_validation_errors ⇒ Object
-
#backed_up ⇒ Fixnum
Returns a score indicating how backed up the queue is.
-
#busy? ⇒ true|false
Returns the true if any requests are queued or a request is being processed.
-
#describe(detail = 0, indent = 0) ⇒ Object
Returns a String that describes the Task.
-
#flush ⇒ Object
Waits for all processing to complete before returning.
- #has_input(op) ⇒ Object
-
#initialize(flow, name, actor_class, options = {}) ⇒ Task
constructor
A Task is initialized by specifying a class to create an instance of.
-
#inputs ⇒ Object
The expected inputs the Task supports or nil if not implemented.
-
#max_queue_count ⇒ NilClass|Fixnum
Returns the maximum number of requests allowed on the normal processing queue.
-
#outputs ⇒ Object
The expected outputs the Task supports or nil if not implemented.
-
#proc_count ⇒ Fixnum
Returns the total number of requested processed.
-
#queue_count ⇒ Fixnum
Returns the number of requests on the queue.
-
#receive(op, box) ⇒ Object
Creates a Request and adds it to the queue.
-
#request_timeout ⇒ Float
Returns the default timeout for the time to wait for the Task to be ready to accept a request using the receive() method.
-
#resolve_all_links ⇒ Object
Resolves all links.
-
#set_options(options) ⇒ Object
Processes the initialize() options.
-
#ship(dest, box) ⇒ Object
Sends a message to another Task or Flow.
-
#shutdown(flush_first = false) ⇒ Object
Closes the Task by exiting the processing thread.
-
#start ⇒ Object
Restarts the Task’s processing thread.
-
#state_string ⇒ String
Returns the state of the Task as a String.
-
#step(max_wait = 5) ⇒ Object
Causes the Task to process one request and then stop.
-
#stop ⇒ Object
Causes the Actor to stop processing any more requests after the current request has finished.
-
#wakeup ⇒ Object
Wakes up the Task if it has been stopped or if Env.shutdown() has been called.
Methods included from HasLog
#debug, #error, #fatal, #info, #log, #log=, #log_msg, #warn
Methods included from HasErrorHandler
#error_handler, #error_handler=, #handle_error
Methods included from HasLinks
#find_link, #has_links?, #init_links, #link, #links, #resolve_link, #set_link_target
Methods included from HasName
Constructor Details
#initialize(flow, name, actor_class, options = {}) ⇒ Task
A Task is initialized by specifying a class to create an instance of.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/oflow/task.rb', line 38 def initialize(flow, name, actor_class, ={}) @state = STARTING @queue = [] @req_mutex = Mutex.new() @req_thread = nil @step_thread = nil @waiting_thread = nil @req_timeout = 0.0 @max_queue_count = nil @current_req = nil @proc_cnt = 0 @loop = nil init_name(flow, name) init_links() () info("Creating actor #{actor_class} with options #{}.") @actor = actor_class.new(self, ) raise Exception.new("#{actor} does not respond to the perform() method.") unless @actor.respond_to?(:perform) @state = .fetch(:state, RUNNING) return unless @actor.with_own_thread() @loop = Thread.start(self) do |me| Thread.current[:name] = me.full_name() while CLOSING != @state begin if RUNNING == @state || STEP == @state || BLOCKED == @state req = nil if @queue.empty? @waiting_thread.wakeup() unless @waiting_thread.nil? sleep(1.0) next end @req_mutex.synchronize { req = @queue.pop() } @req_thread.wakeup() unless @req_thread.nil? @current_req = req begin info("perform(#{req.op}, #{req.box})") @actor.perform(req.op, req.box) unless req.nil? rescue Exception => e handle_error(e) end @proc_cnt += 1 @current_req = nil if STEP == @state @step_thread.wakeup() unless @step_thread.nil? @state = STOPPED end elsif STOPPED == @state sleep(1.0) end rescue Exception => e puts "*** #{full_name} #{e.class}: #{e.}" @current_req = nil # TBD Env.rescue(e) end end end end |
Instance Attribute Details
#actor ⇒ Object (readonly)
the Actor
31 32 33 |
# File 'lib/oflow/task.rb', line 31 def actor @actor end |
#state ⇒ Object
The current processing state of the Task
29 30 31 |
# File 'lib/oflow/task.rb', line 29 def state @state end |
Instance Method Details
#_check_link(lnk, errors) ⇒ Object
358 359 360 361 362 363 364 365 366 367 368 369 370 371 |
# File 'lib/oflow/task.rb', line 358 def _check_link(lnk, errors) if lnk.target.nil? errors << ValidateError::Problem.new(full_name, ValidateError::Problem::LINK_ERROR, "Failed to find task '#{lnk.target_name}'.") return end unless lnk.target.has_input(lnk.op) errors << ValidateError::Problem.new(full_name, ValidateError::Problem::INPUT_ERROR, "'#{lnk.op}' not allowed on '#{lnk.target.full_name}'.") return end # TBD # Verify target has link.op as input if input is specified. end |
#_validation_errors ⇒ Object
336 337 338 339 340 341 342 343 344 345 346 347 348 |
# File 'lib/oflow/task.rb', line 336 def _validation_errors() errors = [] @links.each_value { |lnk| _check_link(lnk, errors) } unless (outs = @actor.outputs()).nil? outs.each do |spec| if find_link(spec.dest).nil? errors << ValidateError::Problem.new(full_name, ValidateError::Problem::MISSING_ERROR, "Missing link for '#{spec.dest}'.") end end end errors end |
#backed_up ⇒ Fixnum
Returns a score indicating how backed up the queue is. This is used for selecting an Actor when stepping from the Inspector.
163 164 165 166 167 168 169 170 171 172 |
# File 'lib/oflow/task.rb', line 163 def backed_up() cnt = @queue.size() + (@current_req.nil? ? 0 : 1) return 0 if 0 == cnt if @max_queue_count.nil? || 0 == @max_queue_count cnt = 80 if 80 < cnt cnt else cnt * 100 / @max_queue_count end end |
#busy? ⇒ true|false
Returns the true if any requests are queued or a request is being processed.
176 177 178 |
# File 'lib/oflow/task.rb', line 176 def busy?() !@current_req.nil? || !@queue.empty? end |
#describe(detail = 0, indent = 0) ⇒ Object
Returns a String that describes the Task.
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/oflow/task.rb', line 123 def describe(detail=0, indent=0) i = ' ' * indent lines = ["#{i}#{name} (#{actor.class}) {"] @links.each { |local,link| lines << " #{i}#{local} => #{link.target_name}:#{link.op}" } if 1 <= detail lines << " #{i}queued: #{queue_count()} (#{busy? ? 'busy' : 'idle'})" lines << " #{i}state: #{state_string()}" @actor..each do |key,value| lines << " #{i}#{key} = #{value} (#{value.class})" end if 2 <= detail lines << " #{i}processing: #{@current_req.describe(detail)}" unless @current_req.nil? # Copy queue so it doesn't change while working with it and don't # block the queue. It is okay for the requests to be stale as it is # for a display that will be out of date as soon as it is displayed. reqs = [] @req_mutex.synchronize { reqs = Array.new(@queue) } lines << " #{i}queue:" reqs.each do |req| lines << " #{i}#{req.describe(detail)}" end end end lines << i + "}" lines.join("\n") end |
#flush ⇒ Object
Waits for all processing to complete before returning.
251 252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/oflow/task.rb', line 251 def flush() return if @loop.nil? @waiting_thread = Thread.current begin @loop.wakeup() rescue # ignore end while busy? sleep(2.0) end @waiting_thread = nil end |
#has_input(op) ⇒ Object
350 351 352 353 354 355 356 |
# File 'lib/oflow/task.rb', line 350 def has_input(op) ins = @actor.inputs() return true if ins.nil? op = op.to_sym unless op.nil? ins.each { |spec| return true if spec.op.nil? || spec.op == op } false end |
#inputs ⇒ Object
The expected inputs the Task supports or nil if not implemented.
271 272 273 |
# File 'lib/oflow/task.rb', line 271 def inputs() @actor.inputs() end |
#max_queue_count ⇒ NilClass|Fixnum
Returns the maximum number of requests allowed on the normal processing queue. A value of nil indicates there is no limit.
190 191 192 |
# File 'lib/oflow/task.rb', line 190 def max_queue_count() @max_queue_count end |
#outputs ⇒ Object
The expected outputs the Task supports or nil if not implemented.
276 277 278 |
# File 'lib/oflow/task.rb', line 276 def outputs() @actor.outputs() end |
#proc_count ⇒ Fixnum
Returns the total number of requested processed.
196 197 198 |
# File 'lib/oflow/task.rb', line 196 def proc_count() @proc_cnt end |
#queue_count ⇒ Fixnum
Returns the number of requests on the queue.
156 157 158 |
# File 'lib/oflow/task.rb', line 156 def queue_count() @queue.length end |
#receive(op, box) ⇒ Object
Creates a Request and adds it to the queue.
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/oflow/task.rb', line 283 def receive(op, box) info("receive(#{op}, #{box}) #{state_string}") return if CLOSING == @state raise BlockedError.new() if BLOCKED == @state box = box.receive(full_name, op) unless box.nil? # Special case for starting state so that an Actor can place an item on # the queue before the loop is started. if @loop.nil? && STARTING != @state # no thread task begin @actor.perform(op, box) rescue Exception => e handle_error(e) end return end unless @max_queue_count.nil? || 0 == @max_queue_count || @queue.size() < @max_queue_count @req_thread = Thread.current sleep(timeout) unless @req_timeout.nil? || 0 == @req_timeout @req_thread = nil raise BusyError.new() unless @queue.size() < @max_queue_count end @req_mutex.synchronize { @queue.insert(0, Request.new(op, box)) } @loop.wakeup() if RUNNING == @state end |
#request_timeout ⇒ Float
Returns the default timeout for the time to wait for the Task to be ready to accept a request using the receive() method.
183 184 185 |
# File 'lib/oflow/task.rb', line 183 def request_timeout() @req_timeout end |
#resolve_all_links ⇒ Object
Resolves all links. Called by the system.
374 375 376 377 378 |
# File 'lib/oflow/task.rb', line 374 def resolve_all_links() @links.each_value { |lnk| set_link_target(lnk) if lnk.target.nil? } end |
#set_options(options) ⇒ Object
Processes the initialize() options. Subclasses should call super.
331 332 333 334 |
# File 'lib/oflow/task.rb', line 331 def () @max_queue_count = .fetch(:max_queue_count, @max_queue_count) @req_timeout = .fetch(:request_timeout, @req_timeout).to_f end |
#ship(dest, box) ⇒ Object
Sends a message to another Task or Flow.
316 317 318 319 320 321 322 323 |
# File 'lib/oflow/task.rb', line 316 def ship(dest, box) box.freeze() unless box.nil? link = resolve_link(dest) raise LinkError.new(dest) if link.nil? || link.target.nil? info("shipping #{box} to #{link.target_name}:#{link.op}") link.target.receive(link.op, box) link end |
#shutdown(flush_first = false) ⇒ Object
Closes the Task by exiting the processing thread. If flush is true then all requests in the queue are processed first.
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 |
# File 'lib/oflow/task.rb', line 234 def shutdown(flush_first=false) return if @loop.nil? if flush_first @state = BLOCKED flush() end @state = CLOSING begin # if the loop has already exited this will raise an Exception that can be ignored @loop.wakeup() rescue # ignore end @loop.join() end |
#start ⇒ Object
Restarts the Task’s processing thread.
227 228 229 230 |
# File 'lib/oflow/task.rb', line 227 def start() @state = RUNNING @loop.wakeup() unless @loop.nil? end |
#state_string ⇒ String
Returns the state of the Task as a String.
105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/oflow/task.rb', line 105 def state_string() ss = 'UNKNOWN' case @state when STOPPED ss = 'STOPPED' when RUNNING ss = 'RUNNING' when CLOSING ss = 'CLOSING' when STEP ss = 'STEP' end ss end |
#step(max_wait = 5) ⇒ Object
Causes the Task to process one request and then stop. The max_wait is used to avoid getting stuck if the processing takes too long.
209 210 211 212 213 214 215 216 217 218 |
# File 'lib/oflow/task.rb', line 209 def step(max_wait=5) return nil if @loop.nil? return nil if STOPPED != @state || @queue.empty? @state = STEP @step_thread = Thread.current @loop.wakeup() sleep(max_wait) @step_thread = nil self end |
#stop ⇒ Object
Causes the Actor to stop processing any more requests after the current request has finished.
202 203 204 |
# File 'lib/oflow/task.rb', line 202 def stop() @state = STOPPED end |
#wakeup ⇒ Object
Wakes up the Task if it has been stopped or if Env.shutdown() has been called.
221 222 223 224 |
# File 'lib/oflow/task.rb', line 221 def wakeup() # don't wake if the task is currently processing @loop.wakeup() unless @loop.nil? end |