Class: Tap::App
- Inherits: Object (hierarchy: Object, Tap::App)
- Defined in: lib/tap/app.rb, lib/tap/join.rb, lib/tap/task.rb, lib/tap/app/api.rb, lib/tap/app/node.rb, lib/tap/app/queue.rb, lib/tap/app/stack.rb, lib/tap/app/state.rb
Overview
App coordinates the setup and execution of workflows.
Defined Under Namespace
Modules: Node, State
Classes: Api, Queue, Stack, TerminateError
Constant Summary
- CALL_KEYS = %w{obj sig args}
  The reserved call keys
- INIT_KEYS = %w{var class spec}
  The reserved init keys
- RESERVED_KEYS = CALL_KEYS + INIT_KEYS
  Reserved call and init keys as a single array
- DEFAULT_LOGGER = Logger.new($stderr)
  The default App logger (writes to $stderr at level INFO)
Class Attribute Summary
-
.instance(auto_initialize = true) ⇒ Object
Returns the current instance of App.
Instance Attribute Summary
-
#logger ⇒ Object
The application logger.
-
#objects ⇒ Object
readonly
A cache of application objects.
-
#queue ⇒ Object
readonly
The application queue.
-
#stack ⇒ Object
The application call stack for executing nodes.
-
#state ⇒ Object
readonly
The state of the application (see App::State).
Attributes included from Node
Class Method Summary
- .build(spec = {}, app = nil) ⇒ Object
-
.setup(dir = Dir.pwd) ⇒ Object
Sets up and returns App.instance with an Env setup to the specified directory.
Instance Method Summary
-
#bq(*inputs, &block) ⇒ Object
Generates a node from the block and enqueues it.
- #build(spec) ⇒ Object
-
#call(args, &block) ⇒ Object
Sends a signal to an application object.
-
#check_terminate ⇒ Object
Raises a TerminateError if state is TERMINATE.
-
#debug? ⇒ Boolean
True if the debug config or the global variable $DEBUG is true.
-
#dispatch(node, inputs = []) ⇒ Object
Calls the stack with the node and inputs, then calls the node joins.
-
#dump(target = $stdout, options = {}) ⇒ Object
Dumps self to the target as YAML.
-
#enq(node, *inputs) ⇒ Object
Enqueues the node with the inputs.
-
#enque(var, *args) ⇒ Object
Enqueues the application object specified by var with args.
-
#env=(env) ⇒ Object
Sets the application environment and validates that env provides an AGET ([]) and invert method.
-
#execute(node, *inputs) ⇒ Object
Execute is a wrapper for dispatch allowing inputs to be listed out rather than provided as an array.
-
#gc(all = false) ⇒ Object
Removes objects keyed by integers.
-
#get(var) ⇒ Object
Returns the object set to var, or self if var is nil.
-
#info ⇒ Object
Returns an information string for the App.
-
#initialize(config = {}, options = {}, &block) ⇒ App
constructor
Creates a new App with the given configuration.
- #inspect ⇒ Object
-
#join(inputs, outputs, config = {}, klass = Join, &block) ⇒ Object
Generates a join between the inputs and outputs.
-
#log(action, msg = nil, level = Logger::INFO) ⇒ Object
Logs the action and message at the input level (default INFO).
-
#middleware ⇒ Object
Returns an array of middleware in use by self.
-
#node(&block) ⇒ Object
Returns a new node that executes block on call.
-
#obj(var) ⇒ Object
Same as get, but raises an error if no object is set to the variable.
-
#parse(argv, &block) ⇒ Object
Parses a copy of argv, leaving the input unchanged (see parse!). Yields each spec.
-
#parse!(argv, &block) ⇒ Object
Parses argv with a Parser and sends each resulting spec to call. Yields each spec.
-
#reset ⇒ Object
Clears objects, the queue, and resets the stack so that no middleware is used.
- #resolve(const_str) ⇒ Object
- #route(obj, sig, &block) ⇒ Object
-
#run ⇒ Object
Sequentially dispatches each enqueued (node, inputs) pair to the application stack.
-
#serialize(bare = true) ⇒ Object
Converts self to a schema that can be used to build a new app with equivalent application objects, queue, and middleware.
-
#set(var, obj) ⇒ Object
Sets the object to the specified variable and returns obj.
-
#stop ⇒ Object
Signals a running app to stop dispatching nodes to the application stack by setting state to STOP.
-
#task(config = {}, klass = Task, &block) ⇒ Object
Generates a task with the specified config, initialized to self.
-
#terminate ⇒ Object
Signals a running application to terminate execution by setting state to TERMINATE.
- #to_spec ⇒ Object
-
#use(middleware, *argv) ⇒ Object
Adds the specified middleware to the stack.
-
#var(obj, auto_assign = true) ⇒ Object
Returns the variable for the object.
Methods included from Node
extended, intern, #on_complete
Methods included from Signals
Methods included from Signals::ModuleMethods
Constructor Details
#initialize(config = {}, options = {}, &block) ⇒ App
Creates a new App with the given configuration. Options can be used to specify objects that are normally initialized for every new app:
:stack the application stack; an App::Stack
:queue the application queue; an App::Queue
:objects application objects; a hash of (var, object) pairs
:logger the application logger
A block may also be provided; it will be set as a default join.
# File 'lib/tap/app.rb', line 155
def initialize(config={}, options={}, &block)
  super() # monitor

  @state = State::READY
  @stack = options[:stack] || Stack.new(self)
  @queue = options[:queue] || Queue.new
  @objects = options[:objects] || {}
  @logger = options[:logger] || DEFAULT_LOGGER
  @joins = []
  on_complete(&block)

  self.env = config.delete(:env) || config.delete('env')
  initialize_config(config)
end
Class Attribute Details
.instance(auto_initialize = true) ⇒ Object
Returns the current instance of App. If no instance has been set, then instance initializes a new App with the default configuration. Instance is used to initialize tasks when no app is specified and exists for convenience only.
# File 'lib/tap/app.rb', line 24
def instance(auto_initialize=true)
  @instance ||= (auto_initialize ? new : nil)
end
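The lazy initialization described for instance can be tried in isolation. The sketch below uses a hypothetical MiniApp class as a stand-in for Tap::App; only the memoization line is taken from the source above.

```ruby
# MiniApp is a hypothetical stand-in for Tap::App, used only to
# illustrate the memoized class-level instance.
class MiniApp
  class << self
    # Returns the current instance, creating one on first use unless
    # auto_initialize is false.
    def instance(auto_initialize = true)
      @instance ||= (auto_initialize ? new : nil)
    end
  end
end

first  = MiniApp.instance(false) # nil, nothing has been created yet
second = MiniApp.instance        # creates and memoizes an instance
third  = MiniApp.instance        # returns the same object again
```

Because the instance is memoized, every task that falls back to the default app shares the same object.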
Instance Attribute Details
#logger ⇒ Object
The application logger
# File 'lib/tap/app.rb', line 89
def logger
  @logger
end
#objects ⇒ Object (readonly)
A cache of application objects
# File 'lib/tap/app.rb', line 86
def objects
  @objects
end
#queue ⇒ Object (readonly)
The application queue
# File 'lib/tap/app.rb', line 83
def queue
  @queue
end
#stack ⇒ Object
The application call stack for executing nodes
# File 'lib/tap/app.rb', line 80
def stack
  @stack
end
#state ⇒ Object (readonly)
The state of the application (see App::State)
# File 'lib/tap/app.rb', line 77
def state
  @state
end
Class Method Details
.build(spec = {}, app = nil) ⇒ Object
# File 'lib/tap/app.rb', line 36
def build(spec={}, app=nil)
  config = spec['config'] || {}
  signals = spec['signals'] || []

  if spec['self']
    app.reconfigure(config)
  else
    app = new(config)
  end

  signals.each do |args|
    app.call(args)
  end

  app.gc
  app
end
.setup(dir = Dir.pwd) ⇒ Object
Sets up and returns App.instance with an Env setup to the specified directory. This method is used to initialize the app and env as seen by the tap executable.
# File 'lib/tap/app.rb', line 31
def setup(dir=Dir.pwd)
  env = Env.setup(dir)
  @instance = new(:env => env)
end
Instance Method Details
#bq(*inputs, &block) ⇒ Object
Generates a node from the block and enqueues it. Returns the new node.
# File 'lib/tap/app.rb', line 236
def bq(*inputs, &block) # :yields: *inputs
  node = self.node(&block)
  queue.enq(node, inputs)
  node
end
#build(spec) ⇒ Object
# File 'lib/tap/app.rb', line 350
def build(spec)
  var = spec['var']
  clas = spec['class']
  spec = spec['spec'] || spec
  obj = nil

  if clas.nil?
    unless spec.empty?
      raise "no class specified"
    end
  else
    clas = resolve(clas)

    case spec
    when Array
      parse = bang ? :parse! : :parse
      obj, args = clas.send(parse, spec, self)

      if block_given?
        yield(obj, args)
      else
        warn_ignored_args(args)
      end

    when Hash
      obj = clas.build(spec, self)
    else
      raise "invalid spec: #{spec.inspect}"
    end
  end

  unless var.nil?
    if var.respond_to?(:each)
      var.each {|v| set(v, obj) }
    else
      set(var, obj)
    end
  end

  obj
end
#call(args, &block) ⇒ Object
Sends a signal to an application object. The input should be a hash defining these fields:
obj # a variable identifying an object, or nil for self
sig # the signal name
args # arguments to the signal (typically a Hash)
Call does the following:
object = app.get(obj) # lookup an application object by obj
signal = object.signal(sig) # lookup a signal by sig
signal.call(args) # call the signal with args
Call returns the result of the signal call.
# File 'lib/tap/app.rb', line 325
def call(args, &block)
  obj = args['obj']
  sig = args['sig']
  args = args['args'] || args

  route(obj, sig, &block).call(args)
end
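The lookup order that call describes (object, then signal, then invocation) can be sketched without Tap. MiniApp and Greeter below are hypothetical stand-ins, and the 'greet' signal is invented for illustration; only the shape of the signal hash follows the description above.

```ruby
# Hypothetical application object: it exposes a signal method that
# returns a callable for the named signal.
class Greeter
  def signal(sig)
    case sig
    when 'greet' then lambda { |args| "hello #{args['name']}" }
    else raise "unknown signal: #{sig.inspect}"
    end
  end
end

# Hypothetical stand-in for the app; only the lookup order mirrors call.
class MiniApp
  attr_reader :objects

  def initialize
    @objects = {}
  end

  def get(var)
    var.nil? ? self : objects[var]
  end

  def call(args)
    obj  = args['obj']
    sig  = args['sig']
    args = args['args'] || args

    object = get(obj) or raise "unknown object: #{obj.inspect}"
    object.signal(sig).call(args)
  end
end

app = MiniApp.new
app.objects['greeter'] = Greeter.new
result = app.call('obj' => 'greeter', 'sig' => 'greet', 'args' => {'name' => 'world'})
```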
#check_terminate ⇒ Object
Raises a TerminateError if state is TERMINATE. Nodes should call check_terminate to provide breakpoints in long-running processes.
A block may be provided to check_terminate to execute code before raising the TerminateError.
# File 'lib/tap/app.rb', line 610
def check_terminate
  if state == State::TERMINATE
    yield() if block_given?
    raise TerminateError.new
  end
end
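A breakpoint inside a long-running loop might look like the sketch below. MiniApp, its symbol state, and the local TerminateError class are hypothetical stand-ins; the real class compares against State::TERMINATE.

```ruby
# Local stand-in for Tap::App::TerminateError.
class TerminateError < RuntimeError; end

# Hypothetical app with a plain symbol for its state.
class MiniApp
  attr_accessor :state

  def check_terminate
    if state == :terminate
      yield if block_given?
      raise TerminateError
    end
  end
end

app = MiniApp.new
processed = []
cleaned_up = false

begin
  [1, 2, 3, 4].each do |item|
    # breakpoint: runs the cleanup block, then raises once terminated
    app.check_terminate { cleaned_up = true }
    processed << item
    app.state = :terminate if item == 2 # simulate an external terminate
  end
rescue TerminateError
  # a real run rescues this quietly
end
```

The loop stops at the first breakpoint reached after the state changes, so items 3 and 4 are never processed.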
#debug? ⇒ Boolean
True if the debug config or the global variable $DEBUG is true.
# File 'lib/tap/app.rb', line 191
def debug?
  debug || $DEBUG
end
#dispatch(node, inputs = []) ⇒ Object
Dispatch does the following in order:
- call stack with the node and inputs
- call the node joins (node.joins)
The joins for self will be called if the node joins are an empty array. No joins will be called if the node joins are nil, or if the node does not provide a joins method.
Dispatch returns the stack result.
# File 'lib/tap/app.rb', line 524
def dispatch(node, inputs=[])
  result = stack.call(node, inputs)

  if node.respond_to?(:joins)
    if joins = node.joins

      if joins.empty?
        joins = self.joins
      end

      joins.each do |join|
        join.call(result)
      end
    end
  end

  result
end
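The four join cases (node joins, empty joins, nil joins, no joins method) can be exercised with a small sketch. MiniApp and Node here are hypothetical stand-ins, and node.call replaces the stack call for brevity.

```ruby
# Hypothetical app holding default joins; node.call stands in for
# stack.call(node, inputs).
class MiniApp
  attr_reader :joins

  def initialize(default_joins)
    @joins = default_joins
  end

  def dispatch(node, inputs = [])
    result = node.call(*inputs)

    if node.respond_to?(:joins)
      if joins = node.joins
        joins = self.joins if joins.empty?
        joins.each { |join| join.call(result) }
      end
    end

    result
  end
end

# Hypothetical node that adds its two inputs.
Node = Struct.new(:joins) do
  def call(a, b)
    a + b
  end
end

log = []
app = MiniApp.new([lambda { |r| log << [:app, r] }])

app.dispatch(Node.new([lambda { |r| log << [:node, r] }]), [1, 2]) # node joins
app.dispatch(Node.new([]), [3, 4])            # empty array: app joins
app.dispatch(Node.new(nil), [5, 6])           # nil: no joins
app.dispatch(lambda { |a, b| a + b }, [7, 8]) # no joins method: no joins
```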
#dump(target = $stdout, options = {}) ⇒ Object
Dumps self to the target as YAML. (note dump is still experimental)
Notes
Platforms that use Syck (ex MRI) require a fix because Syck misformats certain dumps, such that they cannot be reloaded (even by Syck). Specifically:
&id001 !ruby/object:Tap::Task ?
should be:
? &id001 !ruby/object:Tap::Task
Dump fixes this error and, in addition, removes Thread and Proc dumps because they can’t be allocated on load.
# File 'lib/tap/app.rb', line 641
def dump(target=$stdout, options={})
  synchronize do
    options = {
      :date_format => '%Y-%m-%d %H:%M:%S',
      :date => true,
      :info => true
    }.merge(options)

    # print basic headers
    target.puts "# date: #{Time.now.strftime(options[:date_format])}" if options[:date]
    target.puts "# info: #{info}" if options[:info]

    # dump yaml, fixing as necessary
    yaml = YAML.dump(self)
    yaml.gsub!(/\&(.*!ruby\/object:.*?)\s*\?/) {"? &#{$1} " } if YAML.const_defined?(:Syck)
    yaml.gsub!(/!ruby\/object:(Thread|Proc) \{\}/, '')
    target << yaml
  end

  target
end
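The two substitutions in dump can be tried on plain strings. The sample inputs below are made up for illustration; only the regular expressions come from the source above.

```ruby
# A misformatted complex key, as described in the notes.
broken = "&id001 !ruby/object:Tap::Task ?"

# Move the '?' key indicator in front of the anchor.
fixed = broken.gsub(/\&(.*!ruby\/object:.*?)\s*\?/) { "? &#{$1} " }

# Hypothetical line holding a Proc dump that cannot be reloaded.
with_proc = "block: !ruby/object:Proc {}"

# Strip the Thread/Proc dump entirely.
stripped = with_proc.gsub(/!ruby\/object:(Thread|Proc) \{\}/, '')
```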
#enq(node, *inputs) ⇒ Object
Enqueues the node with the inputs. Returns the node.
# File 'lib/tap/app.rb', line 230
def enq(node, *inputs)
  queue.enq(node, inputs)
  node
end
#enque(var, *args) ⇒ Object
Enqueues the application object specified by var with args. Raises an error if no such application object exists.
# File 'lib/tap/app.rb', line 455
def enque(var, *args)
  unless node = get(var)
    raise "unknown object: #{var.inspect}"
  end

  queue.enq(node, args)
  node
end
#env=(env) ⇒ Object
Sets the application environment and validates that env provides an AGET ([]) and invert method. AGET is used to lookup constants during init; it receives the ‘class’ parameter and should return a corresponding class. Invert should return an object that reverses the AGET lookup. Tap::Env and a regular Hash both satisfy this api.
Env can be set to nil and is set to nil by default, but initialization is constrained without it.
# File 'lib/tap/app.rb', line 178
def env=(env)
  Validation.validate_api(env, [:[], :invert]) unless env.nil?
  @env = env
end
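Since a regular Hash satisfies the env API, the two required methods can be demonstrated directly. The keys and constants below are hypothetical, and the respond_to? check is a hand-written approximation of the validation, not the Validation module itself.

```ruby
# Hypothetical mapping from a 'class' parameter to a constant.
env = { 'string' => String, 'array' => Array }

# Hand-written version of the API check: both methods must exist.
satisfies_api = [:[], :invert].all? { |method_name| env.respond_to?(method_name) }

constant = env['string'] # AGET: 'class' parameter to constant
inverse  = env.invert    # an object that reverses the AGET lookup
name     = inverse[String]
```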
#execute(node, *inputs) ⇒ Object
Execute is a wrapper for dispatch allowing inputs to be listed out rather than provided as an array.
# File 'lib/tap/app.rb', line 510
def execute(node, *inputs)
  dispatch(node, inputs)
end
#gc(all = false) ⇒ Object
Removes objects keyed by integers. If all is specified, gc will clear all objects.
# File 'lib/tap/app.rb', line 300
def gc(all=false)
  if all
    objects.clear
  else
    objects.delete_if {|var, obj| var.kind_of?(Integer) }
  end

  self
end
#get(var) ⇒ Object
Returns the object set to var, or self if var is nil.
# File 'lib/tap/app.rb', line 266
def get(var)
  var.nil? ? self : objects[var]
end
#info ⇒ Object
# File 'lib/tap/app.rb', line 621
def info
  "state: #{state} (#{State.state_str(state)}) queue: #{queue.size}"
end
#inspect ⇒ Object
# File 'lib/tap/app.rb', line 770
def inspect
  "#<#{self.class}:#{object_id} #{info}>"
end
#join(inputs, outputs, config = {}, klass = Join, &block) ⇒ Object
Generates a join between the inputs and outputs.
# File 'lib/tap/join.rb', line 7
def join(inputs, outputs, config={}, klass=Join, &block)
  klass.new(config, self).join(inputs, outputs, &block)
end
#log(action, msg = nil, level = Logger::INFO) ⇒ Object
Logs the action and message at the input level (default INFO). The message may be generated by a block; in that case leave the message unspecified as nil.
Logging is suppressed if quiet is true, unless verbose is also true.
Performance Considerations
Using a block to generate a message is quicker if logging is off, but slower when logging is on. However, when messages use a lot of interpolation the log time is dominated by the interpolation; at some point the penalty for using a block is outweighed by the benefit of being able to skip the interpolation.
For example:
log(:action, "this is fast")
log(:action) { "and there's not much benefit to the block" }
log(:action, "but a message with #{a}, #{b}, #{c}, and #{d}")
log(:action) { "may be #{best} in a block because you can #{turn} #{it} #{off}" }
# File 'lib/tap/app.rb', line 217
def log(action, msg=nil, level=Logger::INFO)
  if !quiet || verbose
    msg ||= yield
    logger.add(level, msg, action.to_s)
  end
end
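The benefit of the block form is that the message is never built when logging is off. A minimal sketch, assuming a hypothetical MiniApp with quiet and verbose as plain accessors (in Tap they are configs) and a Logger writing to a StringIO:

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in; the log method body follows the source above.
class MiniApp
  attr_accessor :quiet, :verbose
  attr_reader :logger

  def initialize(io)
    @logger = Logger.new(io)
    @quiet = false
    @verbose = false
  end

  def log(action, msg = nil, level = Logger::INFO)
    if !quiet || verbose
      msg ||= yield
      logger.add(level, msg, action.to_s)
    end
  end
end

io = StringIO.new
app = MiniApp.new(io)
built = 0 # counts how many times a message block actually runs

app.quiet = true
app.log(:skip) { built += 1; "never built" }    # block is not called

app.quiet = false
app.log(:run) { built += 1; "built on demand" } # block is called once
```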
#middleware ⇒ Object
Returns an array of middleware in use by self.
# File 'lib/tap/app.rb', line 465
def middleware
  middleware = []

  # collect middleware by walking up the stack
  synchronize do
    current = stack
    visited = [current]

    while current.respond_to?(:stack)
      middleware << current
      current = current.stack

      circular_stack = visited.include?(current)
      visited << current

      if circular_stack
        visited.collect! {|middleware| middleware.class.to_s }.join(', ')
        raise "circular stack detected:\n[#{visited}]"
      end
    end
  end

  middleware
end
#node(&block) ⇒ Object
Returns a new node that executes block on call.
# File 'lib/tap/app.rb', line 225
def node(&block) # :yields: *inputs
  Node.intern(&block)
end
#obj(var) ⇒ Object
Same as get, but raises an error if no object is set to the variable.
# File 'lib/tap/app.rb', line 271
def obj(var)
  get(var) or raise "no object set to: #{var.inspect}"
end
#parse(argv, &block) ⇒ Object
Parses a copy of argv, leaving the input unchanged (see parse!). Yields each spec.
# File 'lib/tap/app.rb', line 392
def parse(argv, &block) # :yields: spec
  parse!(argv.dup, &block)
end
#parse!(argv, &block) ⇒ Object
Parses argv with a Parser and sends each resulting spec to call. If a block is given, each spec is yielded and skipped unless the block returns a true value.
# File 'lib/tap/app.rb', line 396
def parse!(argv, &block) # :yields: spec
  parser = Parser.new
  argv = parser.parse!(argv)

  # The queue API does not provide a delete method, so picking out the
  # deque jobs requires the whole queue be cleared, then re-enqued.
  # Safety (and speed) is improved with synchronization.
  queue.synchronize do
    deque = []
    blocks = {}

    if auto_enque
      blocks[:node] = lambda do |obj, args|
        queue.enq(obj, args)
        args = nil
      end

      blocks[:join] = lambda do |obj, args|
        unless obj.respond_to?(:outputs)
          # warning
        end
        deque.concat obj.outputs
      end
    end

    parser.specs.each do |spec|
      if block_given?
        next unless yield(spec)
      end

      type, obj, sig, *args = spec

      sig_block = case sig
      when 'set'
        blocks[type]
      when 'parse'
        block
      else
        nil
      end

      call('obj' => obj, 'sig' => sig, 'args' => args, &sig_block)
    end

    deque.uniq!
    queue.clear.each do |(obj, args)|
      if deque.delete(obj)
        warn_ignored_args(args)
      else
        queue.enq(obj, args)
      end
    end
  end

  argv
end
#reset ⇒ Object
Clears objects, the queue, and resets the stack so that no middleware is used. Reset raises an error unless state is READY.
# File 'lib/tap/app.rb', line 492
def reset
  synchronize do
    unless state == State::READY
      raise "cannot reset unless READY"
    end

    # walk up middleware to find the base of the stack
    while @stack.respond_to?(:stack)
      @stack = @stack.stack
    end

    objects.clear
    queue.clear
  end
end
#resolve(const_str) ⇒ Object
# File 'lib/tap/app.rb', line 345
def resolve(const_str)
  constant = env ? env[const_str] : Env::Constant.constantize(const_str)
  constant or raise "unresolvable constant: #{const_str.inspect}"
end
#route(obj, sig, &block) ⇒ Object
# File 'lib/tap/app.rb', line 333
def route(obj, sig, &block)
  unless object = get(obj)
    raise "unknown object: #{obj.inspect}"
  end

  unless object.respond_to?(:signal)
    raise "cannot signal: #{object.inspect}"
  end

  object.signal(sig, &block)
end
#run ⇒ Object
Sequentially dispatches each enqueued (node, inputs) pair to the application stack. A run continues until the queue is empty.
Run checks the state of self before dispatching a node. If the state changes from RUN, the following behaviors result:
STOP       No more nodes will be dispatched; the current node will continue to completion.
TERMINATE  No more nodes will be dispatched and the currently running node will be discontinued as described in terminate.
Calls to run when the state is not READY do nothing and return immediately.
Returns self.
# File 'lib/tap/app.rb', line 559
def run
  synchronize do
    return self unless state == State::READY
    @state = State::RUN
  end

  begin
    while state == State::RUN
      break unless entry = queue.deq
      dispatch(*entry)
    end
  rescue(TerminateError)
    # gracefully fail for termination errors
    queue.unshift(*entry)
  ensure
    synchronize { @state = State::READY }
  end

  self
end
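The STOP behavior can be seen in a single-threaded sketch. MiniApp below is a hypothetical stand-in that uses symbols for states, an Array for the queue, and no synchronization or TerminateError handling; it only mirrors the READY, RUN, STOP transitions described above.

```ruby
# Hypothetical, unsynchronized stand-in for the run loop.
class MiniApp
  attr_reader :state, :queue

  def initialize
    @state = :ready
    @queue = []
  end

  def enq(node, *inputs)
    queue << [node, inputs]
    node
  end

  def stop
    @state = :stop if state == :run
    self
  end

  def run
    return self unless state == :ready
    @state = :run

    begin
      while state == :run
        break unless entry = queue.shift
        node, inputs = entry
        node.call(*inputs)
      end
    ensure
      @state = :ready
    end

    self
  end
end

app = MiniApp.new
results = []
app.enq(lambda { |x| results << x }, 1)
app.enq(lambda { |x| results << x; app.stop }, 2) # finishes, then stops the run
app.enq(lambda { |x| results << x }, 3)

app.run
after_first_run = results.dup
pending = app.queue.size

app.run # state is ready again, so the remaining node is dispatched
```

The node that calls stop still completes; the node behind it stays in the queue until the next run.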
#serialize(bare = true) ⇒ Object
Converts self to a schema that can be used to build a new app with equivalent application objects, queue, and middleware. A schema is a collection of signal hashes such that this will rebuild the state of a on b:
a, b = App.new, App.new
a.serialize.each {|spec| b.call(spec) }
Application objects that do not satisfy the application object API are quietly ignored; enable debugging to be warned of their existence.
# File 'lib/tap/app.rb', line 674
def serialize(bare=true)
  # setup variables
  specs = {}
  order = []

  # collect enque signals to setup queue
  signals = queue.to_a.collect do |(node, args)|
    {'sig' => 'enque', 'args' => [var(node)] + args}
  end

  # collect and trace application objects
  objects.keys.sort_by do |var|
    var.to_s
  end.each do |var|
    obj = objects[var]
    order.concat trace(obj, specs)
  end

  middleware.each do |obj|
    order.concat trace(obj, specs)
  end

  if bare
    order.delete(self)
    specs.delete(self)
  else
    order.unshift(self)
    trace(self, specs)
  end
  order.uniq!

  # assemble specs
  variables = {}
  objects.each_pair do |var, obj|
    (variables[obj] ||= []) << var
  end

  invert_env = env ? env.invert : nil
  specs.keys.each do |obj|
    spec = {'sig' => 'set'}

    # assign variables
    if vars = variables[obj]
      if vars.length == 1
        spec['var'] = vars[0]
      else
        spec['var'] = vars
      end
    end

    # assign the class
    klass = obj.class
    klass = invert_env[klass] if invert_env
    spec['class'] = klass.to_s

    # merge obj_spec if possible
    obj_spec = specs[obj]
    if (obj_spec.keys & RESERVED_KEYS).empty?
      spec.merge!(obj_spec)
    else
      spec['spec'] = obj_spec
    end

    specs[obj] = spec
  end

  middleware.each do |obj|
    spec = specs[obj]
    spec['sig'] = 'use'
  end

  order.collect! {|obj| specs[obj] }.concat(signals)
end
#set(var, obj) ⇒ Object
Sets the object to the specified variable and returns obj. Provide nil as obj to un-set a variable (in which case the existing object is returned).
Nil is reserved as a variable name and cannot be used by set.
# File 'lib/tap/app.rb', line 255
def set(var, obj)
  raise "no var specified" if var.nil?

  if obj
    objects[var] = obj
  else
    objects.delete(var)
  end
end
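The return values of set, and its relationship to get and obj, are easy to misread, so a small sketch may help. MiniApp is a hypothetical stand-in with a plain Hash for objects; the method bodies follow the source shown for set, get, and obj.

```ruby
# Hypothetical stand-in holding application objects in a Hash.
class MiniApp
  attr_reader :objects

  def initialize
    @objects = {}
  end

  def set(var, obj)
    raise "no var specified" if var.nil?

    if obj
      objects[var] = obj
    else
      objects.delete(var)
    end
  end

  def get(var)
    var.nil? ? self : objects[var]
  end

  def obj(var)
    get(var) or raise "no object set to: #{var.inspect}"
  end
end

app = MiniApp.new
stored   = app.set('a', :alpha) # returns the object that was set
same_app = app.get(nil)         # nil looks up the app itself
removed  = app.set('a', nil)    # un-sets 'a', returning the old object

error = begin
  app.obj('a')                  # 'a' is gone, so obj raises
  nil
rescue RuntimeError => e
  e.message
end
```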
#stop ⇒ Object
Signals a running app to stop dispatching nodes to the application stack by setting state to STOP. The node currently in the stack will continue to completion.
Does nothing unless state is RUN.
# File 'lib/tap/app.rb', line 585
def stop
  synchronize { @state = State::STOP if state == State::RUN }
  self
end
#task(config = {}, klass = Task, &block) ⇒ Object
Generates a task with the specified config, initialized to self.
A block may be provided to override the process method; it will be called with the task instance, plus any inputs.
no_inputs = app.task {|task| [] }
one_input = app.task {|task, input| [input] }
mixed_inputs = app.task {|task, a, b, *args| [a, b, args] }
no_inputs.execute # => []
one_input.execute(:a) # => [:a]
mixed_inputs.execute(:a, :b) # => [:a, :b, []]
mixed_inputs.execute(:a, :b, 1, 2, 3) # => [:a, :b, [1,2,3]]
# File 'lib/tap/task.rb', line 20
def task(config={}, klass=Task, &block)
  instance = klass.new(config, self)
  if block_given?
    instance.extend Intern
    instance.process_block = block
  end
  instance
end
#terminate ⇒ Object
Signals a running application to terminate execution by setting state to TERMINATE. In this state, calls to check_terminate will raise a TerminateError. Run considers TerminateErrors a normal exit and rescues them quietly.
Nodes can set breakpoints that call check_terminate to invoke node-specific termination. If a node never calls check_terminate, then it will continue to completion.
Does nothing if state is READY.
# File 'lib/tap/app.rb', line 600
def terminate
  synchronize { @state = State::TERMINATE unless state == State::READY }
  self
end
#to_spec ⇒ Object
# File 'lib/tap/app.rb', line 748
def to_spec
  signals = serialize(false)
  spec = signals.shift

  spec.delete('self')
  spec.delete('sig')
  var = spec.delete('var')
  klass = spec.delete('class')
  spec = spec.delete('spec') || spec

  signals.unshift(
    'sig' => 'set',
    'var' => var,
    'class' => klass,
    'self' => true
  ) if var
  spec['signals'] = signals

  spec
end
#use(middleware, *argv) ⇒ Object
Adds the specified middleware to the stack. The argv will be used as extra arguments to initialize the middleware.
# File 'lib/tap/app.rb', line 244
def use(middleware, *argv)
  synchronize do
    @stack = middleware.new(@stack, *argv)
  end
end
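Each call to use wraps the current stack in a new middleware, so the most recently added middleware runs first. The sketch below shows that wrapping together with the walk performed by middleware. BaseStack, Tracer, and MiniApp are hypothetical; the circular-stack check and synchronization are omitted for brevity.

```ruby
# Hypothetical base of the stack: it simply calls the node.
class BaseStack
  def call(node, inputs)
    node.call(*inputs)
  end
end

# Hypothetical middleware: records its label, then delegates to the
# stack it wraps.
class Tracer
  attr_reader :stack, :label

  def initialize(stack, label, trace)
    @stack = stack
    @label = label
    @trace = trace
  end

  def call(node, inputs)
    @trace << label
    stack.call(node, inputs)
  end
end

class MiniApp
  attr_reader :stack

  def initialize
    @stack = BaseStack.new
  end

  def use(middleware, *argv)
    @stack = middleware.new(@stack, *argv)
  end

  # Collects middleware by walking up the stack until the base.
  def middleware
    found = []
    current = stack
    while current.respond_to?(:stack)
      found << current
      current = current.stack
    end
    found
  end
end

trace = []
app = MiniApp.new
app.use(Tracer, :inner, trace)
app.use(Tracer, :outer, trace)

result = app.stack.call(lambda { |a, b| a * b }, [2, 3])
labels = app.middleware.map(&:label)
```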
#var(obj, auto_assign = true) ⇒ Object
Returns the variable for the object. If the object is not assigned to a variable and auto_assign is true, then the object is set to an unused variable and the new variable is returned.
The new variable will be an integer and will be removed upon gc.
# File 'lib/tap/app.rb', line 280
def var(obj, auto_assign=true)
  objects.each_pair do |var, object|
    return var if obj == object
  end

  return nil unless auto_assign

  var = objects.length
  loop do
    if objects.has_key?(var)
      var += 1
    else
      set(var, obj)
      return var
    end
  end
end
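The interaction between var and gc can be sketched as follows: auto-assigned variables are integers, and gc removes exactly those. MiniApp is a hypothetical stand-in with a Hash for objects, and the loop in var is condensed relative to the source.

```ruby
# Hypothetical stand-in; set, var and gc follow the documented behavior.
class MiniApp
  attr_reader :objects

  def initialize
    @objects = {}
  end

  def set(var, obj)
    raise "no var specified" if var.nil?
    obj ? (objects[var] = obj) : objects.delete(var)
  end

  def var(obj, auto_assign = true)
    objects.each_pair { |var, object| return var if obj == object }
    return nil unless auto_assign

    # start at the current size and skip any integer already in use
    var = objects.length
    var += 1 while objects.has_key?(var)
    set(var, obj)
    var
  end

  def gc(all = false)
    if all
      objects.clear
    else
      objects.delete_if { |var, _obj| var.kind_of?(Integer) }
    end
    self
  end
end

app = MiniApp.new
app.set('a', :alpha)

named   = app.var(:alpha)        # already assigned, returns 'a'
auto    = app.var(:beta)         # unassigned, gets an integer variable
missing = app.var(:gamma, false) # unassigned and not auto-assigned

app.gc
remaining = app.objects.keys     # only the named variable survives
```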