Class: OFlow::Flow
- Inherits:
-
Object
- Object
- OFlow::Flow
- Includes:
- HasErrorHandler, HasLog
- Defined in:
- lib/oflow/flow.rb
Overview
The Class used to managing interactions between Tasks and sub-Flows. It can be thought of as a container for Tasks where the Flow keeps track of the Links between the Tasks.
Instance Attribute Summary collapse
-
#env ⇒ Object
readonly
Returns the value of attribute env.
-
#name ⇒ Object
readonly
The name.
Instance Method Summary collapse
- #_clear ⇒ Object
- #_locate(path) ⇒ Object
-
#_validation_errors ⇒ Object
Returns an Array of validation errors.
-
#busy? ⇒ true|false
Returns true of one or more Tasks is either processing a request or has a request waiting to be processed on it’s input queue.
-
#clear ⇒ Object
Clears out all Tasks and Flows and resets the object back to a empty state.
-
#describe(detail = 0, indent = 0) ⇒ Object
Returns a String describing the Flow.
-
#each_task(&blk) ⇒ Object
Iterates over each Task and yields to the provided block with each Task.
-
#error_handler ⇒ Task
Returns a error_handler Task by looking for that Task in an attribute and then in the contained Tasks or Tasks in outer Flows.
-
#find_task(name) ⇒ Task|nil
Locates and return a Task with the specified name.
-
#flush ⇒ Object
Wakes up all the Tasks in the Flow and waits for the system to become idle before returning.
-
#full_name ⇒ String
Similar to a full file path.
-
#initialize(env, name) ⇒ Flow
constructor
Create a new Flow.
-
#locate(name) ⇒ Task|nil
Locates and return a Task with the specified full name.
-
#log ⇒ Task
Returns a log Task by looking for that Task in an attribute and then in the contained Tasks or Tasks in outer Flows.
-
#queue_count ⇒ Fixnum
Returns the sum of all the requests in all the Tasks’s queues.
-
#resolve_all_links ⇒ Object
Resolves all the Links on all the Tasks and Flows being managed as well as any Links in the instance itself.
-
#shutdown(flush_first = false) ⇒ Object
Shuts down all Tasks.
-
#start ⇒ Object
Calls the start() method on all Tasks.
-
#state=(s) ⇒ Object
Sets the state of all Tasks recursively.
-
#step ⇒ Object
Calls the step() method one Task that is stopped and has an item in the queue.
-
#stop ⇒ Object
Calls the stop() method on all Tasks.
-
#task(name, actor_class, options = {}, &block) {|t| ... } ⇒ Task
Creates a Task and yield to a block with the newly create Task.
-
#task_count ⇒ Object
Returns the number of active Tasks.
-
#validate ⇒ Object
Validates the container by verifying all links on a task have been set to a valid destination and that destination has been resolved.
-
#wakeup ⇒ Object
Wakes up all the Tasks in the Flow.
-
#walk_tasks(tasks_only = true, &blk) ⇒ Object
Performs a recursive walk over all Tasks and yields to the provided block for each.
Methods included from HasLog
#debug, #debug?, #error, #error?, #fatal, #info, #info?, #log=, #log_msg, #warn, #warn?
Methods included from HasErrorHandler
#error_handler=, #handle_error
Constructor Details
#initialize(env, name) ⇒ Flow
Create a new Flow.
19 20 21 22 23 24 25 26 |
# File 'lib/oflow/flow.rb', line 19 def initialize(env, name) @name = name.to_sym @tasks = {} @prepared = false @log = nil @error_handler = nil @env = env end |
Instance Attribute Details
#env ⇒ Object (readonly)
Returns the value of attribute env.
14 15 16 |
# File 'lib/oflow/flow.rb', line 14 def env @env end |
#name ⇒ Object (readonly)
The name.
13 14 15 |
# File 'lib/oflow/flow.rb', line 13 def name @name end |
Instance Method Details
#_clear ⇒ Object
251 252 |
# File 'lib/oflow/flow.rb', line 251 def _clear() end |
#_locate(path) ⇒ Object
138 139 140 141 142 |
# File 'lib/oflow/flow.rb', line 138 def _locate(path) t = @tasks[path[0].to_sym] return t if t.nil? || 1 == path.size t._locate(path[1..-1]) end |
#_validation_errors ⇒ Object
Returns an Array of validation errors.
84 85 86 87 88 |
# File 'lib/oflow/flow.rb', line 84 def _validation_errors() errors = [] @tasks.each_value { |t| errors += t._validation_errors() } errors end |
#busy? ⇒ true|false
Returns true of one or more Tasks is either processing a request or has a request waiting to be processed on it’s input queue.
160 161 162 163 |
# File 'lib/oflow/flow.rb', line 160 def busy? @tasks.each_value { |task| return true if task.busy? } false end |
#clear ⇒ Object
Clears out all Tasks and Flows and resets the object back to a empty state.
232 233 234 235 236 |
# File 'lib/oflow/flow.rb', line 232 def clear() shutdown() @tasks = {} _clear() end |
#describe(detail = 0, indent = 0) ⇒ Object
Returns a String describing the Flow.
241 242 243 244 245 246 247 248 249 |
# File 'lib/oflow/flow.rb', line 241 def describe(detail=0, indent=0) i = ' ' * indent lines = ["#{i}#{name} (#{self.class}) {"] @tasks.each_value { |t| lines << t.describe(detail, indent + 2) } lines << i + "}" lines.join("\n") end |
#each_task(&blk) ⇒ Object
Iterates over each Task and yields to the provided block with each Task.
101 102 103 |
# File 'lib/oflow/flow.rb', line 101 def each_task(&blk) @tasks.each { |name,task| blk.yield(task) } end |
#error_handler ⇒ Task
Returns a error_handler Task by looking for that Task in an attribute and then in the contained Tasks or Tasks in outer Flows.
48 49 50 51 52 53 |
# File 'lib/oflow/flow.rb', line 48 def error_handler() return @error_handler unless @error_handler.nil? eh = find_task(:error) return eh unless eh.nil? @env.error_handler end |
#find_task(name) ⇒ Task|nil
Locates and return a Task with the specified name.
123 124 125 126 |
# File 'lib/oflow/flow.rb', line 123 def find_task(name) name = name.to_sym unless name.nil? @tasks[name] end |
#flush ⇒ Object
Wakes up all the Tasks in the Flow and waits for the system to become idle before returning.
201 202 203 204 205 206 207 |
# File 'lib/oflow/flow.rb', line 201 def flush() wakeup() @tasks.each_value { |t| t.flush() } while busy? sleep(0.2) end end |
#full_name ⇒ String
Similar to a full file path. The full_name described the containment of the named item.
31 32 33 |
# File 'lib/oflow/flow.rb', line 31 def full_name() @name.to_s end |
#locate(name) ⇒ Task|nil
Locates and return a Task with the specified full name.
131 132 133 134 135 136 |
# File 'lib/oflow/flow.rb', line 131 def locate(name) name = name[1..-1] if name.start_with?(':') name = name[0..-2] if name.end_with?(':') path = name.split(':') _locate(path) end |
#log ⇒ Task
Returns a log Task by looking for that Task in an attribute and then in the contained Tasks or Tasks in outer Flows.
38 39 40 41 42 43 |
# File 'lib/oflow/flow.rb', line 38 def log() return @log unless @log.nil? lg = find_task(:log) return lg unless lg.nil? @env.log end |
#queue_count ⇒ Fixnum
Returns the sum of all the requests in all the Tasks’s queues.
151 152 153 154 155 |
# File 'lib/oflow/flow.rb', line 151 def queue_count() cnt = 0 @tasks.each_value { |task| cnt += task.queue_count() } cnt end |
#resolve_all_links ⇒ Object
Resolves all the Links on all the Tasks and Flows being managed as well as any Links in the instance itself.
92 93 94 95 96 97 |
# File 'lib/oflow/flow.rb', line 92 def resolve_all_links() @tasks.each_value { |t| t.resolve_all_links() } @prepared = true end |
#shutdown(flush_first = false) ⇒ Object
Shuts down all Tasks.
219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/oflow/flow.rb', line 219 def shutdown(flush_first=false) # block all tasks first so threads can empty queues @tasks.each_value do |task| task.state = Task::BLOCKED end # shutdown and wait for queues to empty if necessary @tasks.each_value do |task| task.shutdown(flush_first) end @tasks = {} end |
#start ⇒ Object
Calls the start() method on all Tasks.
189 190 191 192 |
# File 'lib/oflow/flow.rb', line 189 def start() raise ValidateError.new("#{full_name} not validated.") unless @prepared @tasks.each_value { |task| task.start() } end |
#state=(s) ⇒ Object
Sets the state of all Tasks recursively. This should not be called directly.
211 212 213 214 215 |
# File 'lib/oflow/flow.rb', line 211 def state=(s) @tasks.each_value do |task| task.state = s end end |
#step ⇒ Object
Calls the step() method one Task that is stopped and has an item in the queue. The Tasks with the highest backed_up() value is selected.
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/oflow/flow.rb', line 172 def step() max = 0.0 best = nil walk_tasks() do |t| if Task::STOPPED == t.state bu = t.backed_up() if max < bu best = t max = bu end end end best.step() unless best.nil? best end |
#stop ⇒ Object
Calls the stop() method on all Tasks.
166 167 168 |
# File 'lib/oflow/flow.rb', line 166 def stop() @tasks.each_value { |task| task.stop() } end |
#task(name, actor_class, options = {}, &block) {|t| ... } ⇒ Task
Creates a Task and yield to a block with the newly create Task. Used to configure Tasks.
62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/oflow/flow.rb', line 62 def task(name, actor_class, ={}, &block) has_state = .has_key?(:state) unless has_state = .clone [:state] = Task::STOPPED end t = Task.new(self, name, actor_class, ) @tasks[t.name] = t yield(t) if block_given? t end |
#task_count ⇒ Object
Returns the number of active Tasks.
145 146 147 |
# File 'lib/oflow/flow.rb', line 145 def task_count() @tasks.size end |
#validate ⇒ Object
Validates the container by verifying all links on a task have been set to a valid destination and that destination has been resolved.
77 78 79 80 81 |
# File 'lib/oflow/flow.rb', line 77 def validate() # collects errors and raises all errors at once if there are any errors = _validation_errors() raise ValidateError.new(errors) unless errors.empty? end |
#wakeup ⇒ Object
Wakes up all the Tasks in the Flow.
195 196 197 |
# File 'lib/oflow/flow.rb', line 195 def wakeup() @tasks.each_value { |t| t.wakeup() } end |
#walk_tasks(tasks_only = true, &blk) ⇒ Object
Performs a recursive walk over all Tasks and yields to the provided block for each. Flows are followed recusively.
109 110 111 112 113 114 115 116 117 118 |
# File 'lib/oflow/flow.rb', line 109 def walk_tasks(tasks_only=true, &blk) @tasks.each_value do |t| if t.is_a?(Task) blk.yield(t) else blk.yield(t) unless tasks_only t.walk_tasks(tasks_only, &blk) end end end |