Module: Tap::Support::Executable

Included in:
Task
Defined in:
lib/tap/support/executable.rb

Overview

Executable wraps objects to make them executable by App.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#appObject (readonly)

The App receiving self during enq



10
11
12
# File 'lib/tap/support/executable.rb', line 10

def app
  @app
end

#batchObject (readonly)

The batch for self



22
23
24
# File 'lib/tap/support/executable.rb', line 22

def batch
  @batch
end

#dependenciesObject (readonly)

An array of dependency indicies that will be resolved on _execute



19
20
21
# File 'lib/tap/support/executable.rb', line 19

def dependencies
  @dependencies
end

#method_nameObject (readonly)

The method called during _execute



13
14
15
# File 'lib/tap/support/executable.rb', line 13

def method_name
  @method_name
end

#on_complete_blockObject (readonly)

The block called when _execute completes



16
17
18
# File 'lib/tap/support/executable.rb', line 16

def on_complete_block
  @on_complete_block
end

Class Method Details

.initialize(obj, method_name, app = App.instance, batch = [], dependencies = [], &on_complete_block) ⇒ Object

Extends obj with Executable and sets up all required variables. The specified method will be called on _execute.



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/tap/support/executable.rb', line 28

def self.initialize(obj, method_name, app=App.instance, batch=[], dependencies=[], &on_complete_block)
  obj.extend Executable
  obj.instance_variable_set(:@app, app)
  obj.instance_variable_set(:@method_name, method_name)
  obj.instance_variable_set(:@on_complete_block, on_complete_block)
  obj.instance_variable_set(:@dependencies, dependencies)
  obj.instance_variable_set(:@batch, batch)
  batch << obj
  
  obj
end

Instance Method Details

#_execute(*inputs) ⇒ Object

Auditing method call. Resolves dependencies, executes method_name, and sends the audited result to the on_complete_block (if set).

Returns the audited result.



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/tap/support/executable.rb', line 244

def _execute(*inputs)
  resolve_dependencies
  
  previous = []
  inputs.collect! do |input| 
    if input.kind_of?(Audit) 
      previous << input
      input.value
    else
      previous << Audit.new(nil, input)
      input
    end
  end
   
  audit = Audit.new(self, send(method_name, *inputs), previous)
  on_complete_block ? on_complete_block.call(audit) : app.aggregator.store(audit)
  
  audit
end

#batch_indexObject

Returns the index of self in batch.



63
64
65
# File 'lib/tap/support/executable.rb', line 63

def batch_index
  batch.index(self)
end

#batch_with(*executables) ⇒ Object

Merges the batches for self and the specified Executables, removing duplicates.

class BatchExecutable
  include Tap::Support::Executable
  def initialize(batch=[])
    @batch = batch
    batch << self
  end
end

b1 = BatchExecutable.new
b2 = BatchExecutable.new
b3 = BatchExecutable.new

b1.batch_with(b2, b3)
b1.batch                   # => [b1, b2, b3]
b3.batch                   # => [b1, b2, b3]

Note that batch_with is not recursive (ie it does not merge the batches of each member in the batch):

b4 = BatchExecutable.new
b4.batch_with(b3)   

b4.batch                   # => [b4, b1, b2, b3]
b3.batch                   # => [b4, b1, b2, b3]
b2.batch                   # => [b1, b2, b3]
b1.batch                   # => [b1, b2, b3]

However it does affect all objects that share the same underlying batch:

b5 = BatchExecutable.new(b1.batch)
b6 = BatchExecutable.new

b5.batch.object_id         # => b1.batch.object_id
b5.batch                   # => [b1, b2, b3, b5]

b5.batch_with(b6)

b5.batch                   # => [b1, b2, b3, b5, b6]
b1.batch                   # => [b1, b2, b3, b5, b6]

Returns self.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/tap/support/executable.rb', line 112

def batch_with(*executables)
  batches = [batch] + executables.collect {|executable| executable.batch }
  batches.uniq!
  
  merged = []
  batches.each do |batch| 
    merged.concat(batch)
    batch.clear
  end
  
  merged.uniq!
  batches.each {|batch| batch.concat(merged) }
  self
end

#batched?Boolean

Returns true if the batch size is greater than one (the one is assumed to be self).

Returns:

  • (Boolean)


58
59
60
# File 'lib/tap/support/executable.rb', line 58

def batched?
  batch.length > 1
end

#check_terminateObject

Raises a TerminateError if app.state == State::TERMINATE. check_terminate may be called at any time to provide a breakpoint in long-running processes.



273
274
275
276
277
# File 'lib/tap/support/executable.rb', line 273

def check_terminate
  if app.state == App::State::TERMINATE
    raise App::TerminateError.new
  end
end

#depends_on(*dependencies) ⇒ Object

Adds the dependency to each member in batch (and implicitly self). The dependency will be resolved with the input arguments during _execute, using resolve_dependencies.



208
209
210
211
212
213
# File 'lib/tap/support/executable.rb', line 208

def depends_on(*dependencies)
  batch.each do |e| 
    e.unbatched_depends_on(*dependencies)
  end
  self
end

#enq(*inputs) ⇒ Object

Enqueues each member of batch (and implicitly self) to app with the inputs. The number of inputs provided should match the number of inputs for the method_name method.



130
131
132
133
134
135
# File 'lib/tap/support/executable.rb', line 130

def enq(*inputs)
  batch.each do |executable| 
    executable.unbatched_enq(*inputs)
  end
  self
end

#execute(*inputs) ⇒ Object

Calls _execute with the inputs and returns the non-audited result. Execute is not a batched method.



266
267
268
# File 'lib/tap/support/executable.rb', line 266

def execute(*inputs)
  _execute(*inputs).value
end

#fork(*targets, &block) ⇒ Object

Sets a fork workflow pattern for self; each target will enque the results of self. See Joins::Fork.



175
176
177
# File 'lib/tap/support/executable.rb', line 175

def fork(*targets, &block) # :yields: _result
  Joins::Fork.join(self, targets, &block)
end

#initialize_batch_objObject

Initializes a new batch object and adds the object to batch. The object will be a duplicate of self.

Note this method can raise an error for objects that don’t support dup, notably Method objects generated by Object#_method.



45
46
47
48
49
50
51
52
53
54
# File 'lib/tap/support/executable.rb', line 45

def initialize_batch_obj
  obj = self.dup
  
  if obj.kind_of?(Executable)
    batch << obj
    obj
  else
    Executable.initialize(obj, method_name, app, batch, dependencies, &on_complete_block)
  end
end

#merge(*sources, &block) ⇒ Object

Sets a simple merge workflow pattern for the source tasks. Each source enques self with it’s result; no synchronization occurs, nor are results grouped before being enqued. See Joins::Merge.



182
183
184
# File 'lib/tap/support/executable.rb', line 182

def merge(*sources, &block) # :yields: _result
  Joins::Merge.join(self, sources, &block)
end

#on_complete(override = false, &block) ⇒ Object

Sets a block to receive the results of _execute for each member of batch (and implicitly self). Raises an error if on_complete_block is already set within the batch. Override the existing on_complete_block by specifying override = true.

Note: the block recieves an audited result and not the result itself (see Audit for more information).



150
151
152
153
154
155
# File 'lib/tap/support/executable.rb', line 150

def on_complete(override=false, &block) # :yields: _result
  batch.each do |executable| 
    executable.unbatched_on_complete(override, &block)
  end
  self
end

#reset_dependenciesObject

Resets dependencies so they will be re-resolved on resolve_dependencies. (See Dependency#reset).



235
236
237
238
# File 'lib/tap/support/executable.rb', line 235

def reset_dependencies
  dependencies.each {|dependency| dependency.reset }
  self
end

#resolve_dependenciesObject

Resolves dependencies. (See Dependency#resolve).



228
229
230
231
# File 'lib/tap/support/executable.rb', line 228

def resolve_dependencies
  dependencies.each {|dependency| dependency.resolve }
  self
end

#sequence(*tasks, &block) ⇒ Object

Sets a sequence workflow pattern for the tasks; each task enques the next task with it’s results, starting with self.

See Joins::Sequence.



169
170
171
# File 'lib/tap/support/executable.rb', line 169

def sequence(*tasks, &block) # :yields: _result
  Joins::Sequence.join(self, tasks, &block)
end

#switch(*targets, &block) ⇒ Object

Sets a switch workflow pattern for self. When _execute completes, switch yields the audited result to the block and the block should return the index of the target to enque with the results. No target will be enqued if the index is false or nil. An error is raised if no target can be found for the specified index. See Joins::Switch.



201
202
203
# File 'lib/tap/support/executable.rb', line 201

def switch(*targets, &block) # :yields: _result
  Joins::Switch.join(self, targets, &block)
end

#sync_merge(*sources, &block) ⇒ Object

Sets a synchronized merge workflow for the source tasks. Results from each source are collected and enqued as a single group to self. The collective results are not enqued until all sources have completed. See Joins::SyncMerge.

Raises an error if a source returns twice before the target is enqued.



192
193
194
# File 'lib/tap/support/executable.rb', line 192

def sync_merge(*sources, &block) # :yields: _result
  Joins::SyncMerge.join(self, sources, &block)
end

#unbatched_depends_on(*dependencies) ⇒ Object

Like depends_on, but only adds the dependency to self.

Raises:

  • (ArgumentError)


216
217
218
219
220
221
222
223
224
225
# File 'lib/tap/support/executable.rb', line 216

def unbatched_depends_on(*dependencies)
  raise ArgumentError, "cannot depend on self" if dependencies.include?(self)
  
  dependencies.each do |dependency|
    app.dependencies.register(dependency)
    self.dependencies << dependency unless self.dependencies.include?(dependency)
  end
  
  self
end

#unbatched_enq(*inputs) ⇒ Object

Like enq, but only enques self.



138
139
140
141
# File 'lib/tap/support/executable.rb', line 138

def unbatched_enq(*inputs)
  app.queue.enq(self, inputs)
  self
end

#unbatched_on_complete(override = false, &block) ⇒ Object

Like on_complete, but only sets the on_complete_block for self.



158
159
160
161
162
163
164
# File 'lib/tap/support/executable.rb', line 158

def unbatched_on_complete(override=false, &block) # :yields: _result
  unless on_complete_block == nil || override
    raise "on_complete_block already set: #{self}" 
  end
  @on_complete_block = block
  self
end