Module: OFlow::HasTasks
Overview
Provides the ability to have Tasks and Flows.
Instance Method Summary collapse
- #_locate(path) ⇒ Object
-
#_validation_errors ⇒ Object
Returns an Array of validation errors.
-
#busy? ⇒ true|false
Returns true if one or more Tasks are either processing a request or have a request waiting to be processed on their input queue.
-
#clear ⇒ Object
Clears out all Tasks and Flows and resets the object back to an empty state.
-
#each_task(&blk) ⇒ Object
Iterates over each Task and yields to the provided block with each Task.
-
#find_task(name) ⇒ Task|nil
Locates and returns a Task with the specified name.
-
#flow(name, options = {}, &block) {|f| ... } ⇒ Flow
Creates a Flow and yields to a block with the newly created Flow.
-
#flush ⇒ Object
Wakes up all the Tasks in the Flow and waits for the system to become idle before returning.
-
#init_tasks ⇒ Object
Initializes the tasks attribute.
-
#locate(name) ⇒ Task|nil
Locates and returns a Task with the specified full name.
-
#queue_count ⇒ Fixnum
Returns the sum of all the requests in all the Tasks’ queues.
-
#resolve_all_links ⇒ Object
Resolves all the Links on all the Tasks and Flows being managed as well as any Links in the instance itself.
-
#shutdown(flush_first = false) ⇒ Object
Shuts down all Tasks.
-
#start ⇒ Object
Calls the start() method on all Tasks.
-
#state=(s) ⇒ Object
Sets the state of all Tasks recursively.
-
#step ⇒ Object
Calls the step() method on one Task that is stopped and has an item in the queue.
-
#stop ⇒ Object
Calls the stop() method on all Tasks.
-
#task(name, actor_class, options = {}, &block) {|t| ... } ⇒ Task
Creates a Task and yields to a block with the newly created Task.
-
#task_count ⇒ Object
Returns the number of active Tasks.
-
#validate ⇒ Object
Validates the container by verifying all links on a task have been set to a valid destination and that destination has been resolved.
-
#wakeup ⇒ Object
Wakes up all the Tasks in the Flow.
-
#walk_tasks(tasks_only = true, &blk) ⇒ Object
Performs a recursive walk over all Tasks and yields to the provided block for each.
Instance Method Details
#_locate(path) ⇒ Object
114 115 116 117 118 |
# File 'lib/oflow/hastasks.rb', line 114 def _locate(path) t = @tasks[path[0].to_sym] return t if t.nil? || 1 == path.size t._locate(path[1..-1]) end |
#_validation_errors ⇒ Object
Returns an Array of validation errors.
57 58 59 60 61 |
# File 'lib/oflow/hastasks.rb', line 57 def _validation_errors() errors = [] @tasks.each_value { |t| errors += t._validation_errors() } errors end |
#busy? ⇒ true|false
Returns true if one or more Tasks are either processing a request or have a request waiting to be processed on their input queue.
136 137 138 139 |
# File 'lib/oflow/hastasks.rb', line 136 def busy? @tasks.each_value { |task| return true if task.busy? } false end |
#clear ⇒ Object
Clears out all Tasks and Flows and resets the object back to an empty state.
207 208 209 210 211 |
# File 'lib/oflow/hastasks.rb', line 207 def clear() shutdown() @tasks = {} _clear() end |
#each_task(&blk) ⇒ Object
Iterates over each Task and yields to the provided block with each Task.
76 77 78 |
# File 'lib/oflow/hastasks.rb', line 76 def each_task(&blk) @tasks.each { |name,task| blk.yield(task) } end |
#find_task(name) ⇒ Task|nil
Locates and returns a Task with the specified name.
98 99 100 101 102 |
# File 'lib/oflow/hastasks.rb', line 98 def find_task(name) name = name.to_sym unless name.nil? return self if :flow == name @tasks[name] end |
#flow(name, options = {}, &block) {|f| ... } ⇒ Flow
Creates a Flow and yields to a block with the newly created Flow. Used to construct Flows.
18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/oflow/hastasks.rb', line 18 def flow(name, options={}, &block) f = Flow.new(self, name, options) @tasks[f.name] = f yield(f) if block_given? f.resolve_all_links() # Wait to validate until at the top so up-links don't fail validation. if Env == self f.validate() f.start() end f end |
#flush ⇒ Object
Wakes up all the Tasks in the Flow and waits for the system to become idle before returning.
176 177 178 179 180 181 182 |
# File 'lib/oflow/hastasks.rb', line 176 def flush() wakeup() @tasks.each_value { |t| t.flush() } while busy? sleep(0.2) end end |
#init_tasks ⇒ Object
Initializes the tasks attribute.
8 9 10 |
# File 'lib/oflow/hastasks.rb', line 8 def init_tasks() @tasks = {} end |
#locate(name) ⇒ Task|nil
Locates and returns a Task with the specified full name.
107 108 109 110 111 112 |
# File 'lib/oflow/hastasks.rb', line 107 def locate(name) name = name[1..-1] if name.start_with?(':') name = name[0..-2] if name.end_with?(':') path = name.split(':') _locate(path) end |
#queue_count ⇒ Fixnum
Returns the sum of all the requests in all the Tasks’ queues.
127 128 129 130 131 |
# File 'lib/oflow/hastasks.rb', line 127 def queue_count() cnt = 0 @tasks.each_value { |task| cnt += task.queue_count() } cnt end |
#resolve_all_links ⇒ Object
Resolves all the Links on all the Tasks and Flows being managed as well as any Links in the instance itself.
65 66 67 68 69 70 71 72 |
# File 'lib/oflow/hastasks.rb', line 65 def resolve_all_links() @links.each_value { |lnk| set_link_target(lnk) if lnk.target.nil? } @tasks.each_value { |t| t.resolve_all_links() } end |
#shutdown(flush_first = false) ⇒ Object
Shuts down all Tasks.
194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/oflow/hastasks.rb', line 194 def shutdown(flush_first=false) # block all tasks first so threads can empty queues @tasks.each_value do |task| task.state = Task::BLOCKED end # shutdown and wait for queues to empty if necessary @tasks.each_value do |task| task.shutdown(flush_first) end @tasks = {} end |
#start ⇒ Object
Calls the start() method on all Tasks.
165 166 167 |
# File 'lib/oflow/hastasks.rb', line 165 def start() @tasks.each_value { |task| task.start() } end |
#state=(s) ⇒ Object
Sets the state of all Tasks recursively. This should not be called directly.
186 187 188 189 190 |
# File 'lib/oflow/hastasks.rb', line 186 def state=(s) @tasks.each_value do |task| task.state = s end end |
#step ⇒ Object
Calls the step() method on one Task that is stopped and has an item in the queue. The Task with the highest backed_up() value is selected.
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/oflow/hastasks.rb', line 148 def step() max = 0.0 best = nil walk_tasks() do |t| if Task::STOPPED == t.state bu = t.backed_up() if max < bu best = t max = bu end end end best.step() unless best.nil? best end |
#stop ⇒ Object
Calls the stop() method on all Tasks.
142 143 144 |
# File 'lib/oflow/hastasks.rb', line 142 def stop() @tasks.each_value { |task| task.stop() } end |
#task(name, actor_class, options = {}, &block) {|t| ... } ⇒ Task
Creates a Task and yields to a block with the newly created Task. Used to configure Tasks.
38 39 40 41 42 43 44 45 |
# File 'lib/oflow/hastasks.rb', line 38 def task(name, actor_class, options={}, &block) has_state = options.has_key?(:state) options[:state] = Task::STOPPED unless has_state t = Task.new(self, name, actor_class, options) @tasks[t.name] = t yield(t) if block_given? t end |
#task_count ⇒ Object
Returns the number of active Tasks.
121 122 123 |
# File 'lib/oflow/hastasks.rb', line 121 def task_count() @tasks.size end |
#validate ⇒ Object
Validates the container by verifying all links on a task have been set to a valid destination and that destination has been resolved.
50 51 52 53 54 |
# File 'lib/oflow/hastasks.rb', line 50 def validate() # collects errors and raises all errors at once if there are any errors = _validation_errors() raise ValidateError.new(errors) unless errors.empty? end |
#wakeup ⇒ Object
Wakes up all the Tasks in the Flow.
170 171 172 |
# File 'lib/oflow/hastasks.rb', line 170 def wakeup() @tasks.each_value { |t| t.wakeup() } end |
#walk_tasks(tasks_only = true, &blk) ⇒ Object
Performs a recursive walk over all Tasks and yields to the provided block for each. Flows are followed recursively.
84 85 86 87 88 89 90 91 92 93 |
# File 'lib/oflow/hastasks.rb', line 84 def walk_tasks(tasks_only=true, &blk) @tasks.each_value do |t| if t.is_a?(Task) blk.yield(t) else blk.yield(t) unless tasks_only t.walk_tasks(tasks_only, &blk) end end end |