Class: OFlow::Task

Inherits:
Object
  • Object
show all
Includes:
HasErrorHandler, HasLinks, HasLog, HasName
Defined in:
lib/oflow/task.rb

Overview

The Task class provides the asynchronous functionality for the system. Each Task has it’s own thread and receives requests as an operation and Box of data by the receive() method. The request is put on a queue and popped off one at a time and handed to an Actor associatesd with the Task.

Defined Under Namespace

Classes: Link, Request

Constant Summary collapse

STARTING =

value of @state that indicates the Task is being created.

0
STOPPED =

value of @state that indicates the Task is not currently processing requests

1
RUNNING =

value of @state that indicates the Task is currently ready to process requests

2
CLOSING =

value of @state that indicates the Task is shutting down

3
BLOCKED =

value of @state that indicates the Task is not receiving new requests.

4
STEP =

value of @state that indicates the Task is processing one request and will stop after that processing is complete

5

Instance Attribute Summary collapse

Attributes included from HasName

#name

Instance Method Summary collapse

Methods included from HasLog

#debug, #error, #fatal, #info, #log, #log=, #log_msg, #warn

Methods included from HasErrorHandler

#error_handler, #error_handler=, #handle_error

Methods included from HasLinks

#find_link, #init_links, #link, #links, #resolve_link, #set_link_target

Methods included from HasName

#full_name, #init_name

Constructor Details

#initialize(flow, name, actor_class, options = {}) ⇒ Task

A Task is initialized by specifying a class to create an instance of.

Parameters:

  • flow (Flow)

    Flow containing the Task

  • name (name)

    Task base name

  • actor_class (Class)

    _class Class of the Actor to create

  • options (Hash) (defaults to: {})

    additional options for the Task or Actor

Raises:

  • (Exception)


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/oflow/task.rb', line 38

def initialize(flow, name, actor_class, options={})
  @state = STARTING
  @queue = []
  @req_mutex = Mutex.new()
  @req_thread = nil
  @step_thread = nil
  @waiting_thread = nil
  @req_timeout = 0.0
  @max_queue_count = nil
  @current_req = nil
  @proc_cnt = 0
  @loop = nil

  init_name(flow, name)
  init_links()
  set_options(options)

  @actor = actor_class.new(self, options)
  raise Exception.new("#{actor} does not respond to the perform() method.") unless @actor.respond_to?(:perform)

  @state = options.fetch(:state, RUNNING)
  return unless @actor.with_own_thread()

  @loop = Thread.start(self) do |me|
    Thread.current[:name] = me.full_name()
    while CLOSING != @state
      begin
        if RUNNING == @state || STEP == @state || BLOCKED == @state
          req = nil
          if @queue.empty?
            @waiting_thread.wakeup() unless @waiting_thread.nil?
            sleep(1.0)
            next
          end
          @req_mutex.synchronize {
            req = @queue.pop()
          }
          @req_thread.wakeup() unless @req_thread.nil?

          @current_req = req
          begin
            @actor.perform(req.op, req.box) unless req.nil?
          rescue Exception => e
            handle_error(e)
          end
          @proc_cnt += 1
          @current_req = nil
          if STEP == @state
            @step_thread.wakeup() unless @step_thread.nil?
            @state = STOPPED
          end
        elsif STOPPED == @state
          sleep(1.0)
        end
      rescue Exception => e
        puts "*** #{full_name} #{e.class}: #{e.message}"
        @current_req = nil
        # TBD Env.rescue(e)
      end
    end
  end
end

Instance Attribute Details

#actorObject (readonly)

the Actor



31
32
33
# File 'lib/oflow/task.rb', line 31

def actor
  @actor
end

#stateObject

The current processing state of the Task



29
30
31
# File 'lib/oflow/task.rb', line 29

def state
  @state
end

Instance Method Details



353
354
355
356
357
358
359
360
361
362
363
364
365
366
# File 'lib/oflow/task.rb', line 353

def _check_link(lnk, errors)
  if lnk.target.nil?
    errors << ValidateError::Problem.new(full_name, ValidateError::Problem::LINK_ERROR, "Failed to find task '#{lnk.target_name}'.")
    return
  end
  unless lnk.target.has_input(lnk.op)
    errors << ValidateError::Problem.new(full_name, ValidateError::Problem::INPUT_ERROR, "'#{lnk.op}' not allowed on '#{lnk.target.full_name}'.")
    return
  end

  # TBD
  # Verify target has link.op as input if input is specified.

end

#_validation_errorsObject



331
332
333
334
335
336
337
338
339
340
341
342
343
# File 'lib/oflow/task.rb', line 331

def _validation_errors()
  errors = []
  @links.each_value { |lnk| _check_link(lnk, errors) }

  unless (outs = @actor.outputs()).nil?
    outs.each do |spec|
      if find_link(spec.dest).nil?
        errors << ValidateError::Problem.new(full_name, ValidateError::Problem::MISSING_ERROR, "Missing link for '#{spec.dest}'.")
      end
    end
  end
  errors
end

#backed_upFixnum

Returns a score indicating how backed up the queue is. This is used for selecting an Actor when stepping from the Inspector.

Returns:

  • (Fixnum)

    a measure of how backed up a Task is



161
162
163
164
165
166
167
168
169
170
# File 'lib/oflow/task.rb', line 161

def backed_up()
  cnt = @queue.size() + (@current_req.nil? ? 0 : 1)
  return 0 if 0 == cnt
  if @max_queue_count.nil? || 0 == @max_queue_count
    cnt = 80 if 80 < cnt
    cnt
  else
    cnt * 100 / @max_queue_count
  end
end

#busy?true|false

Returns the true if any requests are queued or a request is being processed.

Returns:

  • (true|false)

    true if busy, false otherwise



174
175
176
# File 'lib/oflow/task.rb', line 174

def busy?()
  !@current_req.nil? || !@queue.empty?
end

#describe(detail = 0, indent = 0) ⇒ Object

Returns a String that describes the Task.

Parameters:

  • detail (Fixnum) (defaults to: 0)

    higher values result in more detail in the description

  • indent (Fixnum) (defaults to: 0)

    the number of spaces to indent the description



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/oflow/task.rb', line 121

def describe(detail=0, indent=0)
  i = ' ' * indent
  lines = ["#{i}#{name} (#{actor.class}) {"]
  @links.each { |local,link|
    lines << "  #{i}#{local} => #{link.target_name}:#{link.op}"
  }
  if 1 <= detail
    lines << "  #{i}queued: #{queue_count()} (#{busy? ? 'busy' : 'idle'})"
    lines << "  #{i}state: #{state_string()}"
    @actor.options.each do |key,value|
      lines << "  #{i}#{key} = #{value} (#{value.class})"
    end
    if 2 <= detail
      lines << "  #{i}processing: #{@current_req.describe(detail)}" unless @current_req.nil?
      # Copy queue so it doesn't change while working with it and don't
      # block the queue. It is okay for the requests to be stale as it is
      # for a display that will be out of date as soon as it is displayed.
      reqs = []
      @req_mutex.synchronize {
        reqs = Array.new(@queue)
      }
      lines << "  #{i}queue:"
      reqs.each do |req|
        lines << "    #{i}#{req.describe(detail)}"
      end
    end
  end
  lines << i + "}"
  lines.join("\n")
end

#flushObject

Waits for all processing to complete before returning.



249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/oflow/task.rb', line 249

def flush()
  return if @loop.nil?
  @waiting_thread = Thread.current
  begin
    @loop.wakeup()
  rescue
    # ignore
  end
  while busy?
    sleep(2.0)
  end
  @waiting_thread = nil
end

#has_input(op) ⇒ Object



345
346
347
348
349
350
351
# File 'lib/oflow/task.rb', line 345

def has_input(op)
  ins = @actor.inputs()
  return true if ins.nil?
  op = op.to_sym unless op.nil?
  ins.each { |spec| return true if spec.op.nil? || spec.op == op }
  false
end

#inputsObject

The expected inputs the Task supports or nil if not implemented.



269
270
271
# File 'lib/oflow/task.rb', line 269

def inputs()
  @actor.inputs()
end

#max_queue_countNilClass|Fixnum

Returns the maximum number of requests allowed on the normal processing queue. A value of nil indicates there is no limit.

Returns:

  • (NilClass|Fixnum)

    maximum number of request that can be queued



188
189
190
# File 'lib/oflow/task.rb', line 188

def max_queue_count()
  @max_queue_count
end

#outputsObject

The expected outputs the Task supports or nil if not implemented.



274
275
276
# File 'lib/oflow/task.rb', line 274

def outputs()
  @actor.outputs()
end

#proc_countFixnum

Returns the total number of requested processed.

Returns:

  • (Fixnum)

    number of request processed



194
195
196
# File 'lib/oflow/task.rb', line 194

def proc_count()
  @proc_cnt
end

#queue_countFixnum

Returns the number of requests on the queue.

Returns:

  • (Fixnum)

    number of queued requests



154
155
156
# File 'lib/oflow/task.rb', line 154

def queue_count()
  @queue.length
end

#receive(op, box) ⇒ Object

Creates a Request and adds it to the queue.

Parameters:

  • op (Symbol)

    operation to perform

  • box (Box)

    contents or data for the request

Raises:



281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/oflow/task.rb', line 281

def receive(op, box)
  return if CLOSING == @state

  raise BlockedError.new() if BLOCKED == @state

  box = box.receive(full_name, op) unless box.nil?
  # Special case for starting state so that an Actor can place an item on
  # the queue before the loop is started.
  if @loop.nil? && STARTING != @state # no thread task
    begin
      @actor.perform(op, box)
    rescue Exception => e
      handle_error(e)
    end
    return
  end
  unless @max_queue_count.nil? || 0 == @max_queue_count || @queue.size() < @max_queue_count
    @req_thread = Thread.current
    sleep(timeout) unless @req_timeout.nil? || 0 == @req_timeout
    @req_thread = nil
    raise BusyError.new() unless @queue.size() < @max_queue_count
  end
  @req_mutex.synchronize {
    @queue.insert(0, Request.new(op, box))
  }
  @loop.wakeup() if RUNNING == @state
end

#request_timeoutFloat

Returns the default timeout for the time to wait for the Task to be ready to accept a request using the receive() method.

Returns:

  • (Float)

    current timeout for the receive() method



181
182
183
# File 'lib/oflow/task.rb', line 181

def request_timeout()
  @req_timeout
end

Resolves all links. Called by the system.



369
370
371
372
373
# File 'lib/oflow/task.rb', line 369

def resolve_all_links()
  @links.each_value { |lnk|
    set_link_target(lnk) if lnk.target.nil?
  }
end

#set_options(options) ⇒ Object

Processes the initialize() options. Subclasses should call super.

Parameters:

  • options (Hash)

    options to be used for initialization

Options Hash (options):

  • :max_queue_count (Fixnum)

    maximum number of requests that can be queued before backpressure is applied to the caller.

  • :request_timeout (Float)

    timeout in seconds to wait before raising a BusyError if the request queue is too long.



326
327
328
329
# File 'lib/oflow/task.rb', line 326

def set_options(options)
  @max_queue_count = options.fetch(:max_queue_count, @max_queue_count)
  @req_timeout = options.fetch(:request_timeout, @req_timeout).to_f
end

#ship(dest, box) ⇒ Object

Sends a message to another Task or Flow.

Parameters:

  • dest (Symbol)

    identifies the link that points to the destination Task or Flow

  • box (Box)

    contents or data for the request

Raises:



312
313
314
315
316
317
318
# File 'lib/oflow/task.rb', line 312

def ship(dest, box)
  box.freeze() unless box.nil?
  link = resolve_link(dest)
  raise LinkError.new(dest) if link.nil? || link.target.nil?
  link.target.receive(link.op, box)
  link
end

#shutdown(flush_first = false) ⇒ Object

Closes the Task by exiting the processing thread. If flush is true then all requests in the queue are processed first.



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/oflow/task.rb', line 232

def shutdown(flush_first=false)
  return if @loop.nil?
  if flush_first
    @state = BLOCKED
    flush()
  end
  @state = CLOSING
  begin
    # if the loop has already exited this will raise an Exception that can be ignored
    @loop.wakeup()
  rescue
    # ignore
  end
  @loop.join()
end

#startObject

Restarts the Task’s processing thread.



225
226
227
228
# File 'lib/oflow/task.rb', line 225

def start()
  @state = RUNNING
  @loop.wakeup() unless @loop.nil?
end

#state_stringString

Returns the state of the Task as a String.

Returns:

  • (String)

    String representation of the state



103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/oflow/task.rb', line 103

def state_string()
  ss = 'UNKNOWN'
  case @state
  when STOPPED
    ss = 'STOPPED'
  when RUNNING
    ss = 'RUNNING'
  when CLOSING
    ss = 'CLOSING'
  when STEP
    ss = 'STEP'
  end
  ss
end

#step(max_wait = 5) ⇒ Object

Causes the Task to process one request and then stop. The max_wait is used to avoid getting stuck if the processing takes too long.

Parameters:

  • max_wait (Float|Fixnum) (defaults to: 5)

    _wait maximum time to wait for the step to complete



207
208
209
210
211
212
213
214
215
216
# File 'lib/oflow/task.rb', line 207

def step(max_wait=5)
  return nil if @loop.nil?
  return nil if STOPPED != @state || @queue.empty?
  @state = STEP
  @step_thread = Thread.current
  @loop.wakeup()
  sleep(max_wait)
  @step_thread = nil
  self
end

#stopObject

Causes the Actor to stop processing any more requests after the current request has finished.



200
201
202
# File 'lib/oflow/task.rb', line 200

def stop()
  @state = STOPPED
end

#wakeupObject

Wakes up the Task if it has been stopped or if Env.shutdown() has been called.



219
220
221
222
# File 'lib/oflow/task.rb', line 219

def wakeup()
  # don't wake if the task is currently processing
  @loop.wakeup() unless @loop.nil?
end