Class: OFlow::Task

Inherits:
Object
  • Object
show all
Includes:
HasErrorHandler, HasLog
Defined in:
lib/oflow/task.rb

Overview

The Task class provides the asynchronous functionality for the system. Each Task has its own thread and receives requests as an operation and Box of data by the receive() method. The request is put on a queue and popped off one at a time and handed to an Actor associated with the Task.

Defined Under Namespace

Classes: Request

Constant Summary collapse

STARTING =

value of @state that indicates the Task is being created.

0
STOPPED =

value of @state that indicates the Task is not currently processing requests

1
RUNNING =

value of @state that indicates the Task is currently ready to process requests

2
CLOSING =

value of @state that indicates the Task is shutting down

3
BLOCKED =

value of @state that indicates the Task is not receiving new requests.

4
STEP =

value of @state that indicates the Task is processing one request and will stop after that processing is complete

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from HasLog

#debug, #debug?, #error, #error?, #fatal, #info, #info?, #log=, #log_msg, #warn, #warn?

Methods included from HasErrorHandler

#error_handler=, #handle_error

Constructor Details

#initialize(flow, name, actor_class, options = {}) ⇒ Task

A Task is initialized by specifying a class to create an instance of.

Parameters:

  • flow (Flow)

    Flow containing the Task

  • name (name)

    Task base name

  • actor_class (Class)

    Class of the Actor to create

  • options (Hash) (defaults to: {})

    additional options for the Task or Actor

Raises:

  • (Exception)


44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/oflow/task.rb', line 44

# Builds the Task: records the owning Flow and the name, sets the bookkeeping
# state to its initial values, applies the Task-level options, creates the
# Actor and, when the Actor asks for its own thread, starts the loop that
# pops requests off the queue and hands them to the Actor.
#
# @param flow [Flow] Flow containing the Task
# @param name [#to_sym] Task base name, stored as a Symbol
# @param actor_class [Class] class instantiated as actor_class.new(self, options)
# @param options [Hash] options for the Task or Actor; a :state entry
#   replaces RUNNING as the state set once the Actor has been created
# @raise [Exception] if the created Actor does not respond to perform()
def initialize(flow, name, actor_class, options={})
  @flow = flow
  @name = name.to_sym
  @state = STARTING
  @bounds = nil # [x, y, w, h] as fixnums
  @color = nil  # [r, g, b] as floats
  @shape = nil  # Rectangle is the default
  @queue = []
  @req_mutex = Mutex.new()
  @req_thread = nil
  @step_thread = nil
  @waiting_thread = nil
  @req_timeout = 0.0
  @max_queue_count = nil
  @current_req = nil
  @proc_cnt = 0
  @loop = nil
  @links = {}
  @log = nil
  @error_handler = nil

  # Only reads the Task-level options, so it is safe to call before @actor exists.
  set_options(options)

  info("Creating actor #{actor_class} with options #{options}.")
  @actor = actor_class.new(self, options)
  raise Exception.new("#{actor} does not respond to the perform() method.") unless @actor.respond_to?(:perform)

  @state = options.fetch(:state, RUNNING)

  # Without its own thread @loop stays nil and no processing loop is started.
  return unless @actor.with_own_thread()

  @loop = Thread.start(self) do |me|
    Thread.current[:name] = me.full_name()
    # Keep looping until the state is switched to CLOSING.
    while CLOSING != @state
      begin
        if RUNNING == @state || STEP == @state || BLOCKED == @state
          req = nil
          if @queue.empty?
            # Nothing queued: wake a thread parked in flush(), if any, then
            # idle until woken or one second has passed.
            @waiting_thread.wakeup() unless @waiting_thread.nil?
            sleep(1.0)
            next
          end
          # receive() inserts new requests at index 0, so popping from the
          # tail takes the oldest queued request.
          @req_mutex.synchronize {
            req = @queue.pop()
          }
          # A slot is free now; wake a sender sleeping in receive(), if any.
          @req_thread.wakeup() unless @req_thread.nil?

          @current_req = req
          begin
            # NOTE(review): req.op is read for logging before the req.nil?
            # guard on the perform() call below.
            if debug?
              debug("perform(#{req.op}, #{req.box.nil? ? '<nil>' : req.box})")
            else
              info("perform(#{req.op})")
            end
            @actor.perform(req.op, req.box) unless req.nil?
          rescue Exception => e
            # Failures while performing are passed to handle_error and the
            # loop continues with the next request.
            handle_error(e)
          end
          @proc_cnt += 1
          @current_req = nil
          if STEP == @state
            # The single step is done: wake the thread sleeping in step(),
            # if any, and stop processing.
            @step_thread.wakeup() unless @step_thread.nil?
            @state = STOPPED
          end
        elsif STOPPED == @state
          # Idle while stopped; wakeup() calls on the loop thread end the sleep early.
          sleep(1.0)
        end
      rescue Exception => e
        # Anything raised outside the perform() call is printed and the loop keeps going.
        puts "*** #{full_name} #{e.class}: #{e.message}"
        @current_req = nil
        # TBD Env.rescue(e)
      end
    end
  end
end

Instance Attribute Details

#actorObject (readonly)

the Actor



37
38
39
# File 'lib/oflow/task.rb', line 37

# Reader for the Actor held in @actor.
def actor; @actor; end

#boundsObject

Returns the value of attribute bounds.



16
17
18
# File 'lib/oflow/task.rb', line 16

# Reader for the value held in @bounds.
def bounds; @bounds; end

#colorObject

Returns the value of attribute color.



18
19
20
# File 'lib/oflow/task.rb', line 18

# Reader for the value held in @color.
def color; @color; end

#nameObject (readonly)

The name.



14
15
16
# File 'lib/oflow/task.rb', line 14

# Reader for the Task name held in @name.
def name; @name; end

#shapeObject

Returns the value of attribute shape.



17
18
19
# File 'lib/oflow/task.rb', line 17

# Reader for the value held in @shape.
def shape; @shape; end

#stateObject

The current processing state of the Task



35
36
37
# File 'lib/oflow/task.rb', line 35

# Reader for the current processing state held in @state.
def state; @state; end

Instance Method Details



443
444
445
446
447
448
449
450
451
452
453
454
455
456
# File 'lib/oflow/task.rb', line 443

# Validates one Link and appends at most one problem to errors: a LINK_ERROR
# when the Link has no target, otherwise an INPUT_ERROR when the target does
# not accept the Link's operation.
#
# @param lnk [Link] the Link to check
# @param errors [Array] collector the problems are appended to
# @return [nil]
def _check_link(lnk, errors)
  if lnk.target.nil?
    errors << ValidateError::Problem.new(full_name, ValidateError::Problem::LINK_ERROR, "Failed to find task '#{lnk.target_name}'.")
  elsif !lnk.target.has_input(lnk.op)
    errors << ValidateError::Problem.new(full_name, ValidateError::Problem::INPUT_ERROR, "'#{lnk.op}' not allowed on '#{lnk.target.full_name}'.")
  end

  # TBD
  # Verify target has link.op as input if input is specified.

  nil
end

#_validation_errorsObject



421
422
423
424
425
426
427
428
429
430
431
432
433
# File 'lib/oflow/task.rb', line 421

# Collects the validation problems of this Task: every Link is run through
# _check_link and, when the Actor declares outputs, each output destination
# that has no Link is reported as a MISSING_ERROR.
#
# @return [Array] the problems found, empty when there are none
def _validation_errors()
  problems = []
  @links.each_value { |lnk| _check_link(lnk, problems) }

  declared = @actor.outputs()
  return problems if declared.nil?

  declared.each do |spec|
    next unless find_link(spec.dest).nil?
    problems << ValidateError::Problem.new(full_name, ValidateError::Problem::MISSING_ERROR, "Missing link for '#{spec.dest}'.")
  end
  problems
end

#backed_upFixnum

Returns a score indicating how backed up the queue is. This is used for selecting an Actor when stepping from the Inspector.

Returns:

  • (Fixnum)

    a measure of how backed up a Task is



230
231
232
233
234
235
236
237
238
239
# File 'lib/oflow/task.rb', line 230

# Scores how backed up the Task is. The count is the number of queued
# requests plus one when a request is being processed. Without a queue limit
# (nil or 0) the count itself is returned, capped at 80; with a limit the
# count is returned as an integer percentage of that limit.
#
# @return [Fixnum] 0 when idle, otherwise the score described above
def backed_up()
  pending = @queue.size()
  pending += 1 unless @current_req.nil?
  return 0 if pending.zero?

  unlimited = @max_queue_count.nil? || 0 == @max_queue_count
  return [pending, 80].min if unlimited

  pending * 100 / @max_queue_count
end

#busy?true|false

Returns true if any requests are queued or a request is being processed.

Returns:

  • (true|false)

    true if busy, false otherwise



243
244
245
# File 'lib/oflow/task.rb', line 243

# Reports whether the Task has work: a request in progress, a non-empty
# queue, or an Actor that reports itself busy.
#
# @return [true|false] true if busy, otherwise whatever the Actor's busy? returns
def busy?()
  return true unless @current_req.nil?
  return true unless @queue.empty?
  @actor.busy?
end

#describe(detail = 0, indent = 0) ⇒ Object

Returns a String that describes the Task.

Parameters:

  • detail (Fixnum) (defaults to: 0)

    higher values result in more detail in the description

  • indent (Fixnum) (defaults to: 0)

    the number of spaces to indent the description



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/oflow/task.rb', line 186

# Builds a multi-line description of the Task.
#
# @param detail [Fixnum] 0 lists only the links; 1 adds the queue size, the
#   state and the Actor options; 2 also adds the request being processed and
#   the queued requests
# @param indent [Fixnum] number of spaces placed in front of every line
# @return [String] the description, lines joined with newlines
def describe(detail=0, indent=0)
  pad = ' ' * indent
  inner = "  #{pad}"
  out = ["#{pad}#{name} (#{actor.class}) {"]
  # A nil flow_name interpolates to an empty String, which gives the
  # ":target:op" form for links that name no flow.
  @links.each do |label, lnk|
    out << "#{inner}#{label} => #{lnk.flow_name}:#{lnk.target_name}:#{lnk.op}"
  end
  if 1 <= detail
    out << "#{inner}queued: #{queue_count()} (#{busy? ? 'busy' : 'idle'})"
    out << "#{inner}state: #{state_string()}"
    @actor.options.each { |key, value| out << "#{inner}#{key} = #{value} (#{value.class})" }
  end
  if 2 <= detail
    out << "#{inner}processing: #{@current_req.describe(detail)}" unless @current_req.nil?
    # Describe a copy taken under the mutex so the queue is neither blocked
    # nor changing while it is walked. Stale entries are acceptable for a
    # display that is out of date as soon as it is shown.
    snapshot = @req_mutex.synchronize { Array.new(@queue) }
    out << "#{inner}queue:"
    snapshot.each { |req| out << "  #{inner}#{req.describe(detail)}" }
  end
  out << "#{pad}}"
  out.join("\n")
end

#error_handlerTask

Returns an error handler Task by looking for that Task as an attribute and then in the parent Flow.

Returns:

  • (Task)

    error handler Task.



157
158
159
160
# File 'lib/oflow/task.rb', line 157

# Finds the error handler Task: the one set on this Task when present,
# otherwise the one provided by the parent Flow.
#
# @return [Task] error handler Task
def error_handler()
  @error_handler.nil? ? @flow.error_handler() : @error_handler
end

Attempts to find the Link identified by the label.

Parameters:

  • label (Symbol|String)

    identifier of the Link

Returns:

  • (Link)

    returns the Link for the label



490
491
492
493
# File 'lib/oflow/task.rb', line 490

# Looks up the Link stored under label, falling back to the Link stored
# under the nil label when there is no match.
#
# @param label [Symbol|String] identifier of the Link, converted to a Symbol
# @return [Link] the matching Link or nil if neither lookup finds one
def find_link(label)
  key = label.nil? ? nil : label.to_sym
  @links[key] || @links[nil]
end

#flushObject

Waits for all processing to complete before returning.



318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/oflow/task.rb', line 318

# Waits until the Task is no longer busy. Returns immediately for a Task
# without a processing thread. The calling thread is recorded in
# @waiting_thread for the duration of the wait and busy? is re-checked
# after every sleep of up to two seconds.
#
# @return [nil]
def flush()
  return if @loop.nil?

  @waiting_thread = Thread.current
  begin
    @loop.wakeup()
  rescue
    # a failure to wake the loop is ignored
  end
  sleep(2.0) while busy?
  @waiting_thread = nil
end

#full_nameString

Similar to a full file path. The full_name describes the containment of the named item.

Returns:

  • (String)

    full name of item



138
139
140
141
142
143
144
# File 'lib/oflow/task.rb', line 138

# Builds the path-like name of the Task: the parent Flow's full name (empty
# when there is no Flow), a ':' and the Task name.
#
# @return [String] full name of the Task
def full_name()
  prefix = @flow.nil? ? '' : @flow.full_name()
  prefix + ':' + @name.to_s
end

#has_input(op) ⇒ Object



435
436
437
438
439
440
441
# File 'lib/oflow/task.rb', line 435

# Checks whether the Actor accepts an operation. An Actor that declares no
# inputs (nil) accepts everything; otherwise some input spec must either
# have a nil op or an op equal to the requested one.
#
# @param op [Symbol|String] operation to look for, converted to a Symbol
# @return [true|false] true if the operation is accepted
def has_input(op)
  specs = @actor.inputs()
  return true if specs.nil?

  wanted = op.nil? ? nil : op.to_sym
  specs.any? { |spec| spec.op.nil? || spec.op == wanted }
end

#has_links?Boolean

Returns:

  • (Boolean)


501
502
503
# File 'lib/oflow/task.rb', line 501

# Reports whether at least one Link has been defined on the Task.
#
# @return [Boolean] false when @links is nil or empty, true otherwise
def has_links?()
  !(@links.nil? || @links.empty?)
end

#inputsObject

The expected inputs the Task supports or nil if not implemented.



338
339
340
# File 'lib/oflow/task.rb', line 338

# Delegates to the Actor for the inputs it declares.
#
# @return [Object] whatever the Actor's inputs method returns
def inputs
  @actor.inputs
end

Parameters:

  • label (Symbol|String)

    identifier of the Link

  • target (Symbol|String)

    identifer of the target Task

  • op (Symbol|String)

    operation to perform on the target Task

  • flow_name (Symbol|String) (defaults to: nil)

    parent flow name to find the target task in or nil for this parent

Raises:



127
128
129
130
131
132
133
# File 'lib/oflow/task.rb', line 127

# Registers a new Link under label. The label, operation and flow name are
# converted to Symbols unless nil; the target is always converted.
#
# @param label [Symbol|String] identifier of the Link
# @param target [Symbol|String] identifier of the target Task
# @param op [Symbol|String] operation to perform on the target Task
# @param flow_name [Symbol|String] parent flow name to find the target in, or nil
# @return [Link] the Link that was stored
# @raise [ConfigError] if a Link is already stored under label
def link(label, target, op, flow_name=nil)
  label, op, flow_name = [label, op, flow_name].map { |v| v.nil? ? nil : v.to_sym }
  raise ConfigError.new("Link #{label} already exists.") unless @links[label].nil?
  @links[label] = Link.new(flow_name, target.to_sym, op)
end

Returns the Links.

Returns:

  • (Hash)

    Hash of Links with the keys as Symbols that are the labels of the Links.



497
498
499
# File 'lib/oflow/task.rb', line 497

# Reader for the Hash of Links held in @links.
def links; @links; end

#logTask

Returns a log Task by looking for that Task as an attribute and then in the parent Flow.

Returns:

  • (Task)

    log Task.



149
150
151
152
# File 'lib/oflow/task.rb', line 149

# Finds the log Task: the one set on this Task when present, otherwise the
# one provided by the parent Flow.
#
# @return [Task] log Task
def log()
  @log.nil? ? @flow.log() : @log
end

#max_queue_countNilClass|Fixnum

Returns the maximum number of requests allowed on the normal processing queue. A value of nil indicates there is no limit.

Returns:

  • (NilClass|Fixnum)

    maximum number of requests that can be queued



257
258
259
# File 'lib/oflow/task.rb', line 257

# Reader for the queue limit held in @max_queue_count.
def max_queue_count; @max_queue_count; end

#outputsObject

The expected outputs the Task supports or nil if not implemented.



343
344
345
# File 'lib/oflow/task.rb', line 343

# Delegates to the Actor for the outputs it declares.
#
# @return [Object] whatever the Actor's outputs method returns
def outputs
  @actor.outputs
end

#proc_countFixnum

Returns the total number of requests processed.

Returns:

  • (Fixnum)

    number of requests processed



263
264
265
# File 'lib/oflow/task.rb', line 263

# Reader for the processed-request counter held in @proc_cnt.
def proc_count; @proc_cnt; end

#queue_countFixnum

Returns the number of requests on the queue.

Returns:

  • (Fixnum)

    number of queued requests



223
224
225
# File 'lib/oflow/task.rb', line 223

# Counts the requests currently sitting on the queue.
#
# @return [Fixnum] number of queued requests
def queue_count
  @queue.size
end

#receive(op, box) ⇒ Object

Creates a Request and adds it to the queue.

Parameters:

  • op (Symbol)

    operation to perform

  • box (Box)

    contents or data for the request

Raises:



350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# File 'lib/oflow/task.rb', line 350

# Creates a Request and adds it to the queue, or performs it directly when
# the Task has no processing thread.
#
# Requests are dropped silently while the Task is CLOSING and rejected while
# it is BLOCKED. When a queue limit is set and the queue is full, the caller
# sleeps for up to the request timeout (the processing loop wakes it early
# when it pops a request) and a BusyError is raised if the queue is still
# full afterwards.
#
# @param op [Symbol] operation to perform
# @param box [Box] contents or data for the request, may be nil
# @raise [BlockedError] if the Task is in the BLOCKED state
# @raise [BusyError] if the queue is full and stays full for the timeout
def receive(op, box)
  if debug?
    debug("receive(#{op}, #{box.nil? ? '<nil>' : box}) #{state_string}")
  else
    info("receive(#{op})")
  end
  return if CLOSING == @state

  raise BlockedError.new() if BLOCKED == @state

  box = box.receive(full_name, op) unless box.nil?
  # Special case for starting state so that an Actor can place an item on
  # the queue before the loop is started.
  if @loop.nil? && STARTING != @state # no thread task
    begin
      @actor.perform(op, box)
    rescue Exception => e
      handle_error(e)
    end
    return
  end
  unless @max_queue_count.nil? || 0 == @max_queue_count || @queue.size() < @max_queue_count
    # Record the caller so the processing loop can wake it once a slot frees up.
    @req_thread = Thread.current
    # Wait on the configured request timeout. This used to reference an
    # undefined `timeout`, which raised NameError instead of waiting.
    sleep(@req_timeout) unless @req_timeout.nil? || 0 == @req_timeout
    @req_thread = nil
    raise BusyError.new() unless @queue.size() < @max_queue_count
  end
  # New requests go in at the front; the processing loop pops from the back.
  @req_mutex.synchronize {
    @queue.insert(0, Request.new(op, box))
  }
  @loop.wakeup() if RUNNING == @state
end

#request_timeoutFloat

Returns the default timeout for the time to wait for the Task to be ready to accept a request using the receive() method.

Returns:

  • (Float)

    current timeout for the receive() method



250
251
252
# File 'lib/oflow/task.rb', line 250

# Reader for the receive() timeout held in @req_timeout.
def request_timeout; @req_timeout; end

Resolves all links. Called by the system.



459
460
461
462
463
# File 'lib/oflow/task.rb', line 459

# Resolves every Link that does not have a target yet. Called by the system.
#
# @return [Hash] the Hash of Links
def resolve_all_links()
  @links.each_value do |lnk|
    next unless lnk.target.nil?
    set_link_target(lnk)
  end
end

Attempts to find and resolve the Link identified by the label. Resolving a Link uses the target identifier to find the target Task and save that in the Link.

Parameters:

  • label (Symbol|String)

    identifier of the Link

Returns:

  • (Link)

    returns the Link for the label



470
471
472
473
474
475
476
# File 'lib/oflow/task.rb', line 470

# Finds the Link for label (falling back to the Link stored under nil) and
# makes sure its target has been looked up.
#
# @param label [Symbol|String] identifier of the Link, converted to a Symbol
# @return [Link] the Link for the label, or nil if there is none
def resolve_link(label)
  key = label.nil? ? nil : label.to_sym
  found = @links[key] || @links[nil]
  return nil if found.nil?

  needs_target = found.target.nil?
  set_link_target(found) if needs_target
  found
end

Sets the target Task for a Link.

Parameters:

  • lnk (Link)

    Link to find the target Task for.

Raises:



480
481
482
483
484
485
# File 'lib/oflow/task.rb', line 480

# Looks up the target Task of a Link and stores it in the Link's @target.
# The Task is searched for in this Task's Flow, or in the Flow named by the
# Link when it names one.
#
# @param lnk [Link] Link to find the target Task for
# @raise [ConfigError] if the Flow named by the Link can not be found
def set_link_target(lnk)
  owner = lnk.flow_name.nil? ? @flow : @flow.env.find_flow(lnk.flow_name)
  raise ConfigError.new("Flow '#{lnk.flow_name}' not found.") if owner.nil?
  lnk.instance_variable_set(:@target, owner.find_task(lnk.target_name))
end

#set_option(key, value) ⇒ Object



410
411
412
413
414
415
416
417
418
419
# File 'lib/oflow/task.rb', line 410

# Sets a single option. The two Task-level options are handled here and
# everything else is passed on to the Actor.
#
# @param key [Symbol|String] option name
# @param value [Object] option value; converted with to_i for
#   :max_queue_count and with to_f for :request_timeout
def set_option(key, value)
  option = key.to_sym
  if :max_queue_count == option
    @max_queue_count = value.to_i
  elsif :request_timeout == option
    @req_timeout = value.to_f
  else
    @actor.set_option(key, value)
  end
end

#set_options(options) ⇒ Object

Processes the initialize() options. Subclasses should call super.

Parameters:

  • options (Hash)

    options to be used for initialization

Options Hash (options):

  • :max_queue_count (Fixnum)

    maximum number of requests that can be queued before backpressure is applied to the caller.

  • :request_timeout (Float)

    timeout in seconds to wait before raising a BusyError if the request queue is too long.



405
406
407
408
# File 'lib/oflow/task.rb', line 405

# Processes the initialize() options. Subclasses should call super.
#
# Values are coerced the same way set_option coerces them, so an option
# read from a configuration as a String behaves like a number. A nil
# :max_queue_count is kept as nil, which means there is no limit.
#
# @param options [Hash] options to be used for initialization
# @option options [Fixnum] :max_queue_count maximum number of requests that
#   can be queued before backpressure is applied to the caller
# @option options [Float] :request_timeout seconds to wait before raising a
#   BusyError if the request queue is too long
def set_options(options)
  limit = options.fetch(:max_queue_count, @max_queue_count)
  # nil.to_i would turn "no limit" into 0, so only coerce real values.
  @max_queue_count = limit.nil? ? nil : limit.to_i
  @req_timeout = options.fetch(:request_timeout, @req_timeout).to_f
end

#ship(dest, box) ⇒ Object

Sends a message to another Task or Flow.

Parameters:

  • dest (Symbol)

    identifies the link that points to the destination Task or Flow

  • box (Box)

    contents or data for the request

Raises:



386
387
388
389
390
391
392
393
394
395
396
397
# File 'lib/oflow/task.rb', line 386

# Sends a box to the Task that the Link for dest points to. The box is
# frozen before it is handed over.
#
# @param dest [Symbol] identifies the Link that points to the destination
# @param box [Box] contents or data for the request, may be nil
# @return [Link] the Link the box was shipped over
# @raise [LinkError] if there is no Link for dest or the Link has no target
def ship(dest, box)
  box.freeze() unless box.nil?
  lnk = resolve_link(dest)
  raise LinkError.new(dest) if lnk.nil? || lnk.target.nil?

  destination = "#{lnk.target_name}:#{lnk.op}"
  if debug?
    debug("shipping #{box} to #{destination}")
  else
    info("shipping to #{destination}")
  end
  lnk.target.receive(lnk.op, box)
  lnk
end

#shutdown(flush_first = false) ⇒ Object

Closes the Task by exiting the processing thread. If flush_first is true then all requests in the queue are processed first.



301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/oflow/task.rb', line 301

# Closes the Task by letting the processing thread exit and joining it.
# A Task without a processing thread is left untouched.
#
# @param flush_first [true|false] when true the Task is first switched to
#   BLOCKED and flushed before it is closed
def shutdown(flush_first=false)
  worker = @loop
  return if worker.nil?

  if flush_first
    @state = BLOCKED
    flush()
  end
  @state = CLOSING
  begin
    # waking a loop that has already exited raises; that is safe to ignore
    worker.wakeup()
  rescue
    # ignore
  end
  worker.join()
end

#startObject

Restarts the Task’s processing thread.



294
295
296
297
# File 'lib/oflow/task.rb', line 294

# Switches the Task to RUNNING and wakes the processing thread when the
# Task has one.
def start
  @state = RUNNING
  return if @loop.nil?
  @loop.wakeup
end

#state_stringString

Returns the state of the Task as a String.

Returns:

  • (String)

    String representation of the state



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/oflow/task.rb', line 164

# Names the current state of the Task.
#
# @return [String] the name of the state constant matching @state, or
#   'UNKNOWN' when @state matches none of them
def state_string()
  case @state
  when STARTING then 'STARTING'
  when STOPPED then 'STOPPED'
  when RUNNING then 'RUNNING'
  when CLOSING then 'CLOSING'
  when BLOCKED then 'BLOCKED'
  when STEP then 'STEP'
  else 'UNKNOWN'
  end
end

#step(max_wait = 5) ⇒ Object

Causes the Task to process one request and then stop. The max_wait is used to avoid getting stuck if the processing takes too long.

Parameters:

  • max_wait (Float|Fixnum) (defaults to: 5)

    maximum time to wait for the step to complete



276
277
278
279
280
281
282
283
284
285
# File 'lib/oflow/task.rb', line 276

# Makes the Task process one request and then stop again. Only does
# something for a Task that has a processing thread, is STOPPED and has a
# request queued. The caller sleeps for up to max_wait; the processing loop
# wakes it when the request has been processed.
#
# @param max_wait [Float|Fixnum] maximum time to wait for the step to complete
# @return [Task] self if a step was started, otherwise nil
def step(max_wait=5)
  return nil if @loop.nil?
  return nil unless STOPPED == @state && !@queue.empty?

  @state = STEP
  @step_thread = Thread.current
  @loop.wakeup()
  sleep(max_wait)
  @step_thread = nil
  self
end

#stopObject

Causes the Actor to stop processing any more requests after the current request has finished.



269
270
271
# File 'lib/oflow/task.rb', line 269

# Switches the Task to the STOPPED state.
def stop
  @state = STOPPED
end

#wakeupObject

Wakes up the Task if it has been stopped or if Env.shutdown() has been called.



288
289
290
291
# File 'lib/oflow/task.rb', line 288

# Wakes the processing thread when the Task has one.
# NOTE(review): the previous comment said a Task that is currently
# processing should not be woken, but no such check is made here.
def wakeup
  return if @loop.nil?
  @loop.wakeup
end