Class: Tap::App

Inherits:
Object
Includes:
Configurable, MonitorMixin, Node, Signals
Defined in:
lib/tap/app.rb,
lib/tap/join.rb,
lib/tap/task.rb,
lib/tap/app/api.rb,
lib/tap/app/node.rb,
lib/tap/app/queue.rb,
lib/tap/app/stack.rb,
lib/tap/app/state.rb

Overview

App coordinates the setup and execution of workflows.

Defined Under Namespace

Modules: Node, State
Classes: Api, Queue, Stack, TerminateError

Constant Summary

CALL_KEYS = %w{obj sig args}

The reserved call keys

INIT_KEYS = %w{var class spec}

The reserved init keys

RESERVED_KEYS = CALL_KEYS + INIT_KEYS

Reserved call and init keys as a single array

DEFAULT_LOGGER = Logger.new($stderr)

The default App logger (writes to $stderr at level INFO)

Instance Attribute Summary

Attributes included from Node

#joins

Instance Method Summary

Methods included from Node

extended, intern, #on_complete

Methods included from Signals

#signal, #signal?

Methods included from Signals::ModuleMethods

included

Constructor Details

#initialize(config = {}, options = {}, &block) ⇒ App

Creates a new App with the given configuration. Options can be used to specify objects that are normally initialized for every new app:

:stack      the application stack; an App::Stack
:queue      the application queue; an App::Queue
:objects    application objects; a hash of (var, object) pairs
:logger     the application logger

A block may also be provided; it will be set as a default join.



# File 'lib/tap/app.rb', line 155

def initialize(config={}, options={}, &block)
  super() # monitor
  
  @state = State::READY
  @stack = options[:stack] || Stack.new(self)
  @queue = options[:queue] || Queue.new
  @objects = options[:objects] || {}
  @logger = options[:logger] || DEFAULT_LOGGER
  @joins = []
  on_complete(&block)
  
  self.env = config.delete(:env) || config.delete('env')
  initialize_config(config)
end

Class Attribute Details

.instance(auto_initialize = true) ⇒ Object

Returns the current instance of App. If no instance has been set, instance initializes and caches a new App with the default configuration, or returns nil when auto_initialize is false. Instance is used to initialize tasks when no app is specified and exists for convenience only.



# File 'lib/tap/app.rb', line 24

def instance(auto_initialize=true)
  @instance ||= (auto_initialize ? new : nil)
end
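The lazy singleton in .instance relies on `||=`: the right-hand side only runs while @instance is nil, and passing auto_initialize = false returns nil without creating an App. A minimal sketch of the same pattern on a hypothetical `Registry` class (not part of Tap):

```ruby
# Hypothetical class illustrating the lazy-singleton pattern used by App.instance.
class Registry
  class << self
    # Returns the cached instance, creating one only when auto_initialize is true.
    def instance(auto_initialize = true)
      @instance ||= (auto_initialize ? new : nil)
    end

    # Replaces the cached instance (compare App.setup, which assigns @instance directly).
    def instance=(obj)
      @instance = obj
    end
  end
end

Registry.instance(false)         # => nil; no Registry is created
first = Registry.instance        # creates and caches a new Registry
Registry.instance.equal?(first)  # => true; later calls reuse the cached object
```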

Instance Attribute Details

#logger ⇒ Object

The application logger



# File 'lib/tap/app.rb', line 89

def logger
  @logger
end

#objects ⇒ Object (readonly)

A cache of application objects



# File 'lib/tap/app.rb', line 86

def objects
  @objects
end

#queue ⇒ Object (readonly)

The application queue



# File 'lib/tap/app.rb', line 83

def queue
  @queue
end

#stack ⇒ Object

The application call stack for executing nodes



# File 'lib/tap/app.rb', line 80

def stack
  @stack
end

#state ⇒ Object (readonly)

The state of the application (see App::State)



# File 'lib/tap/app.rb', line 77

def state
  @state
end

Class Method Details

.build(spec = {}, app = nil) ⇒ Object



# File 'lib/tap/app.rb', line 36

def build(spec={}, app=nil)
  config = spec['config'] || {}
  signals = spec['signals'] || []
  
  if spec['self']
    app.reconfigure(config)
  else
    app = new(config)
  end
  
  signals.each do |args|
    app.call(args)
  end
  
  app.gc
  app
end

.setup(dir = Dir.pwd) ⇒ Object

Sets up and returns App.instance with an Env set up for the specified directory (Dir.pwd by default). This method is used to initialize the app and env as seen by the tap executable.



# File 'lib/tap/app.rb', line 31

def setup(dir=Dir.pwd)
  env = Env.setup(dir)
  @instance = new(:env => env)
end

Instance Method Details

#bq(*inputs, &block) ⇒ Object

Generates a node from the block and enques it with the inputs. Returns the new node.



# File 'lib/tap/app.rb', line 236

def bq(*inputs, &block) # :yields: *inputs
  node = self.node(&block)
  queue.enq(node, inputs)
  node
end

#build(spec) ⇒ Object



# File 'lib/tap/app.rb', line 350

def build(spec)
  var = spec['var']
  clas = spec['class']
  spec = spec['spec'] || spec
  obj = nil
  
  if clas.nil?
    unless spec.empty?
      raise "no class specified"
    end
  else
    clas = resolve(clas)
    
    case spec
    when Array
      parse = bang ? :parse! : :parse
      obj, args = clas.send(parse, spec, self)
  
      if block_given?
        yield(obj, args)
      else
        warn_ignored_args(args)
      end
    
    when Hash
      obj = clas.build(spec, self)
    else
      raise "invalid spec: #{spec.inspect}"
    end
  end
  
  unless var.nil?
    if var.respond_to?(:each)
      var.each {|v| set(v, obj) }
    else
      set(var, obj)
    end
  end
  
  obj
end
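#build splits a spec hash into the variable(s), the class string, and the class-specific spec, falling back to the whole hash when no 'spec' key is present; a var that responds to each assigns the same object to every listed variable. The sketch below isolates those two steps on a plain Hash; `unpack_spec` and `assign_vars` are hypothetical helpers, and class resolution and construction are left out.

```ruby
# Hypothetical helper: split a spec the way App#build does.
def unpack_spec(spec)
  var   = spec['var']
  clas  = spec['class']
  inner = spec['spec'] || spec  # without a 'spec' key the whole hash is passed on
  [var, clas, inner]
end

# Hypothetical helper: assign obj to one variable or to each of a list of variables.
def assign_vars(objects, var, obj)
  return obj if var.nil?
  if var.respond_to?(:each)
    var.each {|v| objects[v] = obj }
  else
    objects[var] = obj
  end
  obj
end

objects = {}
var, clas, inner = unpack_spec('var' => ['a', 'b'], 'class' => 'Tap::Task', 'spec' => ['--key', 'value'])
assign_vars(objects, var, Object.new)
objects['a'].equal?(objects['b'])  # => true; both variables refer to one object
```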

#call(args, &block) ⇒ Object

Sends a signal to an application object. The input should be a hash defining these fields:

obj      # a variable identifying an object, or nil for self
sig      # the signal name
args     # arguments to the signal (typically a Hash)

Call does the following:

object = app.get(obj)        # lookup an application object by obj
signal = object.signal(sig)  # lookup a signal by sig
signal.call(args)            # call the signal with args

Call returns the result of the signal call.



# File 'lib/tap/app.rb', line 325

def call(args, &block)
  obj = args['obj']
  sig = args['sig']
  args = args['args'] || args
  
  route(obj, sig, &block).call(args)
end
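The get, signal, call sequence can be traced in a self-contained sketch. `MiniApp` and `Greeter` are hypothetical stand-ins, not Tap classes; here a signal is assumed to be any object responding to call(args), whereas the real App obtains signals through the Signals module, which is not shown on this page.

```ruby
# Hypothetical application object: #signal returns a callable for a signal name.
class Greeter
  def signal(sig)
    case sig
    when 'greet' then lambda {|args| "hello #{args['name']}" }
    else raise "unknown signal: #{sig.inspect}"
    end
  end
end

# Hypothetical stand-in for App showing the get -> route -> call sequence.
class MiniApp
  attr_reader :objects

  def initialize
    @objects = {}
  end

  def get(var)
    var.nil? ? self : objects[var]
  end

  def route(obj, sig)
    object = get(obj) or raise "unknown object: #{obj.inspect}"
    raise "cannot signal: #{object.inspect}" unless object.respond_to?(:signal)
    object.signal(sig)
  end

  def call(args)
    obj  = args['obj']
    sig  = args['sig']
    args = args['args'] || args
    route(obj, sig).call(args)
  end
end

app = MiniApp.new
app.objects['g'] = Greeter.new
app.call('obj' => 'g', 'sig' => 'greet', 'args' => {'name' => 'tap'})  # => "hello tap"
```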

#check_terminate ⇒ Object

Raises a TerminateError if state is TERMINATE. Nodes should call check_terminate to provide breakpoints in long-running processes.

A block may be provided to check_terminate to execute code before raising the TerminateError.



# File 'lib/tap/app.rb', line 610

def check_terminate
  if state == State::TERMINATE
    yield() if block_given?
    raise TerminateError.new
  end
end

#debug? ⇒ Boolean

True if the debug config or the global variable $DEBUG is true.

Returns:

  • (Boolean)


# File 'lib/tap/app.rb', line 191

def debug?
  debug || $DEBUG
end

#dispatch(node, inputs = []) ⇒ Object

Dispatch does the following in order:

  • call stack with the node and inputs

  • call the node joins (node.joins)

The joins for self will be called if the node joins are an empty array. No joins will be called if the node joins are nil, or if the node does not provide a joins method.

Dispatch returns the stack result.



# File 'lib/tap/app.rb', line 524

def dispatch(node, inputs=[])
  result = stack.call(node, inputs)
  
  if node.respond_to?(:joins)
    if joins = node.joins

      if joins.empty?
        joins = self.joins
      end
    
      joins.each do |join|
        join.call(result)
      end
    end
  end
  
  result
end
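The join-selection rules of dispatch can be isolated in a small function. `joins_for` and `FakeNode` are hypothetical names for illustration; App#dispatch inlines this logic and then calls each selected join with the stack result.

```ruby
# Hypothetical helper reproducing the join-selection rules in App#dispatch:
# no #joins method or nil joins -> no joins; an empty array -> the app's joins.
def joins_for(node, default_joins)
  return [] unless node.respond_to?(:joins)
  joins = node.joins
  return [] if joins.nil?
  joins.empty? ? default_joins : joins
end

FakeNode = Struct.new(:joins)  # hypothetical node exposing a joins attribute
defaults = [:default_join]

joins_for(Object.new, defaults)           # => [] (no joins method)
joins_for(FakeNode.new(nil), defaults)    # => [] (joins are nil)
joins_for(FakeNode.new([]), defaults)     # => [:default_join]
joins_for(FakeNode.new([:j1]), defaults)  # => [:j1]
```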

#dump(target = $stdout, options = {}) ⇒ Object

Dumps self to the target as YAML. (Note: dump is still experimental.)

Notes

Platforms that use Syck (e.g. MRI) require a fix because Syck misformats certain dumps such that they cannot be reloaded (even by Syck). Specifically:

&id001 !ruby/object:Tap::Task ?

should be:

? &id001 !ruby/object:Tap::Task

Dump fixes this error and, in addition, removes Thread and Proc dumps because they can’t be allocated on load.



# File 'lib/tap/app.rb', line 641

def dump(target=$stdout, options={})
  synchronize do
    options = {
      :date_format => '%Y-%m-%d %H:%M:%S',
      :date => true,
      :info => true
    }.merge(options)
    
    # print basic headers
    target.puts "# date: #{Time.now.strftime(options[:date_format])}" if options[:date]
    target.puts "# info: #{info}" if options[:info]
    
    # dump yaml, fixing as necessary
    yaml = YAML.dump(self)
    yaml.gsub!(/\&(.*!ruby\/object:.*?)\s*\?/) {"? &#{$1} " } if YAML.const_defined?(:Syck)
    yaml.gsub!(/!ruby\/object:(Thread|Proc) \{\}/, '')
    target << yaml
  end
  
  target
end
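The two substitutions dump applies can be tried on sample strings. `fix_yaml` is a hypothetical helper; unlike App#dump it applies the Syck key fix unconditionally rather than only when YAML::Syck is defined.

```ruby
# Hypothetical helper applying the two substitutions from App#dump.
def fix_yaml(yaml)
  yaml = yaml.dup
  # move the '?' complex-key marker in front of the anchor and tag
  yaml.gsub!(/\&(.*!ruby\/object:.*?)\s*\?/) { "? &#{$1} " }
  # drop Thread and Proc objects, which cannot be allocated on load
  yaml.gsub!(/!ruby\/object:(Thread|Proc) \{\}/, '')
  yaml
end

fix_yaml("&id001 !ruby/object:Tap::Task ?")  # => "? &id001 !ruby/object:Tap::Task "
fix_yaml("block: !ruby/object:Proc {}")      # => "block: "
```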

#enq(node, *inputs) ⇒ Object

Enques the node with the inputs. Returns the node.



# File 'lib/tap/app.rb', line 230

def enq(node, *inputs)
  queue.enq(node, inputs)
  node
end

#enque(var, *args) ⇒ Object

Enques the application object specified by var with args. Raises an error if no such application object exists.



# File 'lib/tap/app.rb', line 455

def enque(var, *args)
  unless node = get(var)
    raise "unknown object: #{var.inspect}"
  end
  
  queue.enq(node, args)
  node
end

#env=(env) ⇒ Object

Sets the application environment and validates that env provides AGET ([]) and invert methods. AGET is used to look up constants during init; it receives the ‘class’ parameter and should return the corresponding class. Invert should return an object that reverses the AGET lookup. Tap::Env and a regular Hash both satisfy this API.

Env can be set to nil and is nil by default, but initialization is constrained without it: resolve then falls back to Env::Constant.constantize, so ‘class’ parameters must be constant names rather than env lookup keys.



# File 'lib/tap/app.rb', line 178

def env=(env)
  Validation.validate_api(env, [:[], :invert]) unless env.nil?
  @env = env
end
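A plain Hash already satisfies the env API: [] maps a class string to a constant (as resolve uses it) and invert maps the constant back to the string (as serialize uses it). In this sketch `validate_api` is a hypothetical stand-in for Tap's Validation.validate_api, whose exact behavior and error class are not shown on this page, and `MyTask` is a placeholder class.

```ruby
# Hypothetical stand-in for Validation.validate_api: raise unless obj
# responds to every listed method.
def validate_api(obj, methods)
  missing = methods.reject {|m| obj.respond_to?(m) }
  raise ArgumentError, "missing api: #{missing.inspect}" unless missing.empty?
  obj
end

class MyTask; end  # placeholder class for the lookup

env = {'my_task' => MyTask}
validate_api(env, [:[], :invert])

env['my_task']       # => MyTask   (AGET: string -> constant)
env.invert[MyTask]   # => "my_task" (invert: constant -> string)
```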

#execute(node, *inputs) ⇒ Object

Execute is a wrapper for dispatch allowing inputs to be listed out rather than provided as an array.



# File 'lib/tap/app.rb', line 510

def execute(node, *inputs)
  dispatch(node, inputs)
end

#gc(all = false) ⇒ Object

Removes objects keyed by integers (such as variables auto-assigned by var). If all is true, gc clears all objects. Returns self.



# File 'lib/tap/app.rb', line 300

def gc(all=false)
  if all
    objects.clear
  else
    objects.delete_if {|var, obj| var.kind_of?(Integer) }
  end
  
  self
end

#get(var) ⇒ Object

Returns the object set to var, or self if var is nil.



# File 'lib/tap/app.rb', line 266

def get(var)
  var.nil? ? self : objects[var]
end

#info ⇒ Object

Returns an information string for the App.

App.new.info   # => 'state: 0 (READY) queue: 0'


# File 'lib/tap/app.rb', line 621

def info
  "state: #{state} (#{State.state_str(state)}) queue: #{queue.size}"
end

#inspect ⇒ Object



# File 'lib/tap/app.rb', line 770

def inspect
  "#<#{self.class}:#{object_id} #{info}>"
end

#join(inputs, outputs, config = {}, klass = Join, &block) ⇒ Object

Generates a join between the inputs and outputs.



# File 'lib/tap/join.rb', line 7

def join(inputs, outputs, config={}, klass=Join, &block)
  klass.new(config, self).join(inputs, outputs, &block)
end

#log(action, msg = nil, level = Logger::INFO) ⇒ Object

Logs the action and message at the specified level (default INFO). The message may be generated by a block; in that case leave msg as nil.

Logging is suppressed if quiet is true, unless verbose is also true.

Performance Considerations

Using a block to generate a message is quicker if logging is off, but slower when logging is on. However, when messages use a lot of interpolation the log time is dominated by the interpolation; at some point the penalty for using a block is outweighed by the benefit of being able to skip the interpolation.

For example:

log(:action, "this is fast")
log(:action) { "and there's not much benefit to the block" }

log(:action, "but a message with #{a}, #{b}, #{c}, and #{d}")
log(:action) { "may be #{best} in a block because you can #{turn} #{it} #{off}" }


# File 'lib/tap/app.rb', line 217

def log(action, msg=nil, level=Logger::INFO)
  if !quiet || verbose
    msg ||= yield
    logger.add(level, msg, action.to_s)
  end
end
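The performance note follows from the guard in log: the block, and any interpolation inside it, only runs when logging is enabled. A minimal sketch using Ruby's standard Logger; `QuietableLog` is a hypothetical class, and quiet and verbose are plain accessors here rather than App configs.

```ruby
require 'logger'
require 'stringio'

# Hypothetical class reproducing the guard in App#log.
class QuietableLog
  attr_accessor :quiet, :verbose
  attr_reader :logger

  def initialize(io)
    @logger = Logger.new(io)
    @quiet = false
    @verbose = false
  end

  def log(action, msg = nil, level = Logger::INFO)
    if !quiet || verbose
      msg ||= yield           # the block is evaluated only on this branch
      logger.add(level, msg, action.to_s)
    end
  end
end

calls = 0
io = StringIO.new
app_log = QuietableLog.new(io)

app_log.quiet = true
app_log.log(:action) { calls += 1; "expensive message" }  # block is skipped
app_log.quiet = false
app_log.log(:action) { calls += 1; "expensive message" }  # block runs, message logged
calls  # => 1
```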

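#middleware ⇒ Object

Returns an array of middleware in use by self, ordered from the outermost (most recently added) inward. Raises an error if a circular stack is detected.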



# File 'lib/tap/app.rb', line 465

def middleware
  middleware = []
  
  # collect middleware by walking up the stack
  synchronize do
    current = stack
    visited = [current]
    
    while current.respond_to?(:stack)
      middleware << current
      current = current.stack
      
      circular_stack = visited.include?(current)
      visited << current
      
      if circular_stack
        names = visited.collect {|obj| obj.class.to_s }.join(', ')
        raise "circular stack detected:\n[#{names}]"
      end
    end
  end
  
  middleware
end
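The walk in middleware treats anything that responds to #stack as middleware and stops at the first object that does not (the base stack). A standalone sketch with hypothetical `Wrapper` and `BaseStack` classes, assuming middleware expose the wrapped stack through a #stack reader as App#use and App#reset imply:

```ruby
# Hypothetical middleware: wraps the next stack and exposes it via #stack.
class Wrapper
  attr_accessor :stack
  def initialize(stack)
    @stack = stack
  end
end

# Hypothetical base stack: it has no #stack method, so the walk stops here.
class BaseStack; end

# Collects middleware from the outermost wrapper inward, raising on cycles.
def collect_middleware(stack)
  middleware = []
  current = stack
  visited = [current]

  while current.respond_to?(:stack)
    middleware << current
    current = current.stack

    if visited.include?(current)
      names = visited.collect {|m| m.class.to_s }.join(', ')
      raise "circular stack detected:\n[#{names}]"
    end
    visited << current
  end

  middleware
end

inner = Wrapper.new(BaseStack.new)
outer = Wrapper.new(inner)
collect_middleware(outer).length  # => 2 (outer, then inner)

inner.stack = outer  # introduce a cycle; collect_middleware(outer) would now raise
```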

#node(&block) ⇒ Object

Returns a new node that executes block on call.



# File 'lib/tap/app.rb', line 225

def node(&block) # :yields: *inputs
  Node.intern(&block)
end

#obj(var) ⇒ Object

Same as get, but raises an error if no object is set to the variable.



# File 'lib/tap/app.rb', line 271

def obj(var)
  get(var) or raise "no object set to: #{var.inspect}"
end

#parse(argv, &block) ⇒ Object

Same as parse!, but operates on a copy of argv so the original array is not modified. (:yields: spec)



# File 'lib/tap/app.rb', line 392

def parse(argv, &block) # :yields: spec
  parse!(argv.dup, &block)
end

#parse!(argv, &block) ⇒ Object

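Parses argv into specs and calls each spec as a signal on the corresponding application object. If a block is given, each spec is yielded to it first and skipped unless the block returns true; the block is also passed to 'parse' signals. When auto_enque is true, nodes created by 'set' signals are enqued with their arguments, and join outputs are removed from the queue (any arguments they were enqued with are ignored). Returns the argv left over by the parser. (:yields: spec)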



# File 'lib/tap/app.rb', line 396

def parse!(argv, &block) # :yields: spec
  parser = Parser.new
  argv = parser.parse!(argv)
  
  # The queue API does not provide a delete method, so picking out the
  # deque jobs requires the whole queue be cleared, then re-enqued.
  # Safety (and speed) is improved with synchronization.
  queue.synchronize do
    deque = []
    blocks = {}
    
    if auto_enque
      blocks[:node] = lambda do |obj, args|
        queue.enq(obj, args)
        args = nil
      end
      
      blocks[:join] = lambda do |obj, args|
        unless obj.respond_to?(:outputs)
          # warning
        end
        deque.concat obj.outputs
      end
    end
    
    parser.specs.each do |spec|
      if block_given?
        next unless yield(spec)
      end
      
      type, obj, sig, *args = spec
      
      sig_block = case sig
      when 'set'
        blocks[type]
      when 'parse'
        block
      else
        nil
      end
      
      call('obj' => obj, 'sig' => sig, 'args' => args, &sig_block)  
    end
    
    deque.uniq!
    queue.clear.each do |(obj, args)|
      if deque.delete(obj)
        warn_ignored_args(args)
      else
        queue.enq(obj, args)
      end
    end
  end

  argv
end
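As the comment in parse! notes, the queue has no delete method, so join outputs are removed by clearing the queue and re-enqueuing everything else. The selection logic can be sketched on plain arrays; `rebuild_queue` is a hypothetical name, and the real method works on the App::Queue while holding its lock. Because deque is made unique, only the first queued entry for each listed object is dropped.

```ruby
# Hypothetical helper reproducing the queue rebuild at the end of App#parse!.
def rebuild_queue(entries, deque)
  deque = deque.uniq
  kept = []
  ignored = []

  entries.each do |(obj, args)|
    if deque.delete(obj)
      ignored << args        # App#parse! passes these to warn_ignored_args
    else
      kept << [obj, args]    # re-enqueued in original order
    end
  end

  [kept, ignored]
end

entries = [[:a, [1]], [:b, [2]], [:c, [3]]]
kept, ignored = rebuild_queue(entries, [:b, :b])
kept     # => [[:a, [1]], [:c, [3]]]
ignored  # => [[2]]
```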

#reset ⇒ Object

Clears objects, the queue, and resets the stack so that no middleware is used. Reset raises an error unless state is READY.



# File 'lib/tap/app.rb', line 492

def reset
  synchronize do
    unless state == State::READY
      raise "cannot reset unless READY"
    end
    
    # walk up middleware to find the base of the stack
    while @stack.respond_to?(:stack)
      @stack = @stack.stack
    end
    
    objects.clear
    queue.clear
  end
end

#resolve(const_str) ⇒ Object



# File 'lib/tap/app.rb', line 345

def resolve(const_str)
  constant = env ? env[const_str] : Env::Constant.constantize(const_str)
  constant or raise "unresolvable constant: #{const_str.inspect}"
end

#route(obj, sig, &block) ⇒ Object



# File 'lib/tap/app.rb', line 333

def route(obj, sig, &block)
  unless object = get(obj)
    raise "unknown object: #{obj.inspect}"
  end
  
  unless object.respond_to?(:signal)
    raise "cannot signal: #{object.inspect}"
  end
  
  object.signal(sig, &block)
end

#run ⇒ Object

Sequentially dispatches each enqued (node, inputs) pair to the application stack. A run continues until the queue is empty.

Run checks the state of self before dispatching a node. If the state changes from RUN, the following behaviors result:

STOP        No more nodes will be dispatched; the current node
            will continue to completion.
TERMINATE   No more nodes will be dispatched and the currently
            running node will be discontinued as described in
            terminate.

Calls to run when the state is not READY do nothing and return immediately.

Returns self.



# File 'lib/tap/app.rb', line 559

def run
  synchronize do
    return self unless state == State::READY
    @state = State::RUN
  end
  
  begin
    while state == State::RUN
      break unless entry = queue.deq
      dispatch(*entry)
    end
  rescue(TerminateError)
    # gracefully fail for termination errors
    queue.unshift(*entry)
  ensure
    synchronize { @state = State::READY }
  end
  
  self
end
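The interplay of run, stop, terminate, and check_terminate can be reproduced in a small single-threaded sketch. `MiniRunner` is hypothetical: it uses a plain Array as the queue, omits synchronization, and calls nodes directly instead of going through a stack. Only READY = 0 is confirmed by the info example; the other state values are placeholders.

```ruby
class TerminateError < StandardError; end

# Hypothetical runner mirroring the state handling in App#run.
class MiniRunner
  READY, RUN, STOP, TERMINATE = 0, 1, 2, 3
  attr_reader :state, :queue

  def initialize
    @state = READY
    @queue = []  # [node, inputs] pairs
  end

  def stop
    @state = STOP if state == RUN
  end

  def terminate
    @state = TERMINATE unless state == READY
  end

  def check_terminate
    raise TerminateError if state == TERMINATE
  end

  def run
    return self unless state == READY
    @state = RUN
    entry = nil
    begin
      while state == RUN
        break unless entry = queue.shift
        node, inputs = entry
        node.call(self, *inputs)
      end
    rescue TerminateError
      queue.unshift(entry)  # put the interrupted entry back, as App#run does
    ensure
      @state = READY
    end
    self
  end
end

runner = MiniRunner.new
out = []
runner.queue << [lambda {|r, x| out << x }, [1]]
runner.queue << [lambda {|r, x| r.stop; out << x }, [2]]  # stop lets this node finish
runner.queue << [lambda {|r, x| out << x }, [3]]
runner.run
out  # => [1, 2]; the third entry remains queued
```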

#serialize(bare = true) ⇒ Object

Converts self to a schema that can be used to build a new app with equivalent application objects, queue, and middleware. A schema is an array of signal hashes such that this will rebuild the state of a on b:

a, b = App.new, App.new
a.serialize.each {|spec| b.call(spec) }

By default (bare = true) self is omitted from the schema; when bare is false, a spec for self is placed first. Application objects that do not satisfy the application object API are quietly ignored; enable debugging to be warned of their existence.



# File 'lib/tap/app.rb', line 674

def serialize(bare=true)
  # setup variables
  specs = {}
  order = []
  
  # collect enque signals to setup queue
  signals = queue.to_a.collect do |(node, args)|
    {'sig' => 'enque', 'args' => [var(node)] + args}
  end
  
  # collect and trace application objects
  objects.keys.sort_by do |var|
    var.to_s
  end.each do |var|
    obj = objects[var]
    order.concat trace(obj, specs)
  end
  
  middleware.each do |obj|
    order.concat trace(obj, specs)
  end
  
  if bare
    order.delete(self)
    specs.delete(self)
  else
    order.unshift(self)
    trace(self, specs)
  end
  order.uniq!
  
  # assemble specs
  variables = {}
  objects.each_pair do |var, obj|
    (variables[obj] ||= []) << var
  end
  
  invert_env = env ? env.invert : nil
  specs.keys.each do |obj|
    spec = {'sig' => 'set'}
    
    # assign variables
    if vars = variables[obj]
      if vars.length == 1
        spec['var'] = vars[0]
      else
        spec['var'] = vars
      end
    end

    # assign the class
    klass = obj.class
    klass = invert_env[klass] if invert_env
    spec['class'] = klass.to_s

    # merge obj_spec if possible
    obj_spec = specs[obj]
    if (obj_spec.keys & RESERVED_KEYS).empty?
      spec.merge!(obj_spec)
    else
      spec['spec'] = obj_spec
    end
    
    specs[obj] = spec
  end
  
  middleware.each do |obj|
    spec = specs[obj]
    spec['sig'] = 'use'
  end
  
  order.collect! {|obj| specs[obj] }.concat(signals)
end
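When serialize assembles a 'set' signal, the object's own spec is merged into the signal hash only if none of its keys collide with RESERVED_KEYS; otherwise it is nested under 'spec'. App#build reverses this with `spec['spec'] || spec`, so both shapes are read back. A sketch of the merge rule; `set_spec` is a hypothetical helper that always sets 'var', whereas serialize only sets it when the object has a variable.

```ruby
RESERVED_KEYS = %w{obj sig args} + %w{var class spec}

# Hypothetical helper reproducing the merge rule in App#serialize.
def set_spec(var, class_name, obj_spec)
  spec = {'sig' => 'set', 'var' => var, 'class' => class_name}
  if (obj_spec.keys & RESERVED_KEYS).empty?
    spec.merge!(obj_spec)       # no collisions: flatten into the signal
  else
    spec['spec'] = obj_spec     # collision: nest to keep the keys distinct
  end
  spec
end

flat   = set_spec('a', 'Tap::Task', 'config' => {'key' => 'value'})
nested = set_spec('b', 'Tap::Task', 'args' => [1, 2])
flat['config']  # => {"key"=>"value"} (inspect format varies by Ruby version)
nested['spec']  # holds {'args' => [1, 2]}
```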

#set(var, obj) ⇒ Object

Sets the object to the specified variable and returns obj. Provide nil as obj to un-set a variable (in which case the existing object is returned).

Nil is reserved as a variable name and cannot be used by set.



# File 'lib/tap/app.rb', line 255

def set(var, obj)
  raise "no var specified" if var.nil?
  
  if obj
    objects[var] = obj
  else
    objects.delete(var)
  end
end

#stop ⇒ Object

Signals a running app to stop dispatching nodes to the application stack by setting state to STOP. The node currently in the stack will continue to completion.

Does nothing unless state is RUN.



# File 'lib/tap/app.rb', line 585

def stop
  synchronize { @state = State::STOP if state == State::RUN }
  self
end

#task(config = {}, klass = Task, &block) ⇒ Object

Generates a task with the specified config, initialized to self.

A block may be provided to override the process method; it will be called with the task instance, plus any inputs.

no_inputs = app.task {|task| [] }
one_input = app.task {|task, input| [input] }
mixed_inputs = app.task {|task, a, b, *args| [a, b, args] }

no_inputs.execute                            # => []
one_input.execute(:a)                        # => [:a]
mixed_inputs.execute(:a, :b)                 # => [:a, :b, []]
mixed_inputs.execute(:a, :b, 1, 2, 3)        # => [:a, :b, [1,2,3]]


# File 'lib/tap/task.rb', line 20

def task(config={}, klass=Task, &block)
  instance = klass.new(config, self)
  if block_given?
    instance.extend Intern
    instance.process_block = block
  end
  instance
end
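task overrides process per instance by extending the new task with a module and storing the block. The sketch below shows the same technique with hypothetical `BlockProcess` and `MiniTask` names; Tap's Intern module and Task class are not shown on this page, so their details may differ.

```ruby
# Hypothetical module: once extended onto an instance, #process calls the
# stored block with the instance followed by the inputs.
module BlockProcess
  attr_accessor :process_block

  def process(*inputs)
    process_block.call(self, *inputs)
  end
end

# Hypothetical minimal task: execute forwards inputs to process.
class MiniTask
  def process(*inputs)
    inputs
  end

  def execute(*inputs)
    process(*inputs)
  end
end

# Hypothetical analogue of App#task: extend only when a block is given.
def make_task(&block)
  task = MiniTask.new
  if block
    task.extend BlockProcess
    task.process_block = block
  end
  task
end

mixed = make_task {|task, a, b, *args| [a, b, args] }
mixed.execute(:a, :b, 1, 2, 3)  # => [:a, :b, [1, 2, 3]]
make_task.execute(:x)           # => [:x] (default process)
```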

#terminate ⇒ Object

Signals a running application to terminate execution by setting state to TERMINATE. In this state, calls to check_terminate will raise a TerminateError. Run considers TerminateErrors a normal exit and rescues them quietly.

Nodes can set breakpoints that call check_terminate to invoke node-specific termination. If a node never calls check_terminate, then it will continue to completion.

Does nothing if state is READY.



# File 'lib/tap/app.rb', line 600

def terminate
  synchronize { @state = State::TERMINATE unless state == State::READY }
  self
end

#to_spec ⇒ Object



# File 'lib/tap/app.rb', line 748

def to_spec
  signals = serialize(false)
  spec = signals.shift
  
  spec.delete('self')
  spec.delete('sig')
  
  var = spec.delete('var')
  klass = spec.delete('class')
  spec = spec.delete('spec') || spec
  
  signals.unshift(
    'sig' => 'set',
    'var' => var, 
    'class' => klass, 
    'self' => true
  ) if var
  spec['signals'] = signals
  
  spec
end

#use(middleware, *argv) ⇒ Object

Adds the specified middleware to the stack. The argv will be used as extra arguments to initialize the middleware.



# File 'lib/tap/app.rb', line 244

def use(middleware, *argv)
  synchronize do
    @stack = middleware.new(@stack, *argv)
  end
end
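use wraps the current stack: each middleware is constructed with the previous stack (plus argv) and becomes the new outermost stack, so the most recently added middleware sees each node first. A sketch with hypothetical `CallStack` and `Recorder` classes, assuming middleware only need #call(node, inputs) and a #stack reader; the full middleware API is not documented on this page.

```ruby
# Hypothetical base stack: calls the node with its inputs.
class CallStack
  def call(node, inputs)
    node.call(*inputs)
  end
end

# Hypothetical middleware: records its label, then delegates to the wrapped stack.
class Recorder
  attr_reader :stack, :seen

  def initialize(stack, label)
    @stack = stack
    @label = label
    @seen = []
  end

  def call(node, inputs)
    @seen << @label
    stack.call(node, inputs)
  end
end

# Equivalent of use(Recorder, 'first') followed by use(Recorder, 'second').
stack = CallStack.new
stack = Recorder.new(stack, 'first')
stack = Recorder.new(stack, 'second')

stack.call(lambda {|x, y| x + y }, [1, 2])  # => 3; 'second' runs before 'first'
```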

#var(obj, auto_assign = true) ⇒ Object

Returns the variable for the object. If the object is not assigned to a variable and auto_assign is true, then the object is set to an unused variable and the new variable is returned.

The new variable will be an integer and will be removed upon gc.



# File 'lib/tap/app.rb', line 280

def var(obj, auto_assign=true)
  objects.each_pair do |var, object|
    return var if obj == object
  end
  
  return nil unless auto_assign
  
  var = objects.length
  loop do 
    if objects.has_key?(var)
      var += 1
    else
      set(var, obj)
      return var
    end
  end
end
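var and gc work together: auto-assigned variables are integers starting at objects.length, and gc(false) removes exactly the integer-keyed entries. This is presumably why App.build calls gc after replaying signals. A sketch on a plain Hash; `var_for` and `gc_objects` are hypothetical helpers mirroring App#var and App#gc.

```ruby
# Hypothetical helper mirroring App#var.
def var_for(objects, obj, auto_assign = true)
  objects.each_pair {|var, object| return var if obj == object }
  return nil unless auto_assign

  var = objects.length
  var += 1 while objects.key?(var)  # first unused integer at or above length
  objects[var] = obj
  var
end

# Hypothetical helper mirroring App#gc.
def gc_objects(objects, all = false)
  if all
    objects.clear
  else
    objects.delete_if {|var, obj| var.kind_of?(Integer) }
  end
  objects
end

objects = {'task' => :named}
var_for(objects, :named)         # => "task" (existing variable)
var_for(objects, :anon)          # => 1 (objects.length was 1)
var_for(objects, :other, false)  # => nil; nothing is assigned
gc_objects(objects)              # removes 1; 'task' remains
```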