Class: Dataflow::Nodes::ComputeNode
- Inherits: Object
- Includes:
- EventMixin, Dataflow::Node, PropertiesMixin, SchemaMixin, Mongoid::Document
- Defined in:
- lib/dataflow/nodes/compute_node.rb
Overview
Represents a computation. It may store its output in a separate data node, and it depends on other data nodes to compute its own data.
Direct Known Subclasses
Export::ToCsvNode, Filter::DropWhileNode, Filter::NewestNode, Filter::WhereNode, JoinNode, MapNode, MergeNode, SelectKeysNode, SqlQueryNode, Transformation::ToTimeNode
Constant Summary
Constants included from SchemaMixin
SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR
Class Method Summary
- .data_node_opts ⇒ Object
- .dependency_opts ⇒ Object
- .ensure_data_node_exists ⇒ Object
  DSL to ensure that a data node must be set before a compute node can be recomputed (as it will presumably use it to store data).
- .ensure_dependencies(opts) ⇒ Object
  DSL to be used while making compute nodes.
Instance Method Summary
- #all_dependencies ⇒ Object
  Retrieve the whole dependency tree.
- #compute(depth: 0, force_compute: false, source: nil) ⇒ Object
  Compute this node's data if not already updated.
- #data_node ⇒ Object
  Fetch the data node if it is set.
- #dependencies(reload: false) ⇒ Object
  Override the relation because self.dependencies is not ordered.
- #execute_local_batch_computation(batch_params) ⇒ Object
  Interface to execute a part (batch) of this node locally.
- #execute_local_computation ⇒ Object
  Interface to execute this node locally.
- #execution_valid?(uuid) ⇒ Boolean
- #explain_update(depth: 0, verbose: false) ⇒ Object
  Logs the dependency tree's update times and whether each node should be updated.
- #force_computing_lock_release! ⇒ Object
  Force the release of this node's computing lock.
- #locked_for_computing? ⇒ Boolean
  Check this node's locking status.
- #make_batch_params ⇒ Array
  Interface used to retrieve the params for scheduled batches.
- #needs_automatic_recomputing? ⇒ Boolean
  Checks whether an automatic recomputing is needed.
- #recompute(depth: 0, force_recompute: false) ⇒ Object
  Update the dependencies that need it, then compute this node's own data.
- #schema ⇒ Object
  Keep a compatible interface with the data node.
- #set_defaults ⇒ Object
  Sets the default parameters before creating the object.
- #updated? ⇒ Boolean
  Returns false if any of our dependencies has been updated after our last update.
- #updated_at ⇒ Object
  Keep a uniform interface with a DataNode.
- #updated_at=(val) ⇒ Object
- #valid_for_computation? ⇒ Boolean
  Check whether this node can compute.
Methods included from SchemaMixin
#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer
Methods included from Dataflow::Node
find, #metadata, #required_by, #validate!
Class Method Details
.data_node_opts ⇒ Object
# File 'lib/dataflow/nodes/compute_node.rb', line 30

def data_node_opts
  @data_node_opts || {}
end
.dependency_opts ⇒ Object
# File 'lib/dataflow/nodes/compute_node.rb', line 26

def dependency_opts
  @dependency_opts || {}
end
.ensure_data_node_exists ⇒ Object
DSL to ensure that a data node must be set before a computed node can be recomputed (as it will presumably use it to store data).
# File 'lib/dataflow/nodes/compute_node.rb', line 52

def ensure_data_node_exists
  @data_node_opts = { ensure_exists: true }
end
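As a usage sketch (the subclass name is hypothetical), a node that must write its results to a data node would declare:

class MyExportNode < Dataflow::Nodes::ComputeNode
  # Validation will now fail unless a data node is attached
  # before this node is recomputed.
  ensure_data_node_exists
end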
.ensure_dependencies(opts) ⇒ Object
DSL to be used while making compute nodes. It supports enforcing validations by checking whether there is exactly, at_least (min) or at_most (max) a given number of dependencies. Usage:

class MyComputeNode < ComputeNode
  ensure_dependencies exactly: 1 # could be e.g.: min: 3, or max: 5
end
# File 'lib/dataflow/nodes/compute_node.rb', line 40

def ensure_dependencies(opts)
  raise Dataflow::Errors::InvalidConfigurationError,
        "ensure_dependencies must be given a hash. Received: #{opts.class}" unless opts.is_a?(Hash)

  valid_keys = %i(exactly min max).freeze
  has_attributes = (valid_keys - opts.keys).count < valid_keys.count
  raise Dataflow::Errors::InvalidConfigurationError,
        "ensure_dependencies must have at least one of 'min', 'max' or 'exactly' attributes set. Given: #{opts.keys}" unless has_attributes

  add_property(:dependency_ids, opts)
  @dependency_opts = opts
end
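As the source above shows, passing an option set without any of the supported keys raises at class-definition time:

class BrokenNode < Dataflow::Nodes::ComputeNode
  ensure_dependencies foo: 1
  # => Dataflow::Errors::InvalidConfigurationError:
  #    ensure_dependencies must have at least one of 'min', 'max'
  #    or 'exactly' attributes set. Given: [:foo]
end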
Instance Method Details
#all_dependencies ⇒ Object
Retrieve the whole dependency tree.
# File 'lib/dataflow/nodes/compute_node.rb', line 149

def all_dependencies
  (dependencies + dependencies.flat_map(&:all_dependencies)).uniq
end
#compute(depth: 0, force_compute: false, source: nil) ⇒ Object
Compute this node's data if not already updated. Acquires a computing lock before computing. If the lock is already taken, it waits until the other computation finishes or times out.
# File 'lib/dataflow/nodes/compute_node.rb', line 240

def compute(depth: 0, force_compute: false, source: nil)
  has_compute_lock = false
  validate!

  if updated? && !force_compute
    logger.log("#{'>' * (depth + 1)} #{name} is up-to-date.")
    return
  end

  has_compute_lock = acquire_computing_lock!
  if has_compute_lock
    logger.log("#{'>' * (depth + 1)} #{name} started computing.")
    on_computing_started
    start_time = Time.now

    if data_node.present? && clear_data_on_compute != data_node.use_double_buffering
      # make sure the data node has compatible settings
      data_node.use_double_buffering = clear_data_on_compute
      data_node.save
    end

    send_heartbeat
    pre_compute(force_compute: force_compute)

    # update this node's schema with the necessary fields
    data_node&.update_schema(required_schema)

    if clear_data_on_compute
      # Pre-compute, we recreate the table, the unique indexes
      data_node&.recreate_dataset(dataset_type: :write)
      data_node&.create_unique_indexes(dataset_type: :write)
    end

    send_heartbeat
    Executor.execute(self)

    if clear_data_on_compute
      # Post-compute, delay creating other indexes for insert speed
      data_node&.create_non_unique_indexes(dataset_type: :write)
      # swap read/write datasets
      data_node&.swap_read_write_datasets!
    end

    set_last_compute_starting_time(start_time)
    duration = Time.now - start_time
    logger.log("#{'>' * (depth + 1)} #{name} took #{duration} seconds to compute.")
    on_computing_finished(state: 'computed')
    true
  else
    logger.log("#{'>' * (depth + 1)} [IS AWAITING] #{name}.")
    await_computing!
    logger.log("#{'>' * (depth + 1)} [IS DONE AWAITING] #{name}.")
  end
rescue Errors::RemoteExecutionError => e
  on_computing_finished(state: 'error', error: e) if has_compute_lock
  logger.error(error: e, custom_message: "#{name} failed computing remotely.")
rescue StandardError => e
  on_computing_finished(state: 'error', error: e) if has_compute_lock
  logger.error(error: e, custom_message: "#{name} failed computing.")
  raise
ensure
  release_computing_lock! if has_compute_lock
  true
end
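For example, to force a single node to compute regardless of its up-to-date status (the node name is hypothetical; find_by comes from Mongoid::Document):

node = Dataflow::Nodes::ComputeNode.find_by(name: 'stats')
node.compute(force_compute: true)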
#data_node ⇒ Object
Fetch the data node if it is set.
# File 'lib/dataflow/nodes/compute_node.rb', line 136

def data_node
  @data_node ||= Dataflow::Nodes::DataNode.find(data_node_id) if data_node_id.present?
end
#dependencies(reload: false) ⇒ Object
Override the relation because self.dependencies is not ordered.
# File 'lib/dataflow/nodes/compute_node.rb', line 141

def dependencies(reload: false)
  return @dependencies if @dependencies.present? && !reload

  @dependencies = dependency_ids.map do |x|
    Dataflow::Node.find(x)
  end
end
#execute_local_batch_computation(batch_params) ⇒ Object
Interface to execute a part (batch) of this node locally. This method is called when the framework needs to execute a batch on a worker. Override when needed, to execute a batch depending on the params. If you override, you may want to override the make_batch_params as well.
# File 'lib/dataflow/nodes/compute_node.rb', line 352

def execute_local_batch_computation(batch_params)
  records = dependencies.first.all(where: batch_params)
  compute_batch(records: records)
end
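A subclass would typically override this to transform each batch before writing it out. A minimal sketch, assuming the dependency's records have a 'name' field and that results are written to the attached data node via its add(records:) method:

class UppercaseNamesNode < Dataflow::Nodes::ComputeNode
  ensure_dependencies exactly: 1
  ensure_data_node_exists

  def execute_local_batch_computation(batch_params)
    # Fetch the records selected by this batch's params...
    records = dependencies.first.all(where: batch_params)
    # ...transform them, and write them to the output data node.
    transformed = records.map { |r| r.merge('name' => r['name'].to_s.upcase) }
    data_node.add(records: transformed)
  end
end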
#execute_local_computation ⇒ Object
Interface to execute this node locally.
# File 'lib/dataflow/nodes/compute_node.rb', line 344

def execute_local_computation
  compute_impl
end
#execution_valid?(uuid) ⇒ Boolean
# File 'lib/dataflow/nodes/compute_node.rb', line 334

def execution_valid?(uuid)
  execution_uuid.to_s == uuid.to_s
end
#explain_update(depth: 0, verbose: false) ⇒ Object
Logs the dependency tree's update times and whether each node should be updated. Useful to understand why a given node had to be recomputed.
# File 'lib/dataflow/nodes/compute_node.rb', line 172

def explain_update(depth: 0, verbose: false)
  if depth == 0 || !updated? || verbose
    logger.log("#{'>' * (depth + 1)} #{name} [COMPUTE] | #{updated? ? 'UPDATED' : 'OLD'} = #{updated_at}")
  end

  return if updated? && !verbose

  dependencies.each do |dependency|
    dependency.explain_update(depth: depth + 1, verbose: verbose)
  end
  true
end
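For instance, to trace why a node keeps being recomputed (node name hypothetical; the exact log lines depend on the tree):

node = Dataflow::Nodes::ComputeNode.find_by(name: 'stats')
node.explain_update(verbose: true)
# Logs one line per node in the tree, e.g.:
# > stats [COMPUTE] | OLD = 2017-03-01 10:00:00 UTC
# >> raw_data [COMPUTE] | UPDATED = 2017-03-02 09:00:00 UTC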
#force_computing_lock_release! ⇒ Object
Force the release of this node’s computing lock. Do not use unless there is a problem with the lock.
# File 'lib/dataflow/nodes/compute_node.rb', line 330

def force_computing_lock_release!
  release_computing_lock!
end
#locked_for_computing? ⇒ Boolean
Check this node’s locking status.
# File 'lib/dataflow/nodes/compute_node.rb', line 324

def locked_for_computing?
  computing_state == 'computing'
end
#make_batch_params ⇒ Array
Interface used to retrieve the params for scheduled batches. Override when needed. The default implementation builds queries that ensure the full processing of the first dependency's records.
# File 'lib/dataflow/nodes/compute_node.rb', line 361

def make_batch_params
  make_batch_queries(node: dependencies.first)
end
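When execute_local_batch_computation is overridden, this method usually needs a matching override so each batch param makes sense to it. A sketch, assuming batches are formed over a hypothetical 'category' field of the first dependency:

class PerCategoryNode < Dataflow::Nodes::ComputeNode
  def make_batch_params
    # One entry per batch; each hash is later passed as batch_params
    # to #execute_local_batch_computation.
    %w(books games music).map { |c| { 'category' => c } }
  end
end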
#needs_automatic_recomputing? ⇒ Boolean
Checks whether an automatic recomputing is needed.
# File 'lib/dataflow/nodes/compute_node.rb', line 196

def needs_automatic_recomputing?
  interval = recompute_interval.to_i
  return false if interval <= 0
  return false if updated?
  return false if locked_for_computing?
  return true if updated_at.blank?

  updated_at + interval.seconds < Time.now
end
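This predicate does not schedule anything by itself; a trivial polling loop built on top of it (entirely an assumption, not part of the gem) could look like:

loop do
  # Check every persisted compute node and refresh the stale ones.
  Dataflow::Nodes::ComputeNode.all.each do |node|
    node.recompute if node.needs_automatic_recomputing?
  end
  sleep 60
end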
#recompute(depth: 0, force_recompute: false) ⇒ Object
Update the dependencies that need it, then compute this node's own data.
# File 'lib/dataflow/nodes/compute_node.rb', line 210

def recompute(depth: 0, force_recompute: false)
  send_heartbeat
  logger.log("#{'>' * (depth + 1)} #{name} started recomputing...")
  start_time = Time.now

  parallel_each(dependencies) do |dependency|
    logger.log("#{'>' * (depth + 1)} #{name} checking deps: #{dependency.name}...")
    if !dependency.updated? || force_recompute
      dependency.recompute(depth: depth + 1, force_recompute: force_recompute)
    end
    send_heartbeat
  end

  # Dependencies data may have changed in a child process.
  # Reload to make sure we have the latest metadata.
  logger.log("#{'>' * (depth + 1)} #{name} reloading dependencies...")
  dependencies(reload: true)

  compute(depth: depth, force_compute: force_recompute)
  logger.log("#{'>' * (depth + 1)} #{name} took #{Time.now - start_time} seconds to recompute.")
  true
end
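In practice this is the main entry point for refreshing a whole pipeline (node name hypothetical):

node = Dataflow::Nodes::ComputeNode.find_by(name: 'reports')
node.recompute                        # recompute only the stale parts of the tree
node.recompute(force_recompute: true) # recompute everything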
#schema ⇒ Object
Keep a compatible interface with the data node.
# File 'lib/dataflow/nodes/compute_node.rb', line 339

def schema
  required_schema
end
#set_defaults ⇒ Object
Sets the default parameters before creating the object.
# File 'lib/dataflow/nodes/compute_node.rb', line 110

def set_defaults
  # support setting the fields with a Document rather
  # than an ObjectId. Handle the transformations here:
  if data_node_id.present?
    self.data_node_id = data_node_id._id unless data_node_id.is_a?(BSON::ObjectId)

    # the data node use_double_buffering setting
    # must match clear_data_on_compute:
    if data_node.use_double_buffering != clear_data_on_compute
      data_node.use_double_buffering = clear_data_on_compute
      data_node.save
    end
  end

  # Again support having an ObjectId or a document.
  self.dependency_ids = dependency_ids.map { |dep|
    next dep if dep.is_a? BSON::ObjectId
    dep._id
  }

  # Update the data node schema with the required schema
  # for this computed node.
  data_node&.update_schema(required_schema)
end
#updated? ⇒ Boolean
Returns false if any of our dependencies has been updated after our last update. We define a computed node's last update as the time it started its last successful update (instead of the time it completed it, as dependencies may have changed in the meantime).
# File 'lib/dataflow/nodes/compute_node.rb', line 159

def updated?
  return false if updated_at.blank?

  dependencies.each do |dependency|
    return false unless dependency.updated?
    return false if dependency.updated_at > updated_at
  end
  true
end
#updated_at ⇒ Object
Keep a uniform interface with a DataNode.
# File 'lib/dataflow/nodes/compute_node.rb', line 186

def updated_at
  last_compute_starting_time
end
#updated_at=(val) ⇒ Object
# File 'lib/dataflow/nodes/compute_node.rb', line 190

def updated_at=(val)
  self.last_compute_starting_time = val
end
#valid_for_computation? ⇒ Boolean
Check whether this node can compute. Errors are added to the ActiveModel errors.
# File 'lib/dataflow/nodes/compute_node.rb', line 309

def valid_for_computation?
  # Perform additional checks: also add errors to "self.errors"
  opts = self.class.dependency_opts
  ensure_exact_dependencies(count: opts[:exactly]) if opts.key?(:exactly)
  ensure_at_most_dependencies(count: opts[:max]) if opts.key?(:max)
  ensure_at_least_dependencies(count: opts[:min]) if opts.key?(:min)
  ensure_no_cyclic_dependencies
  ensure_keys_are_set
  ensure_data_node_exists if self.class.data_node_opts[:ensure_exists]

  errors.count == 0
end
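A sketch of checking validity before computing (node lookup assumed):

node = Dataflow::Nodes::JoinNode.find_by(name: 'joined')
unless node.valid_for_computation?
  # The ensure_* checks above populate the ActiveModel errors.
  puts node.errors.full_messages
end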