Class: Dataflow::Nodes::ComputeNode
- Inherits: Object
- Includes: EventMixin, Dataflow::Node, PropertiesMixin, SchemaMixin, Mongoid::Document
- Defined in: lib/dataflow/nodes/compute_node.rb
Overview
Represents a computation. May store its output in a separate data node. It depends on other data nodes to compute its own data.
Direct Known Subclasses
Export::ToCsvNode, Filter::DropWhileNode, Filter::NewestNode, Filter::WhereNode, JoinNode, MapNode, MergeNode, SelectKeysNode, SqlQueryNode, Transformation::ToTimeNode
Constant Summary
Constants included from SchemaMixin
SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR
Class Method Summary collapse
- .data_node_opts ⇒ Object
- .dependency_opts ⇒ Object
-
.ensure_data_node_exists ⇒ Object
DSL to ensure that a data node is set before a computed node can be recomputed (as it will presumably use it to store data).
-
.ensure_dependencies(opts) ⇒ Object
DSL to be used while making computed nodes.
Instance Method Summary collapse
-
#all_dependencies ⇒ Object
Retrieve the whole dependency tree.
-
#compute(depth: 0, force_compute: false, source: nil) ⇒ Object
Compute this node’s data if not already updated.
-
#data_node ⇒ Object
Fetch the data node if it is set.
-
#dependencies(reload: false) ⇒ Object
Override the relation because self.dependencies is not ordered.
-
#force_computing_lock_release! ⇒ Object
Force the release of this node’s computing lock.
-
#locked_for_computing? ⇒ Boolean
Check this node’s locking status.
-
#needs_automatic_recomputing? ⇒ Boolean
Checks whether an automatic recomputing is needed.
-
#recompute(depth: 0, force_recompute: false) ⇒ Object
Updates the dependencies that need to be updated, then computes this node's own data.
-
#set_defaults ⇒ Object
Sets the default parameters before creating the object.
-
#updated? ⇒ Boolean
Returns false if any of our dependencies has been updated after our last update.
-
#updated_at ⇒ Object
Keep a uniform interface with a DataNode.
- #updated_at=(val) ⇒ Object
-
#valid_for_computation? ⇒ Boolean
Check whether or not this node can compute.
Methods included from SchemaMixin
#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer
Methods included from Dataflow::Node
Class Method Details
.data_node_opts ⇒ Object
# File 'lib/dataflow/nodes/compute_node.rb', line 30

def data_node_opts
  @data_node_opts || {}
end
.dependency_opts ⇒ Object
# File 'lib/dataflow/nodes/compute_node.rb', line 26

def dependency_opts
  @dependency_opts || {}
end
.ensure_data_node_exists ⇒ Object
DSL to ensure that a data node is set before a computed node can be recomputed (as it will presumably use it to store data).
# File 'lib/dataflow/nodes/compute_node.rb', line 52

def ensure_data_node_exists
  @data_node_opts = { ensure_exists: true }
end
.ensure_dependencies(opts) ⇒ Object
DSL to be used while making computed nodes. It supports enforcing validations by checking whether there is exactly, at_least (min) or at_most (max) a given number of dependencies. Usage:

class MyComputeNode < ComputeNode
  ensure_dependencies exactly: 1 # could be e.g.: min: 3, or max: 5
end
# File 'lib/dataflow/nodes/compute_node.rb', line 40

def ensure_dependencies(opts)
  raise Dataflow::Errors::InvalidConfigurationError,
        "ensure_dependencies must be given a hash. Received: #{opts.class}" unless opts.is_a?(Hash)

  valid_keys = %i(exactly min max).freeze
  has_attributes = (valid_keys - opts.keys).count < valid_keys.count
  raise Dataflow::Errors::InvalidConfigurationError,
        "ensure_dependencies must have at least one of 'min', 'max' or 'exactly' attributes set. Given: #{opts.keys}" unless has_attributes

  add_property(:dependency_ids, opts)
  @dependency_opts = opts
end
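The key-validation rule used here can be illustrated with a small, self-contained sketch. Note that `dependency_opts_valid?` and `VALID_KEYS` are hypothetical names for this example, not part of the gem:

```ruby
VALID_KEYS = %i[exactly min max].freeze

# true when opts is a Hash containing at least one of the valid keys
def dependency_opts_valid?(opts)
  return false unless opts.is_a?(Hash)

  # subtracting the given keys shrinks the list only if at least
  # one valid key was actually present in opts
  (VALID_KEYS - opts.keys).count < VALID_KEYS.count
end

dependency_opts_valid?(exactly: 1) # => true
dependency_opts_valid?(min: 3)     # => true
dependency_opts_valid?(foo: 'bar') # => false
```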
Instance Method Details
#all_dependencies ⇒ Object
Retrieve the whole dependency tree.
# File 'lib/dataflow/nodes/compute_node.rb', line 136

def all_dependencies
  (dependencies + dependencies.flat_map(&:all_dependencies)).uniq
end
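The recursive flat_map walk can be exercised with plain Ruby stand-in objects. The `Node` struct below is a hypothetical stand-in for this sketch, not the gem's Mongoid-backed node:

```ruby
# Minimal stand-in node: a name plus a list of direct dependencies.
Node = Struct.new(:name, :dependencies) do
  # Same recursion as the gem: direct deps, plus each dep's own
  # transitive deps, de-duplicated.
  def all_dependencies
    (dependencies + dependencies.flat_map(&:all_dependencies)).uniq
  end
end

leaf   = Node.new('leaf', [])
middle = Node.new('middle', [leaf])
root   = Node.new('root', [middle, leaf]) # 'leaf' reachable twice

root.all_dependencies.map(&:name) # => ["middle", "leaf"]
```

The `.uniq` matters when the dependency graph is a DAG rather than a tree: a node reachable through several paths is still reported once.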
#compute(depth: 0, force_compute: false, source: nil) ⇒ Object
Compute this node’s data if not already updated. Acquires a computing lock before computing. If the lock is already held, it waits until the other computation finishes or times out.
# File 'lib/dataflow/nodes/compute_node.rb', line 209

def compute(depth: 0, force_compute: false, source: nil)
  has_compute_lock = false
  validate!

  if updated? && !force_compute
    logger.log "#{'>' * (depth + 1)} #{name} is up-to-date."
    return
  end

  has_compute_lock = acquire_computing_lock!
  if has_compute_lock
    logger.log "#{'>' * (depth + 1)} #{name} started computing."
    on_computing_started
    start_time = Time.now

    if data_node.present? && clear_data_on_compute != data_node.use_double_buffering
      # make sure the data node has compatible settings
      data_node.use_double_buffering = clear_data_on_compute
      data_node.save
    end

    pre_compute(force_compute: force_compute)

    # update this node's schema with the necessary fields
    data_node&.update_schema(required_schema)

    if clear_data_on_compute
      # Pre-compute, we recreate the table, the unique indexes
      data_node&.recreate_dataset(dataset_type: :write)
      data_node&.create_unique_indexes(dataset_type: :write)
    end

    send_heartbeat
    compute_impl

    if clear_data_on_compute
      # Post-compute, delay creating other indexes for insert speed
      data_node&.create_non_unique_indexes(dataset_type: :write)
      # swap read/write datasets
      data_node&.swap_read_write_datasets!
    end

    self.last_compute_starting_time = start_time
    duration = Time.now - start_time
    logger.log "#{'>' * (depth + 1)} #{name} took #{duration} seconds to compute."
    on_computing_finished(state: 'computed')
  else
    logger.log "#{'>' * (depth + 1)} [IS AWAITING] #{name}."
    await_computing!
    logger.log "#{'>' * (depth + 1)} [IS DONE AWAITING] #{name}."
  end
rescue StandardError => e
  on_computing_finished(state: 'error', error: e) if has_compute_lock
  logger.log "#{'>' * (depth + 1)} [ERROR] #{name} failed computing: #{e}"
  raise
ensure
  release_computing_lock! if has_compute_lock
  true
end
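The clear_data_on_compute path relies on double buffering: new results are written to a hidden "write" dataset while readers keep using the current "read" dataset, and the two are swapped at the end. A highly simplified, self-contained sketch of that pattern (the `DoubleBuffer` class here is hypothetical, not the gem's DataNode):

```ruby
# Toy double buffer: readers see @read; computations fill @write.
class DoubleBuffer
  attr_reader :read

  def initialize
    @read  = []
    @write = []
  end

  # Pre-compute: start from an empty write dataset.
  def recreate_write_dataset
    @write = []
  end

  # Computation inserts only into the write side.
  def insert(rows)
    @write.concat(rows)
  end

  # Post-compute: atomically promote the freshly written data.
  def swap_read_write!
    @read, @write = @write, @read
  end
end

buffer = DoubleBuffer.new
buffer.recreate_write_dataset
buffer.insert([{ id: 1 }])
buffer.read # still [] - readers are unaffected mid-compute
buffer.swap_read_write!
buffer.read # => [{ id: 1 }]
```

This is why the gem forces `data_node.use_double_buffering` to match `clear_data_on_compute`: clearing and rebuilding data is only safe for concurrent readers when it happens on the hidden write side.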
#data_node ⇒ Object
Fetch the data node, if it is set.
# File 'lib/dataflow/nodes/compute_node.rb', line 123

def data_node
  @data_node ||= Dataflow::Nodes::DataNode.find(data_node_id) if data_node_id.present?
end
#dependencies(reload: false) ⇒ Object
Override the relation because self.dependencies is not ordered.
# File 'lib/dataflow/nodes/compute_node.rb', line 128

def dependencies(reload: false)
  return @dependencies if @dependencies.present? && !reload

  @dependencies = dependency_ids.map do |x|
    Dataflow::Node.find(x)
  end
end
#force_computing_lock_release! ⇒ Object
Force the release of this node’s computing lock. Do not use unless there is a problem with the lock.
# File 'lib/dataflow/nodes/compute_node.rb', line 299

def force_computing_lock_release!
  release_computing_lock!
end
#locked_for_computing? ⇒ Boolean
Check this node’s locking status.
# File 'lib/dataflow/nodes/compute_node.rb', line 293

def locked_for_computing?
  computing_state == 'computing'
end
#needs_automatic_recomputing? ⇒ Boolean
Checks whether an automatic recomputing is needed.
# File 'lib/dataflow/nodes/compute_node.rb', line 167

def needs_automatic_recomputing?
  interval = recompute_interval.to_i
  return false if interval <= 0
  return false if updated?
  return false if locked_for_computing?
  return true if updated_at.blank?

  updated_at + interval.seconds < Time.now
end
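The core timing rule (ignoring the updated?/lock guards above) is just "has the configured interval elapsed since the last update?". A self-contained sketch, where `needs_recompute?` is a hypothetical helper using plain seconds instead of ActiveSupport's `interval.seconds`:

```ruby
require 'time'

# A node needs recomputing when the interval has elapsed since its
# last update; a node that never computed always needs it (if an
# interval is configured at all).
def needs_recompute?(updated_at:, interval_seconds:, now: Time.now)
  return false if interval_seconds <= 0 # automatic recompute disabled
  return true  if updated_at.nil?       # never computed before

  updated_at + interval_seconds < now
end

now = Time.parse('2024-01-01 12:00:00')
needs_recompute?(updated_at: now - 3600, interval_seconds: 600, now: now) # => true
needs_recompute?(updated_at: now - 60,   interval_seconds: 600, now: now) # => false
```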
#recompute(depth: 0, force_recompute: false) ⇒ Object
Updates the dependencies that need to be updated, then computes this node's own data.
# File 'lib/dataflow/nodes/compute_node.rb', line 181

def recompute(depth: 0, force_recompute: false)
  logger.log "#{'>' * (depth + 1)} #{name} started recomputing..."
  start_time = Time.now

  parallel_each(dependencies) do |dependency|
    logger.log "#{'>' * (depth + 1)} #{name} checking deps: #{dependency.name}..."
    if !dependency.updated? || force_recompute
      dependency.recompute(depth: depth + 1, force_recompute: force_recompute)
    end
  end

  # Dependencies data may have changed in a child process.
  # Reload to make sure we have the latest metadata.
  logger.log "#{'>' * (depth + 1)} #{name} reloading dependencies..."
  dependencies(reload: true)

  compute(depth: depth, force_compute: force_recompute)
  logger.log "#{'>' * (depth + 1)} #{name} took #{Time.now - start_time} seconds to recompute."
  true
end
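The essence of recompute is a depth-first pass: stale dependencies are brought up to date before the node computes itself, and the depth parameter only drives log indentation. A toy, single-threaded sketch (the `NodeR` struct is hypothetical; the gem runs dependencies through `parallel_each`):

```ruby
# Stand-in node: stale dependencies are recomputed first, depth-first.
NodeR = Struct.new(:name, :deps, :stale) do
  def recompute(log, depth: 0)
    deps.each { |d| d.recompute(log, depth: depth + 1) if d.stale }
    log << "#{'>' * (depth + 1)} compute #{name}"
    self.stale = false
  end
end

a = NodeR.new('a', [], true)
b = NodeR.new('b', [a], true)
log = []
b.recompute(log)
log # => [">> compute a", "> compute b"]
```

The deeper node logs with more `>` markers and finishes first, which matches the indented log output produced by the real compute/recompute methods.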
#set_defaults ⇒ Object
Sets the default parameters before creating the object.
# File 'lib/dataflow/nodes/compute_node.rb', line 97

def set_defaults
  # support setting the fields with a Document rather
  # than an ObjectId. Handle the transformations here:
  if data_node_id.present?
    self.data_node_id = data_node_id._id unless data_node_id.is_a?(BSON::ObjectId)

    # the data node use_double_buffering setting
    # must match clear_data_on_compute:
    if data_node.use_double_buffering != clear_data_on_compute
      data_node.use_double_buffering = clear_data_on_compute
      data_node.save
    end
  end

  # Again support having an ObjectId or a document.
  self.dependency_ids = dependency_ids.map { |dep|
    next dep if dep.is_a? BSON::ObjectId
    dep._id
  }

  # Update the data node schema with the required schema
  # for this computed node.
  data_node&.update_schema(required_schema)
end
#updated? ⇒ Boolean
Returns false if any of our dependencies has been updated after our last update. We define a computed node’s last update as the time it started its last successful update (instead of the time it completed it, as dependencies may have changed in the meantime).
# File 'lib/dataflow/nodes/compute_node.rb', line 146

def updated?
  return false if updated_at.blank?

  dependencies.each do |dependency|
    return false unless dependency.updated?
    return false if dependency.updated_at > updated_at
  end
  true
end
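The staleness rule boils down to: a node is up to date only if every dependency is itself up to date and none was updated after the node's own timestamp. A self-contained sketch with hypothetical names (`Dep`, `up_to_date?`):

```ruby
# Stand-in dependency: last-update time plus its own freshness flag.
Dep = Struct.new(:updated_at, :up_to_date)

def up_to_date?(node_updated_at, deps)
  return false if node_updated_at.nil? # never computed

  deps.all? { |d| d.up_to_date && d.updated_at <= node_updated_at }
end

t = Time.now
fresh = Dep.new(t - 10, true) # updated before this node's last compute
stale = Dep.new(t + 10, true) # updated after it

up_to_date?(t, [fresh])        # => true
up_to_date?(t, [fresh, stale]) # => false
```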
#updated_at ⇒ Object
Keep a uniform interface with a DataNode.
# File 'lib/dataflow/nodes/compute_node.rb', line 157

def updated_at
  last_compute_starting_time
end
#updated_at=(val) ⇒ Object
# File 'lib/dataflow/nodes/compute_node.rb', line 161

def updated_at=(val)
  self.last_compute_starting_time = val
end
#valid_for_computation? ⇒ Boolean
Check whether or not this node can compute. Errors are added to the active model errors.
# File 'lib/dataflow/nodes/compute_node.rb', line 274

def valid_for_computation?
  # Perform additional checks: also add errors to "self.errors"
  opts = self.class.dependency_opts
  if opts.key?(:exactly)
    ensure_exact_dependencies(count: opts[:exactly])
  elsif opts.key?(:max)
    ensure_at_most_dependencies(count: opts[:max])
  else
    # even if the min is not specified, we need at least 1 dependency
    ensure_at_least_dependencies(count: opts[:min] || 1)
  end
  ensure_no_cyclic_dependencies
  ensure_keys_are_set
  ensure_data_node_exists if self.class.data_node_opts[:ensure_exists]

  errors.count == 0
end