Class: OFlow::Flow

Inherits:
Object
Includes:
HasErrorHandler, HasLog
Defined in:
lib/oflow/flow.rb

Overview

The class used to manage interactions between Tasks and sub-Flows. It can be thought of as a container for Tasks in which the Flow keeps track of the Links between the Tasks.

Instance Attribute Summary

Instance Method Summary

Methods included from HasLog

#debug, #debug?, #error, #error?, #fatal, #info, #info?, #log=, #log_msg, #warn, #warn?

Methods included from HasErrorHandler

#error_handler=, #handle_error

Constructor Details

#initialize(env, name) ⇒ Flow

Create a new Flow.

Parameters:

  • env (Env)

    Env containing the Flow

  • name (Symbol|String)

    Flow base name



# File 'lib/oflow/flow.rb', line 19

def initialize(env, name)
  @name = name.to_sym
  @tasks = {}
  @prepared = false
  @log = nil
  @error_handler = nil
  @env = env
end

Instance Attribute Details

#env ⇒ Object (readonly)

Returns the value of attribute env.



# File 'lib/oflow/flow.rb', line 14

def env
  @env
end

#name ⇒ Object (readonly)

The name.



# File 'lib/oflow/flow.rb', line 13

def name
  @name
end

Instance Method Details

#_clear ⇒ Object



# File 'lib/oflow/flow.rb', line 251

def _clear()
end

#_locate(path) ⇒ Object



# File 'lib/oflow/flow.rb', line 138

def _locate(path)
  t = @tasks[path[0].to_sym]
  return t if t.nil? || 1 == path.size
  t._locate(path[1..-1])
end

#_validation_errors ⇒ Object

Returns an Array of validation errors.



# File 'lib/oflow/flow.rb', line 84

def _validation_errors()
  errors = []
  @tasks.each_value { |t| errors += t._validation_errors() }
  errors
end

#busy? ⇒ true|false

Returns true if one or more Tasks are either processing a request or have a request waiting to be processed on their input queue.

Returns:

  • (true|false)

    the busy state across all Tasks



# File 'lib/oflow/flow.rb', line 160

def busy?
  @tasks.each_value { |task| return true if task.busy? }
  false
end

#clear ⇒ Object

Clears out all Tasks and Flows and resets the object back to an empty state.



# File 'lib/oflow/flow.rb', line 232

def clear()
  shutdown()
  @tasks = {}
  _clear()
end

#describe(detail = 0, indent = 0) ⇒ Object

Returns a String describing the Flow.

Parameters:

  • detail (Fixnum) (defaults to: 0)

    higher values result in more detail in the description

  • indent (Fixnum) (defaults to: 0)

    the number of spaces to indent the description



# File 'lib/oflow/flow.rb', line 241

def describe(detail=0, indent=0)
  i = ' ' * indent
  lines = ["#{i}#{name} (#{self.class}) {"]
  @tasks.each_value { |t|
    lines << t.describe(detail, indent + 2)
  }
  lines << i + "}"
  lines.join("\n")
end

#each_task(&blk) ⇒ Object

Iterates over each Task and yields to the provided block with each Task.

Parameters:

  • blk (Proc)

    Proc to call on each iteration



# File 'lib/oflow/flow.rb', line 101

def each_task(&blk)
  @tasks.each { |name,task| blk.yield(task) }
end

#error_handler ⇒ Task

Returns an error_handler Task by looking first in the error_handler attribute, then for a contained Task named :error, and finally falling back to the error_handler of the Env.

Returns:

  • (Task)

    error_handler Task.



# File 'lib/oflow/flow.rb', line 48

def error_handler()
  return @error_handler unless @error_handler.nil?
  eh = find_task(:error)
  return eh unless eh.nil?
  @env.error_handler
end
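
The lookup order above can be exercised in isolation. The following is a minimal sketch, not the OFlow implementation: SketchEnv and SketchFlow are hypothetical stand-ins, and plain Symbols take the place of Task instances.

```ruby
# Stand-ins for illustration only; these are not the OFlow classes.
SketchEnv = Struct.new(:error_handler)

class SketchFlow
  attr_writer :error_handler

  def initialize(env, tasks = {})
    @env = env
    @tasks = tasks
    @error_handler = nil
  end

  # Same order as the method above: explicit attribute first,
  # then a contained entry named :error, then the enclosing Env.
  def error_handler
    return @error_handler unless @error_handler.nil?
    eh = @tasks[:error]
    return eh unless eh.nil?
    @env.error_handler
  end
end

sketch_env = SketchEnv.new(:env_handler)
SketchFlow.new(sketch_env).error_handler                            # falls back to the Env
SketchFlow.new(sketch_env, { error: :local_handler }).error_handler # finds the :error entry
```

Setting the attribute explicitly takes precedence over both fallbacks, which is the reason the attribute is checked first.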

#find_task(name) ⇒ Task|nil

Locates and returns a Task with the specified name.

Parameters:

  • name (String)

    name of the Task

Returns:

  • (Task|nil)

    the Task with the name specified or nil



# File 'lib/oflow/flow.rb', line 123

def find_task(name)
  name = name.to_sym unless name.nil?
  @tasks[name]
end

#flush ⇒ Object

Wakes up all the Tasks in the Flow and waits for the system to become idle before returning.



# File 'lib/oflow/flow.rb', line 201

def flush()
  wakeup()
  @tasks.each_value { |t| t.flush() }
  while busy?
    sleep(0.2)
  end
end

#full_name ⇒ String

Similar to a full file path. The full_name describes the containment of the named item.

Returns:

  • (String)

    full name of item



# File 'lib/oflow/flow.rb', line 31

def full_name()
  @name.to_s
end

#locate(name) ⇒ Task|nil

Locates and returns a Task with the specified full name.

Parameters:

  • name (String)

    full name of the Task

Returns:

  • (Task|nil)

    the Task with the name specified or nil



# File 'lib/oflow/flow.rb', line 131

def locate(name)
  name = name[1..-1] if name.start_with?(':')
  name = name[0..-2] if name.end_with?(':')
  path = name.split(':')
  _locate(path)
end
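
The name normalization performed by locate before it calls _locate can be tried on its own. This is a minimal sketch: split_full_name is a hypothetical helper, not part of OFlow, that mirrors only the stripping and splitting steps shown above.

```ruby
# Hypothetical helper (not part of OFlow) mirroring the normalization in locate.
def split_full_name(name)
  name = name[1..-1] if name.start_with?(':')  # drop one leading ':'
  name = name[0..-2] if name.end_with?(':')    # drop one trailing ':'
  name.split(':')                              # one path element per level
end

split_full_name(':outer:inner:')  # => ["outer", "inner"]
split_full_name('solo')           # => ["solo"]
```

Each element of the resulting Array is then used as one level of the lookup, with the first element converted to a Symbol to index the Flow's Tasks.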

#log ⇒ Task

Returns a log Task by looking first in the log attribute, then for a contained Task named :log, and finally falling back to the log of the Env.

Returns:

  • (Task)

    log Task.



# File 'lib/oflow/flow.rb', line 38

def log()
  return @log unless @log.nil?
  lg = find_task(:log)
  return lg unless lg.nil?
  @env.log
end

#queue_count ⇒ Fixnum

Returns the sum of all the requests in all the Tasks' queues.

Returns:

  • (Fixnum)

    total number of items waiting to be processed



# File 'lib/oflow/flow.rb', line 151

def queue_count()
  cnt = 0
  @tasks.each_value { |task| cnt += task.queue_count() }
  cnt
end

#resolve_all_links ⇒ Object

Resolves all the Links on all the Tasks and Flows being managed as well as any Links in the instance itself.



# File 'lib/oflow/flow.rb', line 92

def resolve_all_links()
  @tasks.each_value { |t|
    t.resolve_all_links()
  }
  @prepared = true
end

#shutdown(flush_first = false) ⇒ Object

Shuts down all Tasks.

Parameters:

  • flush_first (true|false) (defaults to: false)

    flag indicating shutdown should occur after the system becomes idle



# File 'lib/oflow/flow.rb', line 219

def shutdown(flush_first=false)
  # block all tasks first so threads can empty queues
  @tasks.each_value do |task|
    task.state = Task::BLOCKED
  end
  # shutdown and wait for queues to empty if necessary
  @tasks.each_value do |task|
    task.shutdown(flush_first)
  end
  @tasks = {}
end

#start ⇒ Object

Calls the start() method on all Tasks.

Raises:

  • (ValidateError)

    if the Links of the Flow have not been resolved



# File 'lib/oflow/flow.rb', line 189

def start()
  raise ValidateError.new("#{full_name} not validated.") unless @prepared
  @tasks.each_value { |task| task.start() }
end

#state=(s) ⇒ Object

Sets the state of all Tasks recursively. This should not be called directly.



# File 'lib/oflow/flow.rb', line 211

def state=(s)
  @tasks.each_value do |task|
    task.state = s
  end
end

#step ⇒ Object

Calls the step() method on one Task that is stopped and has an item in its queue. The Task with the highest backed_up() value is selected.



# File 'lib/oflow/flow.rb', line 172

def step()
  max = 0.0
  best = nil
  walk_tasks() do |t|
    if Task::STOPPED == t.state
      bu = t.backed_up()
      if max < bu
        best = t
        max = bu
      end
    end
  end
  best.step() unless best.nil?
  best
end
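
The selection rule used by step can be sketched without any threading. In this sketch SketchTask, SKETCH_STOPPED, SKETCH_RUNNING and pick_for_step are all hypothetical names; the symbols are placeholders for the real Task state constants, whose values are not shown in this page.

```ruby
# Stand-ins for illustration only; not the OFlow Task class or its constants.
SketchTask = Struct.new(:name, :state, :backed_up)
SKETCH_STOPPED = :stopped
SKETCH_RUNNING = :running

# Returns the stopped task with the highest backed_up value, or nil.
def pick_for_step(tasks)
  max = 0.0
  best = nil
  tasks.each do |t|
    next unless SKETCH_STOPPED == t.state
    bu = t.backed_up
    # Strict comparison: a task with a backed_up value of 0 is never chosen.
    if max < bu
      best = t
      max = bu
    end
  end
  best
end

candidates = [
  SketchTask.new(:a, SKETCH_STOPPED, 0.2),
  SketchTask.new(:b, SKETCH_RUNNING, 0.9),  # ignored because it is not stopped
  SketchTask.new(:c, SKETCH_STOPPED, 0.5),
]
pick_for_step(candidates).name  # => :c
```

Because the comparison starts from 0.0 and is strict, the method returns nil when no stopped Task reports anything waiting, which matches the "has an item in its queue" condition in the description.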

#stop ⇒ Object

Calls the stop() method on all Tasks.



# File 'lib/oflow/flow.rb', line 166

def stop()
  @tasks.each_value { |task| task.stop() }
end

#task(name, actor_class, options = {}, &block) {|t| ... } ⇒ Task

Creates a Task and yields to a block with the newly created Task. Used to configure Tasks.

Parameters:

  • name (Symbol|String)

    base name for the Task

  • actor_class (Class)

    Class to create an Actor instance of

  • options (Hash) (defaults to: {})

    optional parameters

  • block (Proc)

    block to yield to with the new Task instance

Yields:

  • (t)

Returns:

  • (Task)

    new Task



# File 'lib/oflow/flow.rb', line 62

def task(name, actor_class, options={}, &block)
  has_state = options.has_key?(:state)
  unless has_state
    options = options.clone
    options[:state] = Task::STOPPED
  end
  t = Task.new(self, name, actor_class, options)
  @tasks[t.name] = t
  yield(t) if block_given?
  t
end
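
The option handling at the top of task adds a default :state without modifying the Hash passed in by the caller. The sketch below isolates that step; with_default_state and SKETCH_DEFAULT_STATE are hypothetical names, and the symbol is only a placeholder for Task::STOPPED.

```ruby
# Placeholder for Task::STOPPED; the real constant's value is not shown here.
SKETCH_DEFAULT_STATE = :stopped

# Hypothetical helper mirroring the option defaulting in task.
def with_default_state(options = {})
  unless options.has_key?(:state)
    options = options.clone            # copy so the caller's Hash is untouched
    options[:state] = SKETCH_DEFAULT_STATE
  end
  options
end

caller_opts = { retries: 3 }
merged = with_default_state(caller_opts)
merged       # => {:retries=>3, :state=>:stopped}
caller_opts  # => {:retries=>3}, unchanged
```

When the caller already supplies :state, the same Hash object is passed through without copying.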

#task_count ⇒ Object

Returns the number of active Tasks.



# File 'lib/oflow/flow.rb', line 145

def task_count()
  @tasks.size
end

#validate ⇒ Object

Validates the container by verifying all links on a task have been set to a valid destination and that destination has been resolved.

Raises:

  • (ValidateError)

    if any validation errors are found



# File 'lib/oflow/flow.rb', line 77

def validate()
  # collects errors and raises all errors at once if there are any
  errors = _validation_errors()
  raise ValidateError.new(errors) unless errors.empty?
end
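
The collect-then-raise pattern used by validate and _validation_errors can be shown with stand-ins. This is only a sketch under assumptions: SketchValidateError and validate_all are hypothetical, each check is a lambda returning an Array of problems, and the real ValidateError may store or format its errors differently.

```ruby
# Hypothetical error class; not OFlow's ValidateError.
class SketchValidateError < StandardError
  attr_reader :problems

  def initialize(problems)
    @problems = problems
    super("#{problems.size} validation error(s)")
  end
end

# Gathers problems from every check, then raises once with the full list.
def validate_all(checks)
  errors = []
  checks.each { |check| errors += check.call }
  raise SketchValidateError.new(errors) unless errors.empty?
end

begin
  validate_all([-> { ['link a unresolved'] }, -> { [] }, -> { ['link b unresolved'] }])
rescue SketchValidateError => e
  e.problems  # => ["link a unresolved", "link b unresolved"]
end
```

Collecting first means a caller sees every problem in one pass instead of fixing them one raise at a time.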

#wakeup ⇒ Object

Wakes up all the Tasks in the Flow.



# File 'lib/oflow/flow.rb', line 195

def wakeup()
  @tasks.each_value { |t| t.wakeup() }
end

#walk_tasks(tasks_only = true, &blk) ⇒ Object

Performs a recursive walk over all Tasks and yields to the provided block for each. Flows are followed recursively.

Parameters:

  • tasks_only (true|false) (defaults to: true)

    indicates that only Tasks, and not Flows, are yielded to the block

  • blk (Proc)

    Proc to call on each iteration



# File 'lib/oflow/flow.rb', line 109

def walk_tasks(tasks_only=true, &blk)
  @tasks.each_value do |t|
    if t.is_a?(Task)
      blk.yield(t)
    else
      blk.yield(t) unless tasks_only
      t.walk_tasks(tasks_only, &blk)
    end
  end
end
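
The effect of the tasks_only flag is easiest to see on a small nested structure. The sketch below uses hypothetical stand-ins (SketchLeaf for a Task, SketchContainer for a Flow) and a walk method with the same shape as walk_tasks; it is not the OFlow implementation.

```ruby
# Stand-ins for illustration only; not the OFlow Task and Flow classes.
class SketchLeaf
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

class SketchContainer
  attr_reader :name

  def initialize(name, children)
    @name = name
    @children = children
  end

  # Yields every leaf; containers are yielded only when leaves_only is false.
  def walk(leaves_only = true, &blk)
    @children.each do |c|
      if c.is_a?(SketchLeaf)
        blk.call(c)
      else
        blk.call(c) unless leaves_only
        c.walk(leaves_only, &blk)
      end
    end
  end
end

# Collects the names visited by a walk, in visiting order.
def walked_names(root, leaves_only)
  names = []
  root.walk(leaves_only) { |n| names << n.name }
  names
end

def sketch_tree
  inner = SketchContainer.new(:sub, [SketchLeaf.new(:b)])
  SketchContainer.new(:root, [SketchLeaf.new(:a), inner])
end

walked_names(sketch_tree, true)   # => [:a, :b]
walked_names(sketch_tree, false)  # => [:a, :sub, :b]
```

A container is yielded before its children are visited, so with the flag set to false the block sees each sub-container ahead of the leaves it holds.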