Class: OFlow::Task

Inherits:
Object
  • Object
show all
Includes:
HasErrorHandler, HasLinks, HasLog, HasName
Defined in:
lib/oflow/task.rb

Overview

The Task class provides the asynchronous functionality for the system. Each Task has it’s own thread and receives requests as an operation and Box of data by the receive() method. The request is put on a queue and popped off one at a time and handed to an Actor associatesd with the Task.

Defined Under Namespace

Classes: Link, Request

Constant Summary collapse

STARTING =

value of @state that indicates the Task is being created.

0
STOPPED =

value of @state that indicates the Task is not currently processing requests

1
RUNNING =

value of @state that indicates the Task is currently ready to process requests

2
CLOSING =

value of @state that indicates the Task is shutting down

3
BLOCKED =

value of @state that indicates the Task is not receiving new requests.

4
STEP =

value of @state that indicates the Task is processing one request and will stop after that processing is complete

5

Instance Attribute Summary collapse

Attributes included from HasName

#name

Instance Method Summary collapse

Methods included from HasLog

#debug, #error, #fatal, #info, #log, #log=, #log_msg, #warn

Methods included from HasErrorHandler

#error_handler, #error_handler=, #handle_error

Methods included from HasLinks

#find_link, #has_links?, #init_links, #link, #links, #resolve_link, #set_link_target

Methods included from HasName

#full_name, #init_name

Constructor Details

#initialize(flow, name, actor_class, options = {}) ⇒ Task

A Task is initialized by specifying a class to create an instance of.

Parameters:

  • flow (Flow)

    Flow containing the Task

  • name (name)

    Task base name

  • actor_class (Class)

    _class Class of the Actor to create

  • options (Hash) (defaults to: {})

    additional options for the Task or Actor

Raises:

  • (Exception)


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/oflow/task.rb', line 38

def initialize(flow, name, actor_class, options={})
  @state = STARTING
  @queue = []
  @req_mutex = Mutex.new()
  @req_thread = nil
  @step_thread = nil
  @waiting_thread = nil
  @req_timeout = 0.0
  @max_queue_count = nil
  @current_req = nil
  @proc_cnt = 0
  @loop = nil

  init_name(flow, name)
  init_links()
  set_options(options)

  info("Creating actor #{actor_class} with options #{options}.")
  @actor = actor_class.new(self, options)
  raise Exception.new("#{actor} does not respond to the perform() method.") unless @actor.respond_to?(:perform)

  @state = options.fetch(:state, RUNNING)
  return unless @actor.with_own_thread()

  @loop = Thread.start(self) do |me|
    Thread.current[:name] = me.full_name()
    while CLOSING != @state
      begin
        if RUNNING == @state || STEP == @state || BLOCKED == @state
          req = nil
          if @queue.empty?
            @waiting_thread.wakeup() unless @waiting_thread.nil?
            sleep(1.0)
            next
          end
          @req_mutex.synchronize {
            req = @queue.pop()
          }
          @req_thread.wakeup() unless @req_thread.nil?

          @current_req = req
          begin
            info("perform(#{req.op}, #{req.box})")
            @actor.perform(req.op, req.box) unless req.nil?
          rescue Exception => e
            handle_error(e)
          end
          @proc_cnt += 1
          @current_req = nil
          if STEP == @state
            @step_thread.wakeup() unless @step_thread.nil?
            @state = STOPPED
          end
        elsif STOPPED == @state
          sleep(1.0)
        end
      rescue Exception => e
        puts "*** #{full_name} #{e.class}: #{e.message}"
        @current_req = nil
        # TBD Env.rescue(e)
      end
    end
  end
end

Instance Attribute Details

#actorObject (readonly)

the Actor



31
32
33
# File 'lib/oflow/task.rb', line 31

def actor
  @actor
end

#stateObject

The current processing state of the Task



29
30
31
# File 'lib/oflow/task.rb', line 29

def state
  @state
end

Instance Method Details



358
359
360
361
362
363
364
365
366
367
368
369
370
371
# File 'lib/oflow/task.rb', line 358

def _check_link(lnk, errors)
  if lnk.target.nil?
    errors << ValidateError::Problem.new(full_name, ValidateError::Problem::LINK_ERROR, "Failed to find task '#{lnk.target_name}'.")
    return
  end
  unless lnk.target.has_input(lnk.op)
    errors << ValidateError::Problem.new(full_name, ValidateError::Problem::INPUT_ERROR, "'#{lnk.op}' not allowed on '#{lnk.target.full_name}'.")
    return
  end

  # TBD
  # Verify target has link.op as input if input is specified.

end

#_validation_errorsObject



336
337
338
339
340
341
342
343
344
345
346
347
348
# File 'lib/oflow/task.rb', line 336

def _validation_errors()
  errors = []
  @links.each_value { |lnk| _check_link(lnk, errors) }

  unless (outs = @actor.outputs()).nil?
    outs.each do |spec|
      if find_link(spec.dest).nil?
        errors << ValidateError::Problem.new(full_name, ValidateError::Problem::MISSING_ERROR, "Missing link for '#{spec.dest}'.")
      end
    end
  end
  errors
end

#backed_upFixnum

Returns a score indicating how backed up the queue is. This is used for selecting an Actor when stepping from the Inspector.

Returns:

  • (Fixnum)

    a measure of how backed up a Task is



163
164
165
166
167
168
169
170
171
172
# File 'lib/oflow/task.rb', line 163

def backed_up()
  cnt = @queue.size() + (@current_req.nil? ? 0 : 1)
  return 0 if 0 == cnt
  if @max_queue_count.nil? || 0 == @max_queue_count
    cnt = 80 if 80 < cnt
    cnt
  else
    cnt * 100 / @max_queue_count
  end
end

#busy?true|false

Returns the true if any requests are queued or a request is being processed.

Returns:

  • (true|false)

    true if busy, false otherwise



176
177
178
# File 'lib/oflow/task.rb', line 176

def busy?()
  !@current_req.nil? || !@queue.empty?
end

#describe(detail = 0, indent = 0) ⇒ Object

Returns a String that describes the Task.

Parameters:

  • detail (Fixnum) (defaults to: 0)

    higher values result in more detail in the description

  • indent (Fixnum) (defaults to: 0)

    the number of spaces to indent the description



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/oflow/task.rb', line 123

def describe(detail=0, indent=0)
  i = ' ' * indent
  lines = ["#{i}#{name} (#{actor.class}) {"]
  @links.each { |local,link|
    lines << "  #{i}#{local} => #{link.target_name}:#{link.op}"
  }
  if 1 <= detail
    lines << "  #{i}queued: #{queue_count()} (#{busy? ? 'busy' : 'idle'})"
    lines << "  #{i}state: #{state_string()}"
    @actor.options.each do |key,value|
      lines << "  #{i}#{key} = #{value} (#{value.class})"
    end
    if 2 <= detail
      lines << "  #{i}processing: #{@current_req.describe(detail)}" unless @current_req.nil?
      # Copy queue so it doesn't change while working with it and don't
      # block the queue. It is okay for the requests to be stale as it is
      # for a display that will be out of date as soon as it is displayed.
      reqs = []
      @req_mutex.synchronize {
        reqs = Array.new(@queue)
      }
      lines << "  #{i}queue:"
      reqs.each do |req|
        lines << "    #{i}#{req.describe(detail)}"
      end
    end
  end
  lines << i + "}"
  lines.join("\n")
end

#flushObject

Waits for all processing to complete before returning.



251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/oflow/task.rb', line 251

def flush()
  return if @loop.nil?
  @waiting_thread = Thread.current
  begin
    @loop.wakeup()
  rescue
    # ignore
  end
  while busy?
    sleep(2.0)
  end
  @waiting_thread = nil
end

#has_input(op) ⇒ Object



350
351
352
353
354
355
356
# File 'lib/oflow/task.rb', line 350

def has_input(op)
  ins = @actor.inputs()
  return true if ins.nil?
  op = op.to_sym unless op.nil?
  ins.each { |spec| return true if spec.op.nil? || spec.op == op }
  false
end

#inputsObject

The expected inputs the Task supports or nil if not implemented.



271
272
273
# File 'lib/oflow/task.rb', line 271

def inputs()
  @actor.inputs()
end

#max_queue_countNilClass|Fixnum

Returns the maximum number of requests allowed on the normal processing queue. A value of nil indicates there is no limit.

Returns:

  • (NilClass|Fixnum)

    maximum number of request that can be queued



190
191
192
# File 'lib/oflow/task.rb', line 190

def max_queue_count()
  @max_queue_count
end

#outputsObject

The expected outputs the Task supports or nil if not implemented.



276
277
278
# File 'lib/oflow/task.rb', line 276

def outputs()
  @actor.outputs()
end

#proc_countFixnum

Returns the total number of requested processed.

Returns:

  • (Fixnum)

    number of request processed



196
197
198
# File 'lib/oflow/task.rb', line 196

def proc_count()
  @proc_cnt
end

#queue_countFixnum

Returns the number of requests on the queue.

Returns:

  • (Fixnum)

    number of queued requests



156
157
158
# File 'lib/oflow/task.rb', line 156

def queue_count()
  @queue.length
end

#receive(op, box) ⇒ Object

Creates a Request and adds it to the queue.

Parameters:

  • op (Symbol)

    operation to perform

  • box (Box)

    contents or data for the request

Raises:



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/oflow/task.rb', line 283

def receive(op, box)
  info("receive(#{op}, #{box}) #{state_string}")

  return if CLOSING == @state

  raise BlockedError.new() if BLOCKED == @state

  box = box.receive(full_name, op) unless box.nil?
  # Special case for starting state so that an Actor can place an item on
  # the queue before the loop is started.
  if @loop.nil? && STARTING != @state # no thread task
    begin
      @actor.perform(op, box)
    rescue Exception => e
      handle_error(e)
    end
    return
  end
  unless @max_queue_count.nil? || 0 == @max_queue_count || @queue.size() < @max_queue_count
    @req_thread = Thread.current
    sleep(timeout) unless @req_timeout.nil? || 0 == @req_timeout
    @req_thread = nil
    raise BusyError.new() unless @queue.size() < @max_queue_count
  end
  @req_mutex.synchronize {
    @queue.insert(0, Request.new(op, box))
  }
  @loop.wakeup() if RUNNING == @state
end

#request_timeoutFloat

Returns the default timeout for the time to wait for the Task to be ready to accept a request using the receive() method.

Returns:

  • (Float)

    current timeout for the receive() method



183
184
185
# File 'lib/oflow/task.rb', line 183

def request_timeout()
  @req_timeout
end

Resolves all links. Called by the system.



374
375
376
377
378
# File 'lib/oflow/task.rb', line 374

def resolve_all_links()
  @links.each_value { |lnk|
    set_link_target(lnk) if lnk.target.nil?
  }
end

#set_options(options) ⇒ Object

Processes the initialize() options. Subclasses should call super.

Parameters:

  • options (Hash)

    options to be used for initialization

Options Hash (options):

  • :max_queue_count (Fixnum)

    maximum number of requests that can be queued before backpressure is applied to the caller.

  • :request_timeout (Float)

    timeout in seconds to wait before raising a BusyError if the request queue is too long.



331
332
333
334
# File 'lib/oflow/task.rb', line 331

def set_options(options)
  @max_queue_count = options.fetch(:max_queue_count, @max_queue_count)
  @req_timeout = options.fetch(:request_timeout, @req_timeout).to_f
end

#ship(dest, box) ⇒ Object

Sends a message to another Task or Flow.

Parameters:

  • dest (Symbol)

    identifies the link that points to the destination Task or Flow

  • box (Box)

    contents or data for the request

Raises:



316
317
318
319
320
321
322
323
# File 'lib/oflow/task.rb', line 316

def ship(dest, box)
  box.freeze() unless box.nil?
  link = resolve_link(dest)
  raise LinkError.new(dest) if link.nil? || link.target.nil?
  info("shipping #{box} to #{link.target_name}:#{link.op}")
  link.target.receive(link.op, box)
  link
end

#shutdown(flush_first = false) ⇒ Object

Closes the Task by exiting the processing thread. If flush is true then all requests in the queue are processed first.



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/oflow/task.rb', line 234

def shutdown(flush_first=false)
  return if @loop.nil?
  if flush_first
    @state = BLOCKED
    flush()
  end
  @state = CLOSING
  begin
    # if the loop has already exited this will raise an Exception that can be ignored
    @loop.wakeup()
  rescue
    # ignore
  end
  @loop.join()
end

#startObject

Restarts the Task’s processing thread.



227
228
229
230
# File 'lib/oflow/task.rb', line 227

def start()
  @state = RUNNING
  @loop.wakeup() unless @loop.nil?
end

#state_stringString

Returns the state of the Task as a String.

Returns:

  • (String)

    String representation of the state



105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/oflow/task.rb', line 105

def state_string()
  ss = 'UNKNOWN'
  case @state
  when STOPPED
    ss = 'STOPPED'
  when RUNNING
    ss = 'RUNNING'
  when CLOSING
    ss = 'CLOSING'
  when STEP
    ss = 'STEP'
  end
  ss
end

#step(max_wait = 5) ⇒ Object

Causes the Task to process one request and then stop. The max_wait is used to avoid getting stuck if the processing takes too long.

Parameters:

  • max_wait (Float|Fixnum) (defaults to: 5)

    _wait maximum time to wait for the step to complete



209
210
211
212
213
214
215
216
217
218
# File 'lib/oflow/task.rb', line 209

def step(max_wait=5)
  return nil if @loop.nil?
  return nil if STOPPED != @state || @queue.empty?
  @state = STEP
  @step_thread = Thread.current
  @loop.wakeup()
  sleep(max_wait)
  @step_thread = nil
  self
end

#stopObject

Causes the Actor to stop processing any more requests after the current request has finished.



202
203
204
# File 'lib/oflow/task.rb', line 202

def stop()
  @state = STOPPED
end

#wakeupObject

Wakes up the Task if it has been stopped or if Env.shutdown() has been called.



221
222
223
224
# File 'lib/oflow/task.rb', line 221

def wakeup()
  # don't wake if the task is currently processing
  @loop.wakeup() unless @loop.nil?
end