Module: Tap::Support::Executable
- Included in: Task
- Defined in: lib/tap/support/executable.rb
Overview
Executable wraps objects to make them executable by App.
Instance Attribute Summary
- #app ⇒ Object (readonly): The App receiving self during enq.
- #batch ⇒ Object (readonly): The batch for self.
- #dependencies ⇒ Object (readonly): An array of dependency indices that will be resolved on _execute.
- #method_name ⇒ Object (readonly): The method called during _execute.
- #on_complete_block ⇒ Object (readonly): The block called when _execute completes.
Class Method Summary
- .initialize(obj, method_name, app = App.instance, batch = [], dependencies = [], &on_complete_block) ⇒ Object: Extends obj with Executable and sets up all required variables.
Instance Method Summary
- #_execute(*inputs) ⇒ Object: Auditing method call.
- #batch_index ⇒ Object: Returns the index of self in batch.
- #batch_with(*executables) ⇒ Object: Merges the batches for self and the specified Executables, removing duplicates.
- #batched? ⇒ Boolean: Returns true if the batch size is greater than one (the one is assumed to be self).
- #check_terminate ⇒ Object: Raises a TerminateError if app.state == State::TERMINATE.
- #depends_on(*dependencies) ⇒ Object: Adds the dependency to each member in batch (and implicitly self).
- #enq(*inputs) ⇒ Object: Enqueues each member of batch (and implicitly self) to app with the inputs.
- #execute(*inputs) ⇒ Object: Calls _execute with the inputs and returns the non-audited result.
- #fork(*targets, &block) ⇒ Object: Sets a fork workflow pattern for self; each target will enqueue the results of self.
- #initialize_batch_obj ⇒ Object: Initializes a new batch object and adds the object to batch.
- #merge(*sources, &block) ⇒ Object: Sets a simple merge workflow pattern for the source tasks.
- #on_complete(override = false, &block) ⇒ Object: Sets a block to receive the results of _execute for each member of batch (and implicitly self).
- #reset_dependencies ⇒ Object: Resets dependencies so they will be re-resolved on resolve_dependencies.
- #resolve_dependencies ⇒ Object: Resolves dependencies.
- #sequence(*tasks, &block) ⇒ Object: Sets a sequence workflow pattern for the tasks; each task enqueues the next task with its results, starting with self.
- #switch(*targets, &block) ⇒ Object: Sets a switch workflow pattern for self.
- #sync_merge(*sources, &block) ⇒ Object: Sets a synchronized merge workflow for the source tasks.
- #unbatched_depends_on(*dependencies) ⇒ Object: Like depends_on, but only adds the dependency to self.
- #unbatched_enq(*inputs) ⇒ Object: Like enq, but only enqueues self.
- #unbatched_on_complete(override = false, &block) ⇒ Object: Like on_complete, but only sets the on_complete_block for self.
Instance Attribute Details
#app ⇒ Object (readonly)
The App receiving self during enq
# File 'lib/tap/support/executable.rb', line 10
def app
  @app
end
#batch ⇒ Object (readonly)
The batch for self
# File 'lib/tap/support/executable.rb', line 22
def batch
  @batch
end
#dependencies ⇒ Object (readonly)
An array of dependency indices that will be resolved on _execute
# File 'lib/tap/support/executable.rb', line 19
def dependencies
  @dependencies
end
#method_name ⇒ Object (readonly)
The method called during _execute
# File 'lib/tap/support/executable.rb', line 13
def method_name
  @method_name
end
#on_complete_block ⇒ Object (readonly)
The block called when _execute completes
# File 'lib/tap/support/executable.rb', line 16
def on_complete_block
  @on_complete_block
end
Class Method Details
.initialize(obj, method_name, app = App.instance, batch = [], dependencies = [], &on_complete_block) ⇒ Object
Extends obj with Executable and sets up all required variables. The specified method will be called on _execute.
# File 'lib/tap/support/executable.rb', line 28
def self.initialize(obj, method_name, app=App.instance, batch=[], dependencies=[], &on_complete_block)
  obj.extend Executable
  obj.instance_variable_set(:@app, app)
  obj.instance_variable_set(:@method_name, method_name)
  obj.instance_variable_set(:@on_complete_block, on_complete_block)
  obj.instance_variable_set(:@dependencies, dependencies)
  obj.instance_variable_set(:@batch, batch)
  batch << obj

  obj
end
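The extend-and-assign setup described above can be tried without Tap installed. The following is a minimal sketch, not the library itself: SketchExecutable and its wrap method are hypothetical stand-ins, and the app argument is left as nil because no App exists here.

```ruby
# Hypothetical stand-in for Tap::Support::Executable. Only the setup logic
# is reproduced; `wrap` plays the role of Executable.initialize.
module SketchExecutable
  attr_reader :app, :method_name, :batch, :dependencies, :on_complete_block

  def self.wrap(obj, method_name, app = nil, batch = [], dependencies = [], &on_complete_block)
    obj.extend SketchExecutable
    obj.instance_variable_set(:@app, app)
    obj.instance_variable_set(:@method_name, method_name)
    obj.instance_variable_set(:@on_complete_block, on_complete_block)
    obj.instance_variable_set(:@dependencies, dependencies)
    obj.instance_variable_set(:@batch, batch)
    batch << obj   # the wrapped object always belongs to its own batch
    obj
  end
end

# Any object with a suitable method can be wrapped.
greeter = Object.new
def greeter.greet(name)
  "hello #{name}"
end

wrapped = SketchExecutable.wrap(greeter, :greet)
wrapped.equal?(greeter)   # => true, the object itself is extended
wrapped.method_name       # => :greet
wrapped.batch             # a one-element array containing wrapped
```

Because the object is extended in place rather than wrapped by a proxy, it keeps its identity and its original methods.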
Instance Method Details
#_execute(*inputs) ⇒ Object
Auditing method call. Resolves dependencies, executes method_name, and sends the audited result to the on_complete_block (if set).
Returns the audited result.
# File 'lib/tap/support/executable.rb', line 244
def _execute(*inputs)
  resolve_dependencies

  previous = []
  inputs.collect! do |input|
    if input.kind_of?(Audit)
      previous << input
      input.value
    else
      previous << Audit.new(nil, input)
      input
    end
  end

  audit = Audit.new(self, send(method_name, *inputs), previous)
  on_complete_block ? on_complete_block.call(audit) : app.aggregator.store(audit)

  audit
end
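The input handling in _execute is easier to follow in isolation. The sketch below assumes a much simpler audit record than Tap's Audit class: SketchAudit and sketch_execute are hypothetical names, and dependency resolution and the on_complete_block dispatch are left out.

```ruby
# Hypothetical, simplified stand-in for Tap's Audit: it records which
# object produced a value and which audits the value was computed from.
SketchAudit = Struct.new(:source, :value, :previous)

# Mirrors the input loop of _execute: audited inputs are unwrapped to
# their values, raw inputs are wrapped so every input has an audit record.
def sketch_execute(receiver, method_name, *inputs)
  previous = []
  values = inputs.collect do |input|
    if input.kind_of?(SketchAudit)
      previous << input
      input.value
    else
      previous << SketchAudit.new(nil, input)
      input
    end
  end

  SketchAudit.new(receiver, receiver.send(method_name, *values), previous)
end

first  = sketch_execute(2, :+, 3)        # raw input
second = sketch_execute(10, :*, first)   # audited input from the first call

first.value             # => 5
second.value            # => 50
second.previous.first   # the audit returned by the first call
```

Passing the audit from one call into the next is what lets a workflow trace a final result back through every step that produced it.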
#batch_index ⇒ Object
Returns the index of self in batch.
# File 'lib/tap/support/executable.rb', line 63
def batch_index
  batch.index(self)
end
#batch_with(*executables) ⇒ Object
Merges the batches for self and the specified Executables, removing duplicates.
class BatchExecutable
  include Tap::Support::Executable
  def initialize(batch=[])
    @batch = batch
    batch << self
  end
end
b1 = BatchExecutable.new
b2 = BatchExecutable.new
b3 = BatchExecutable.new
b1.batch_with(b2, b3)
b1.batch # => [b1, b2, b3]
b3.batch # => [b1, b2, b3]
Note that batch_with is not recursive (i.e., it does not merge the batches of each member in the batch):
b4 = BatchExecutable.new
b4.batch_with(b3)
b4.batch # => [b4, b1, b2, b3]
b3.batch # => [b4, b1, b2, b3]
b2.batch # => [b1, b2, b3]
b1.batch # => [b1, b2, b3]
However it does affect all objects that share the same underlying batch:
b5 = BatchExecutable.new(b1.batch)
b6 = BatchExecutable.new
b5.batch.object_id # => b1.batch.object_id
b5.batch # => [b1, b2, b3, b5]
b5.batch_with(b6)
b5.batch # => [b1, b2, b3, b5, b6]
b1.batch # => [b1, b2, b3, b5, b6]
Returns self.
# File 'lib/tap/support/executable.rb', line 112
def batch_with(*executables)
  batches = [batch] + executables.collect {|executable| executable.batch }
  batches.uniq!

  merged = []
  batches.each do |batch|
    merged.concat(batch)
    batch.clear
  end

  merged.uniq!
  batches.each {|batch| batch.concat(merged) }
  self
end
#batched? ⇒ Boolean
Returns true if the batch size is greater than one (the one is assumed to be self).
# File 'lib/tap/support/executable.rb', line 58
def batched?
  batch.length > 1
end
#check_terminate ⇒ Object
Raises a TerminateError if app.state == State::TERMINATE. check_terminate may be called at any time to provide a breakpoint in long-running processes.
# File 'lib/tap/support/executable.rb', line 273
def check_terminate
  if app.state == App::State::TERMINATE
    raise App::TerminateError.new
  end
end
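A breakpoint of this kind is typically called once per unit of work inside a loop. The sketch below is an assumption-laden illustration: SketchState, SketchTerminateError and SketchWorker are hypothetical stand-ins, and the state lives on the worker itself rather than on an App.

```ruby
# Hypothetical stand-ins for App::State and App::TerminateError.
module SketchState
  RUN = 1
  TERMINATE = -1
end

class SketchTerminateError < RuntimeError; end

class SketchWorker
  attr_accessor :state

  def initialize
    @state = SketchState::RUN
  end

  def check_terminate
    raise SketchTerminateError if state == SketchState::TERMINATE
  end

  # Doubles each item, checking for a terminate request between items.
  def process(items)
    done = []
    items.each do |item|
      check_terminate
      done << item * 2
      # Simulate a terminate request arriving after the second item.
      self.state = SketchState::TERMINATE if done.length == 2
    end
    done
  rescue SketchTerminateError
    done   # return the partial results gathered before termination
  end
end

partial = SketchWorker.new.process([1, 2, 3, 4])
partial   # => [2, 4]
```

Checking between items keeps each item's work atomic: termination never interrupts an item halfway through.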
#depends_on(*dependencies) ⇒ Object
Adds the dependency to each member in batch (and implicitly self). The dependency will be resolved with the input arguments during _execute, using resolve_dependencies.
# File 'lib/tap/support/executable.rb', line 208
def depends_on(*dependencies)
  batch.each do |e|
    e.unbatched_depends_on(*dependencies)
  end
  self
end
#enq(*inputs) ⇒ Object
Enqueues each member of batch (and implicitly self) to app with the inputs. The number of inputs provided should match the number of inputs for the method_name method.
# File 'lib/tap/support/executable.rb', line 130
def enq(*inputs)
  batch.each do |executable|
    executable.unbatched_enq(*inputs)
  end
  self
end
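The difference between the batched and unbatched forms shows up as soon as two objects share a batch. In this minimal sketch SketchJob is a hypothetical class and a plain Array stands in for app.queue.

```ruby
# Hypothetical job class; a plain Array plays the role of app.queue.
class SketchJob
  attr_reader :name, :batch

  def initialize(name, queue, batch = [])
    @name = name
    @queue = queue
    @batch = batch
    batch << self
  end

  # Batched form: every member of the batch is enqueued with the inputs.
  def enq(*inputs)
    batch.each { |job| job.unbatched_enq(*inputs) }
    self
  end

  # Unbatched form: only the receiver is enqueued.
  def unbatched_enq(*inputs)
    @queue << [self, inputs]
    self
  end
end

queue = []
a = SketchJob.new(:a, queue)
b = SketchJob.new(:b, queue, a.batch)   # b joins a's batch

a.enq(1, 2)
batched = queue.map { |job, inputs| [job.name, inputs] }
batched   # => [[:a, [1, 2]], [:b, [1, 2]]]

queue.clear
b.unbatched_enq(3)
single = queue.map { |job, inputs| [job.name, inputs] }
single    # => [[:b, [3]]]
```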
#execute(*inputs) ⇒ Object
Calls _execute with the inputs and returns the non-audited result. Execute is not a batched method.
# File 'lib/tap/support/executable.rb', line 266
def execute(*inputs)
  _execute(*inputs).value
end
#fork(*targets, &block) ⇒ Object
Sets a fork workflow pattern for self; each target will enqueue the results of self. See Joins::Fork.
# File 'lib/tap/support/executable.rb', line 175
def fork(*targets, &block) # :yields: _result
  Joins::Fork.join(self, targets, &block)
end
#initialize_batch_obj ⇒ Object
Initializes a new batch object and adds the object to batch. The object will be a duplicate of self.
Note this method can raise an error for objects that don’t support dup, notably Method objects generated by Object#_method.
# File 'lib/tap/support/executable.rb', line 45
def initialize_batch_obj
  obj = self.dup

  if obj.kind_of?(Executable)
    batch << obj
    obj
  else
    Executable.initialize(obj, method_name, app, batch, dependencies, &on_complete_block)
  end
end
#merge(*sources, &block) ⇒ Object
Sets a simple merge workflow pattern for the source tasks. Each source enqueues self with its result; no synchronization occurs, nor are results grouped before being enqueued. See Joins::Merge.
# File 'lib/tap/support/executable.rb', line 182
def merge(*sources, &block) # :yields: _result
  Joins::Merge.join(self, sources, &block)
end
#on_complete(override = false, &block) ⇒ Object
Sets a block to receive the results of _execute for each member of batch (and implicitly self). Raises an error if on_complete_block is already set within the batch. Override the existing on_complete_block by specifying override = true.
Note: the block receives an audited result and not the result itself (see Audit for more information).
# File 'lib/tap/support/executable.rb', line 150
def on_complete(override=false, &block) # :yields: _result
  batch.each do |executable|
    executable.unbatched_on_complete(override, &block)
  end
  self
end
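The set-once rule and the override flag can be exercised in a few lines. This sketch uses a hypothetical SketchTask class that keeps only the guard logic for a single object; batching and auditing are omitted.

```ruby
# Hypothetical class reproducing only the set-once guard.
class SketchTask
  attr_reader :on_complete_block

  def on_complete(override = false, &block)
    unless on_complete_block.nil? || override
      raise "on_complete_block already set: #{self}"
    end
    @on_complete_block = block
    self
  end
end

task = SketchTask.new
task.on_complete { |_audit| :first }

# A second block is rejected unless override is true.
error_message = nil
begin
  task.on_complete { |_audit| :second }
rescue RuntimeError => e
  error_message = e.message
end

task.on_complete(true) { |_audit| :replaced }
outcome = task.on_complete_block.call(nil)
outcome   # => :replaced
```

Raising by default guards against one part of a workflow silently replacing a join that another part already set up.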
#reset_dependencies ⇒ Object
Resets dependencies so they will be re-resolved on resolve_dependencies. (See Dependency#reset).
# File 'lib/tap/support/executable.rb', line 235
def reset_dependencies
  dependencies.each {|dependency| dependency.reset }
  self
end
#resolve_dependencies ⇒ Object
Resolves dependencies. (See Dependency#resolve).
# File 'lib/tap/support/executable.rb', line 228
def resolve_dependencies
  dependencies.each {|dependency| dependency.resolve }
  self
end
#sequence(*tasks, &block) ⇒ Object
Sets a sequence workflow pattern for the tasks; each task enqueues the next task with its results, starting with self.
See Joins::Sequence.
# File 'lib/tap/support/executable.rb', line 169
def sequence(*tasks, &block) # :yields: _result
  Joins::Sequence.join(self, tasks, &block)
end
#switch(*targets, &block) ⇒ Object
Sets a switch workflow pattern for self. When _execute completes, switch yields the audited result to the block, and the block should return the index of the target to enqueue with the results. No target will be enqueued if the index is false or nil. An error is raised if no target can be found for the specified index. See Joins::Switch.
# File 'lib/tap/support/executable.rb', line 201
def switch(*targets, &block) # :yields: _result
  Joins::Switch.join(self, targets, &block)
end
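The index contract described for switch can be illustrated with a small helper. This is a sketch of the documented behavior only, not Joins::Switch: sketch_switch is a hypothetical function, it returns the chosen target instead of enqueuing it, and it yields a raw value where the real block receives an audited result.

```ruby
# Hypothetical helper mirroring the documented contract: the block maps a
# result to a target index; nil or false selects no target.
def sketch_switch(result, targets)
  index = yield(result)
  return nil if index.nil? || index == false

  target = targets[index]
  raise "no switch target for index: #{index}" if target.nil?
  target
end

targets = [:small, :large]
chooser = lambda { |n| n < 10 ? 0 : 1 }

low  = sketch_switch(5, targets, &chooser)
high = sketch_switch(50, targets, &chooser)
none = sketch_switch(50, targets) { |_n| nil }

low    # => :small
high   # => :large
none   # => nil
```

An index with no matching target, such as 2 for the two targets above, raises an error rather than silently dropping the result.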
#sync_merge(*sources, &block) ⇒ Object
Sets a synchronized merge workflow for the source tasks. Results from each source are collected and enqueued as a single group to self. The collective results are not enqueued until all sources have completed. See Joins::SyncMerge.
Raises an error if a source returns twice before the target is enqueued.
# File 'lib/tap/support/executable.rb', line 192
def sync_merge(*sources, &block) # :yields: _result
  Joins::SyncMerge.join(self, sources, &block)
end
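The collect-then-release behavior described above might be sketched as follows. SketchSyncMerge is a hypothetical class, not Joins::SyncMerge; in particular, resetting the slots after each group is released is an assumption made here so that the join can be reused.

```ruby
# Hypothetical synchronized join: results are held per source and
# released as one group once every source has reported.
class SketchSyncMerge
  def initialize(source_count, &on_ready)
    @results = Array.new(source_count)
    @filled = Array.new(source_count, false)
    @on_ready = on_ready
  end

  def receive(index, result)
    if @filled[index]
      raise "source #{index} returned twice before the target was enqueued"
    end

    @results[index] = result
    @filled[index] = true
    return nil unless @filled.all?

    group = @results.dup
    # Assumption: slots are cleared so a later round can be collected.
    @results.fill(nil)
    @filled.fill(false)
    @on_ready.call(group)
  end
end

groups = []
join = SketchSyncMerge.new(2) { |group| groups << group }

join.receive(1, :b)   # nothing is released yet
join.receive(0, :a)   # both sources have now reported
groups                # => [[:a, :b]]
```

Results are grouped by source position rather than arrival order, so the target sees the same layout however the sources are scheduled.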
#unbatched_depends_on(*dependencies) ⇒ Object
Like depends_on, but only adds the dependency to self.
# File 'lib/tap/support/executable.rb', line 216
def unbatched_depends_on(*dependencies)
  raise ArgumentError, "cannot depend on self" if dependencies.include?(self)

  dependencies.each do |dependency|
    app.dependencies.register(dependency)
    self.dependencies << dependency unless self.dependencies.include?(dependency)
  end

  self
end
#unbatched_enq(*inputs) ⇒ Object
Like enq, but only enqueues self.
# File 'lib/tap/support/executable.rb', line 138
def unbatched_enq(*inputs)
  app.queue.enq(self, inputs)
  self
end
#unbatched_on_complete(override = false, &block) ⇒ Object
Like on_complete, but only sets the on_complete_block for self.
# File 'lib/tap/support/executable.rb', line 158
def unbatched_on_complete(override=false, &block) # :yields: _result
  unless on_complete_block == nil || override
    raise "on_complete_block already set: #{self}"
  end
  @on_complete_block = block
  self
end