Class: Tap::App
Overview
App coordinates the setup and running of tasks, and provides an interface to the application directory structure. All tasks have an App (by default App.instance) through which tasks access access application-wide resources like the logger, executable queue, aggregator, and dependencies.
Running Tasks
Task enque command are forwarded to App#enq:
t0 = Task.intern {|task, input| "#{input}.0" }
t0.enq('a')
app.enq(t0, 'b')
app.run
app.results(t0) # => ['a.0', 'b.0']
When a task completes, the results will be passed to the task on_complete block, if set, or be collected into an Aggregator (aggregated results may be accessed per-task, as shown above); on_complete blocks typically execute or enque other tasks, allowing the construction of imperative workflows:
# clear the previous results
app.aggregator.clear
t1 = Task.intern {|task, input| "#{input}.1" }
t0.on_complete {|_result| t1.enq(_result) }
t0.enq 'c'
app.run
app.results(t0) # => []
app.results(t1) # => ['c.0.1']
Here t0 has no results because the on_complete block passed them to t1 in a simple sequence.
Dependencies
Tasks allow the construction of dependency-based workflows such that a dependent task only executes after its dependencies have been resolved.
runlist = []
t0 = Task.intern {|task| runlist << task }
t1 = Task.intern {|task| runlist << task }
t0.depends_on(t1)
t0.enq
app.run
runlist # => [t1, t0]
Once a dependency is resolved, it will not execute again:
t0.enq
app.run
runlist # => [t1, t0, t0]
Batching
Tasks can be batched, allowing the same input to be enqued to multiple tasks at once.
t0 = Task.intern {|task, input| "#{input}.0" }
t1 = Task.intern {|task, input| "#{input}.1" }
t0.batch_with(t1)
t0.enq 'a'
t1.enq 'b'
app.run
app.results(t0) # => ['a.0', 'b.0']
app.results(t1) # => ['a.1', 'b.1']
Executables
App can enque and run any Executable object. Arbitrary methods may be made into Executables using Object#_method. The mq (method enq) method generates and enques methods in one step.
array = []
# longhand
m = array._method(:push)
m.enq(1)
# shorthand
app.mq(array, :push, 2)
array.empty? # => true
app.run
array # => [1, 2]
Auditing
All results are audited to track how a given input evolves during a workflow. To illustrate auditing, consider and addition workflow that ends in eights.
add_one = Tap::Task.intern({}, 'add_one') {|task, input| input += 1 }
add_five = Tap::Task.intern({}, 'add_five') {|task, input| input += 5 }
add_one.on_complete do |_result|
# _result is the audit
current_value = _result.value
if current_value < 3
add_one.enq(_result)
else
add_five.enq(_result)
end
end
add_one.enq(0)
add_one.enq(1)
add_one.enq(2)
app.run
app.results(add_five) # => [8,8,8]
Although the results are indistinguishable, each achieved the final value through a different series of tasks. With auditing you can see how each input came to the final value of 8:
"\n" + Tap::Support::Audit.dump(app._results(add_five), "")
# => %Q{
# o-[] 2
# o-[add_one] 3
# o-[add_five] 8
#
# o-[] 1
# o-[add_one] 2
# o-[add_one] 3
# o-[add_five] 8
#
# o-[] 0
# o-[add_one] 1
# o-[add_one] 2
# o-[add_one] 3
# o-[add_five] 8
# }
See Tap::Support::Audit for more details.
Defined Under Namespace
Modules: State Classes: TerminateError
Constant Summary collapse
- DEFAULT_LOGGER =
The default App logger writes to $stdout at level INFO.
Logger.new($stdout)
Constants inherited from Root
Class Attribute Summary collapse
-
.instance ⇒ Object
Returns the current instance of App.
Instance Attribute Summary collapse
-
#aggregator ⇒ Object
readonly
A Tap::Support::Aggregator to collect the results of methods that have no on_complete block.
-
#dependencies ⇒ Object
readonly
A Tap::Support::Dependencies to track dependencies.
-
#logger ⇒ Object
The shared logger.
-
#queue ⇒ Object
readonly
The application queue.
-
#state ⇒ Object
readonly
The state of the application (see App::State).
Attributes inherited from Root
Instance Method Summary collapse
-
#_results(*tasks) ⇒ Object
Returns all aggregated, audited results for the specified tasks.
-
#config_filepath(name) ⇒ Object
Returns the configuration filepath for the specified task name, File.join(app, task_name + “.yml”).
-
#debug? ⇒ Boolean
True if debug or the global variable $DEBUG is true.
-
#enq(task, *inputs) ⇒ Object
Enques the task (or Executable) with the inputs.
-
#info ⇒ Object
Returns an information string for the App.
-
#initialize(config = {}, logger = DEFAULT_LOGGER) ⇒ App
constructor
Creates a new App with the given configuration.
-
#inspect ⇒ Object
Returns a string like: “#<Tap::App:#object_id root: #Top Level Namespace >”.
-
#log(action, msg = "", level = Logger::INFO) ⇒ Object
Logs the action and message at the input level (default INFO).
-
#mq(object, method_name, *inputs) ⇒ Object
Method enque.
-
#ready ⇒ Object
Sets state = State::READY unless the app is running.
-
#results(*tasks) ⇒ Object
Returns all aggregated results for the specified tasks.
-
#run ⇒ Object
Sequentially calls execute with the [executable, inputs] pairs in queue; run continues until the queue is empty and then returns self.
-
#stop ⇒ Object
Signals a running application to stop executing tasks in the queue by setting state = State::STOP.
-
#terminate ⇒ Object
Signals a running application to terminate execution by setting state = State::TERMINATE.
Methods inherited from Root
#[], #[]=, #absolute_paths, #absolute_paths=, #chdir, chdir, empty?, exchange, expanded?, #filepath, glob, #glob, minimal_match?, minimize, path_root_type, prepare, #prepare, relative_filepath, #relative_filepath, #relative_paths=, #root=, sglob, split, #translate, translate, trivial?, vglob, #vglob
Methods included from Support::Versions
#compare_versions, #deversion, #increment, #version
Constructor Details
#initialize(config = {}, logger = DEFAULT_LOGGER) ⇒ App
Creates a new App with the given configuration.
202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/tap/app.rb', line 202 def initialize(config={}, logger=DEFAULT_LOGGER) super() @state = State::READY @queue = Support::ExecutableQueue.new @aggregator = Support::Aggregator.new @dependencies = Support::Dependencies.new initialize_config(config) self.logger = logger end |
Class Attribute Details
Instance Attribute Details
#aggregator ⇒ Object (readonly)
A Tap::Support::Aggregator to collect the results of methods that have no on_complete block
172 173 174 |
# File 'lib/tap/app.rb', line 172 def aggregator @aggregator end |
#dependencies ⇒ Object (readonly)
A Tap::Support::Dependencies to track dependencies.
175 176 177 |
# File 'lib/tap/app.rb', line 175 def dependencies @dependencies end |
#logger ⇒ Object
The shared logger
162 163 164 |
# File 'lib/tap/app.rb', line 162 def logger @logger end |
#queue ⇒ Object (readonly)
The application queue
165 166 167 |
# File 'lib/tap/app.rb', line 165 def queue @queue end |
#state ⇒ Object (readonly)
The state of the application (see App::State)
168 169 170 |
# File 'lib/tap/app.rb', line 168 def state @state end |
Instance Method Details
#_results(*tasks) ⇒ Object
Returns all aggregated, audited results for the specified tasks.
Results are joined into a single array. Arrays of tasks are allowed as inputs. See results.
350 351 352 |
# File 'lib/tap/app.rb', line 350 def _results(*tasks) aggregator.retrieve_all(*tasks.flatten) end |
#config_filepath(name) ⇒ Object
Returns the configuration filepath for the specified task name, File.join(app, task_name + “.yml”). Returns nil if task_name is nil.
245 246 247 |
# File 'lib/tap/app.rb', line 245 def config_filepath(name) name == nil ? nil : filepath('config', "#{name}.yml") end |
#debug? ⇒ Boolean
True if debug or the global variable $DEBUG is true.
222 223 224 |
# File 'lib/tap/app.rb', line 222 def debug? debug || $DEBUG end |
#enq(task, *inputs) ⇒ Object
Enques the task (or Executable) with the inputs. If the task is batched, then each task in task.batch will be enqued with the inputs. Returns task.
327 328 329 330 331 332 333 334 335 336 337 338 |
# File 'lib/tap/app.rb', line 327 def enq(task, *inputs) case task when Tap::Task raise ArgumentError, "not assigned to enqueing app: #{task}" unless task.app == self task.enq(*inputs) when Support::Executable queue.enq(task, inputs) else raise ArgumentError, "not a Task or Executable: #{task}" end task end |
#info ⇒ Object
321 322 323 |
# File 'lib/tap/app.rb', line 321 def info "state: #{state} (#{State.state_str(state)}) queue: #{queue.size} results: #{aggregator.size}" end |
#inspect ⇒ Object
Returns a string like: “#<Tap::App:#object_id root: #Top Level Namespace >”
374 375 376 |
# File 'lib/tap/app.rb', line 374 def inspect "#<#{self.class.to_s}:#{object_id} root: #{root} >" end |
#log(action, msg = "", level = Logger::INFO) ⇒ Object
Logs the action and message at the input level (default INFO).
Logging is suppressed if quiet is true.
238 239 240 |
# File 'lib/tap/app.rb', line 238 def log(action, msg="", level=Logger::INFO) logger.add(level, msg, action.to_s) if !quiet || verbose end |
#mq(object, method_name, *inputs) ⇒ Object
Method enque. Enques the specified method from object with the inputs. Returns the enqued method.
342 343 344 345 |
# File 'lib/tap/app.rb', line 342 def mq(object, method_name, *inputs) m = object._method(method_name) enq(m, *inputs) end |
#ready ⇒ Object
Sets state = State::READY unless the app is running. Returns self.
250 251 252 253 |
# File 'lib/tap/app.rb', line 250 def ready @state = State::READY unless state == State::RUN self end |
#results(*tasks) ⇒ Object
Returns all aggregated results for the specified tasks. Results are joined into a single array. Arrays of tasks are allowed as inputs.
t0 = Task.intern {|task, input| "#{input}.0" }
t1 = Task.intern {|task, input| "#{input}.1" }
t2 = Task.intern {|task, input| "#{input}.2" }
t1.batch_with(t2)
t0.enq(0)
t1.enq(1)
app.run
app.results(t0, t1.batch) # => ["0.0", "1.1", "1.2"]
app.results(t1, t0) # => ["1.1", "0.0"]
369 370 371 |
# File 'lib/tap/app.rb', line 369 def results(*tasks) _results(tasks).collect {|_result| _result.value } end |
#run ⇒ Object
Sequentially calls execute with the [executable, inputs] pairs in queue; run continues until the queue is empty and then returns self.
Run State
Run checks the state of self before executing a method. If state changes from State::RUN, the following behaviors result:
- State::STOP
-
No more executables will be executed; the current executable will continute to completion.
- State::TERMINATE
-
No more executables will be executed and the currently running executable will be discontinued as described in terminate.
Calls to run when the state is not State::READY do nothing and return immediately.
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/tap/app.rb', line 271 def run return self unless state == State::READY @state = State::RUN # TODO: log starting run begin until queue.empty? || state != State::RUN executable, inputs = queue.deq executable._execute(*inputs) end rescue(TerminateError) # gracefully fail for termination errors rescue(Exception) # handle other errors accordingly raise if debug? log($!.class, $!.) ensure @state = State::READY end # TODO: log run complete self end |
#stop ⇒ Object
Signals a running application to stop executing tasks in the queue by setting state = State::STOP. The task currently executing will continue uninterrupted to completion.
Does nothing unless state is State::RUN.
300 301 302 303 |
# File 'lib/tap/app.rb', line 300 def stop @state = State::STOP if state == State::RUN self end |
#terminate ⇒ Object
Signals a running application to terminate execution by setting state = State::TERMINATE. In this state, an executing task will then raise a TerminateError upon check_terminate, thus allowing the invocation of task-specific termination, perhaps performing rollbacks. (see Tap::Support::Executable#check_terminate).
Does nothing if state == State::READY.
312 313 314 315 |
# File 'lib/tap/app.rb', line 312 def terminate @state = State::TERMINATE unless state == State::READY self end |