Class: OFlow::Env
- Inherits:
-
Object
- Object
- OFlow::Env
- Includes:
- HasErrorHandler, HasLog
- Defined in:
- lib/oflow/env.rb
Overview
The platform that Flows are created in. It is the outer most element of the OFlow system.
Constant Summary collapse
- @@log_level =
Logger::WARN
Class Method Summary collapse
-
.log_level ⇒ Fixnum
Returns the default log level.
-
.log_level=(level) ⇒ Object
Sets the default log level.
Instance Method Summary collapse
-
#_clear ⇒ Object
Resets the error handler and log.
- #_locate(path) ⇒ Object
-
#_validation_errors ⇒ Object
Returns an Array of validation errors.
-
#busy? ⇒ true|false
Returns true of one or more Tasks is either processing a request or has a request waiting to be processed on it’s input queue.
-
#clear ⇒ Object
Clears out all Tasks and Flows and resets the object back to a empty state.
-
#describe(detail = 0, indent = 0) ⇒ Object
Describes all the Flows and Tasks in the system.
-
#each_flow(&blk) ⇒ Object
Iterates over each Flow and yields to the provided block with each Flow.
-
#error_handler ⇒ Task
Returns a error_handler Task if one is set on the instance.
-
#find_flow(name) ⇒ Flow|nil
Locates and return a Flow with the specified name.
-
#flow(name, &block) {|f| ... } ⇒ Flow
Creates a Flow and yield to a block with the newly create Flow.
-
#flow_count ⇒ Object
Returns the number of active Tasks.
-
#flush ⇒ Object
Wakes up all the Tasks in the Flow and waits for the system to become idle before returning.
- #full_name ⇒ Object
-
#initialize(name = '') ⇒ Env
constructor
A new instance of Env.
-
#locate(name) ⇒ Flow|Task|nil
Locates and return a Task with the specified full name.
-
#log ⇒ Task
Returns a log Task if one is set on the instance.
- #prepare ⇒ Object
-
#queue_count ⇒ Fixnum
Returns the sum of all the requests in all the Flow’s Task’s queues.
-
#resolve_all_links ⇒ Object
Resolves all the Links on all the Flows being managed.
-
#shutdown(flush_first = false) ⇒ Object
Shuts down all Tasks.
-
#start ⇒ Object
Calls the start() method on all Tasks.
-
#state=(s) ⇒ Object
Sets the state of all Tasks recursively.
-
#step ⇒ Object
Calls the step() method one Task that is stopped and has an item in the queue.
-
#stop ⇒ Object
Calls the stop() method on all Tasks.
-
#validate ⇒ Object
Validates the container by verifying all links on a task have been set to a valid destination and that destination has been resolved.
-
#wakeup ⇒ Object
Wakes up all the Tasks in the Flow.
-
#walk_flows(&blk) ⇒ Object
Performs a recursive walk over all Flows and yields to the provided block for each.
-
#walk_tasks(&blk) ⇒ Object
Performs a recursive walk over all Tasks in all Flows and yields to the provided block for each.
Methods included from HasErrorHandler
#error_handler=, #handle_error
Methods included from HasLog
#debug, #debug?, #error, #error?, #fatal, #info, #info?, #log=, #log_msg, #warn, #warn?
Constructor Details
#initialize(name = '') ⇒ Env
Returns a new instance of Env.
26 27 28 29 30 31 32 33 |
# File 'lib/oflow/env.rb', line 26 def initialize(name='') # The default logging level. @flows = {} @prepared = false @name = name @log = nil _clear() end |
Class Method Details
.log_level ⇒ Fixnum
Returns the default log level.
15 16 17 |
# File 'lib/oflow/env.rb', line 15 def self.log_level() @@log_level end |
.log_level=(level) ⇒ Object
Sets the default log level.
21 22 23 24 |
# File 'lib/oflow/env.rb', line 21 def self.log_level=(level) @@log_level = level unless level < Logger::Severity::DEBUG || Logger::Severity::FATAL < level #@log.receive(:severity, Box.new(@log_level)) unless @log.nil? end |
Instance Method Details
#_clear ⇒ Object
Resets the error handler and log. Usually called on init and by the clear() method.
241 242 243 244 |
# File 'lib/oflow/env.rb', line 241 def _clear() @error_handler = Task.new(self, :error, Actors::ErrorHandler) @log = Task.new(self, :log, Actors::Log) end |
#_locate(path) ⇒ Object
137 138 139 140 141 |
# File 'lib/oflow/env.rb', line 137 def _locate(path) f = @flows[path[0].to_sym] return f if f.nil? || 1 == path.size f._locate(path[1..-1]) end |
#_validation_errors ⇒ Object
Returns an Array of validation errors.
82 83 84 85 86 |
# File 'lib/oflow/env.rb', line 82 def _validation_errors() errors = [] @flows.each_value { |f| errors += f._validation_errors() } errors end |
#busy? ⇒ true|false
Returns true of one or more Tasks is either processing a request or has a request waiting to be processed on it’s input queue.
159 160 161 162 163 164 |
# File 'lib/oflow/env.rb', line 159 def busy? @flows.each_value { |f| return true if f.busy? } return true if !@log.nil? && @log.busy? return true if !@error_handler.nil? && @error_handler.busy? false end |
#clear ⇒ Object
Clears out all Tasks and Flows and resets the object back to a empty state.
233 234 235 236 237 |
# File 'lib/oflow/env.rb', line 233 def clear() shutdown() @flows = {} _clear() end |
#describe(detail = 0, indent = 0) ⇒ Object
Describes all the Flows and Tasks in the system.
247 248 249 250 251 252 253 254 255 |
# File 'lib/oflow/env.rb', line 247 def describe(detail=0, indent=0) i = ' ' * indent lines = ["#{i}#{@name} (#{self.class.name}) {"] @flows.each_value { |f| lines << f.describe(detail, indent + 2) } lines << i + "}" lines.join("\n") end |
#each_flow(&blk) ⇒ Object
Iterates over each Flow and yields to the provided block with each Flow.
97 98 99 |
# File 'lib/oflow/env.rb', line 97 def each_flow(&blk) @flows.each { |name,flow| blk.yield(flow) } end |
#error_handler ⇒ Task
Returns a error_handler Task if one is set on the instance.
47 48 49 |
# File 'lib/oflow/env.rb', line 47 def error_handler() @error_handler end |
#find_flow(name) ⇒ Flow|nil
Locates and return a Flow with the specified name.
122 123 124 125 |
# File 'lib/oflow/env.rb', line 122 def find_flow(name) name = name.to_sym unless name.nil? @flows[name] end |
#flow(name, &block) {|f| ... } ⇒ Flow
Creates a Flow and yield to a block with the newly create Flow. Used to contruct Flows.
57 58 59 60 61 62 |
# File 'lib/oflow/env.rb', line 57 def flow(name, &block) f = Flow.new(self, name) @flows[f.name] = f yield(f) if block_given? f end |
#flow_count ⇒ Object
Returns the number of active Tasks.
144 145 146 |
# File 'lib/oflow/env.rb', line 144 def flow_count() @flows.size end |
#flush ⇒ Object
Wakes up all the Tasks in the Flow and waits for the system to become idle before returning.
202 203 204 205 206 207 208 |
# File 'lib/oflow/env.rb', line 202 def flush() wakeup() @flows.each_value { |f| f.flush() } while busy? sleep(0.2) end end |
#full_name ⇒ Object
35 36 37 |
# File 'lib/oflow/env.rb', line 35 def full_name() @name end |
#locate(name) ⇒ Flow|Task|nil
Locates and return a Task with the specified full name.
130 131 132 133 134 135 |
# File 'lib/oflow/env.rb', line 130 def locate(name) name = name[1..-1] if name.start_with?(':') name = name[0..-2] if name.end_with?(':') path = name.split(':') _locate(path) end |
#log ⇒ Task
Returns a log Task if one is set on the instance.
41 42 43 |
# File 'lib/oflow/env.rb', line 41 def log() @log end |
#prepare ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/oflow/env.rb', line 64 def prepare() @flows.each_value { |f| f.resolve_all_links() } validate() @prepared = true end |
#queue_count ⇒ Fixnum
Returns the sum of all the requests in all the Flow’s Task’s queues.
150 151 152 153 154 |
# File 'lib/oflow/env.rb', line 150 def queue_count() cnt = 0 @flows.each_value { |f| cnt += f.queue_count() } cnt end |
#resolve_all_links ⇒ Object
Resolves all the Links on all the Flows being managed.
89 90 91 92 93 |
# File 'lib/oflow/env.rb', line 89 def resolve_all_links() @flows.each_value { |f| f.resolve_all_links() } end |
#shutdown(flush_first = false) ⇒ Object
Shuts down all Tasks.
220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/oflow/env.rb', line 220 def shutdown(flush_first=false) # block all tasks first so threads can empty queues @flows.each_value do |f| f.state = Task::BLOCKED end # shutdown and wait for queues to empty if necessary @flows.each_value do |f| f.shutdown(flush_first) end @flows = {} end |
#start ⇒ Object
Calls the start() method on all Tasks.
190 191 192 193 |
# File 'lib/oflow/env.rb', line 190 def start() prepare() unless @prepared @flows.each_value { |f| f.start() } end |
#state=(s) ⇒ Object
Sets the state of all Tasks recursively. This should not be called directly.
212 213 214 215 216 |
# File 'lib/oflow/env.rb', line 212 def state=(s) @flows.each_value do |f| f.state = s end end |
#step ⇒ Object
Calls the step() method one Task that is stopped and has an item in the queue. The Tasks with the highest backed_up() value is selected.
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/oflow/env.rb', line 173 def step() max = 0.0 best = nil walk_tasks() do |t| if Task::STOPPED == t.state bu = t.backed_up() if max < bu best = t max = bu end end end best.step() unless best.nil? best end |
#stop ⇒ Object
Calls the stop() method on all Tasks.
167 168 169 |
# File 'lib/oflow/env.rb', line 167 def stop() @flows.each_value { |f| f.stop() } end |
#validate ⇒ Object
Validates the container by verifying all links on a task have been set to a valid destination and that destination has been resolved.
75 76 77 78 79 |
# File 'lib/oflow/env.rb', line 75 def validate() # collects errors and raises all errors at once if there are any errors = _validation_errors() raise ValidateError.new(errors) unless errors.empty? end |
#wakeup ⇒ Object
Wakes up all the Tasks in the Flow.
196 197 198 |
# File 'lib/oflow/env.rb', line 196 def wakeup() @flows.each_value { |f| f.wakeup() } end |
#walk_flows(&blk) ⇒ Object
Performs a recursive walk over all Flows and yields to the provided block for each.
104 105 106 107 108 |
# File 'lib/oflow/env.rb', line 104 def walk_flows(&blk) @flows.each_value do |f| blk.yield(t) end end |
#walk_tasks(&blk) ⇒ Object
Performs a recursive walk over all Tasks in all Flows and yields to the provided block for each.
113 114 115 116 117 |
# File 'lib/oflow/env.rb', line 113 def walk_tasks(&blk) @flows.each_value do |f| f.walk_tasks(&blk) end end |