Class: Dataflow::Nodes::ComputeNode

Inherits:
Object
Includes:
EventMixin, Dataflow::Node, PropertiesMixin, SchemaMixin, Mongoid::Document
Defined in:
lib/dataflow/nodes/compute_node.rb

Overview

Represents a computation. It may store its output in a separate data node, and depends on other data nodes to compute its own data.

Constant Summary

Constants included from SchemaMixin

SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR

Class Method Summary

Instance Method Summary

Methods included from SchemaMixin

#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer

Methods included from Dataflow::Node

find, #metadata, #required_by, #validate!

Class Method Details

.data_node_opts ⇒ Object



# File 'lib/dataflow/nodes/compute_node.rb', line 30

def data_node_opts
  @data_node_opts || {}
end

.dependency_opts ⇒ Object



# File 'lib/dataflow/nodes/compute_node.rb', line 26

def dependency_opts
  @dependency_opts || {}
end

.ensure_data_node_exists ⇒ Object

DSL to ensure that a data node must be set before a computed node can be recomputed (as it will presumably use it to store data).



# File 'lib/dataflow/nodes/compute_node.rb', line 52

def ensure_data_node_exists
  @data_node_opts = { ensure_exists: true }
end

.ensure_dependencies(opts) ⇒ Object

DSL to be used while making computed nodes. It supports enforcing validations by checking whether there is exactly, at least (min) or at most (max) a given number of dependencies. Usage:

  class MyComputeNode < ComputeNode
    ensure_dependencies exactly: 1 # could be e.g.: min: 3, or max: 5
  end



# File 'lib/dataflow/nodes/compute_node.rb', line 40

def ensure_dependencies(opts)
  raise Dataflow::Errors::InvalidConfigurationError, "ensure_dependencies must be given a hash. Received: #{opts.class}" unless opts.is_a?(Hash)
  valid_keys = %i(exactly min max).freeze
  has_attributes = (valid_keys - opts.keys).count < valid_keys.count
  raise Dataflow::Errors::InvalidConfigurationError, "ensure_dependencies must have at least one of 'min', 'max' or 'exactly' attributes set. Given: #{opts.keys}" unless has_attributes

  add_property(:dependency_ids, opts)
  @dependency_opts = opts
end
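
The key check above can be sketched in plain Ruby, outside the Dataflow classes (names are illustrative): an options hash qualifies as soon as subtracting its keys from the valid set removes at least one entry.

```ruby
# Standalone sketch of the ensure_dependencies key validation.
VALID_DEPENDENCY_KEYS = %i(exactly min max).freeze

def validate_dependency_opts(opts)
  raise ArgumentError, "expected a Hash, got #{opts.class}" unless opts.is_a?(Hash)

  # At least one valid key is present iff subtracting opts.keys shrank the set.
  has_attributes = (VALID_DEPENDENCY_KEYS - opts.keys).count < VALID_DEPENDENCY_KEYS.count
  raise ArgumentError, "need one of #{VALID_DEPENDENCY_KEYS.inspect}, got #{opts.keys.inspect}" unless has_attributes

  opts
end
```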

Instance Method Details

#all_dependencies ⇒ Object

Retrieve the whole dependency tree.



# File 'lib/dataflow/nodes/compute_node.rb', line 149

def all_dependencies
  (dependencies + dependencies.flat_map(&:all_dependencies)).uniq
end
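
The recursive walk can be reproduced with a minimal stand-in node (hypothetical `DepNode`, not part of the library): direct dependencies come first, then each dependency's own tree, deduplicated.

```ruby
# Minimal stand-in node using the same recursive tree walk as all_dependencies.
DepNode = Struct.new(:name, :dependencies) do
  def all_dependencies
    (dependencies + dependencies.flat_map(&:all_dependencies)).uniq
  end
end

shared = DepNode.new('shared', [])
middle = DepNode.new('middle', [shared])
root   = DepNode.new('root', [middle, shared])

root.all_dependencies.map(&:name) # => ["middle", "shared"]
```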

#compute(depth: 0, force_compute: false, source: nil) ⇒ Object

Compute this node’s data if not already updated. Acquires a computing lock before computing. If the lock is already held, waits until the holder finishes or the wait times out.

Parameters:

  • force_compute (Boolean) (defaults to: false)

    if true, computes even if the node is already up to date.



# File 'lib/dataflow/nodes/compute_node.rb', line 240

def compute(depth: 0, force_compute: false, source: nil)
  has_compute_lock = false
  validate!

  if updated? && !force_compute
    logger.log("#{'>' * (depth + 1)} #{name} is up-to-date.")
    return
  end

  has_compute_lock = acquire_computing_lock!
  if has_compute_lock
    logger.log("#{'>' * (depth + 1)} #{name} started computing.")
    on_computing_started
    start_time = Time.now

    if data_node.present? && clear_data_on_compute != data_node.use_double_buffering
      # make sure the data node has compatible settings
      data_node.use_double_buffering = clear_data_on_compute
      data_node.save
    end

    send_heartbeat
    pre_compute(force_compute: force_compute)

    # update this node's schema with the necessary fields
    data_node&.update_schema(required_schema)

    if clear_data_on_compute
      # Pre-compute, we recreate the table, the unique indexes
      data_node&.recreate_dataset(dataset_type: :write)
      data_node&.create_unique_indexes(dataset_type: :write)
    end

    send_heartbeat
    Executor.execute(self)

    if clear_data_on_compute
      # Post-compute, delay creating other indexes for insert speed
      data_node&.create_non_unique_indexes(dataset_type: :write)
      # swap read/write datasets
      data_node&.swap_read_write_datasets!
    end

    set_last_compute_starting_time(start_time)
    duration = Time.now - start_time
    logger.log("#{'>' * (depth + 1)} #{name} took #{duration} seconds to compute.")
    on_computing_finished(state: 'computed')
    true
  else
    logger.log("#{'>' * (depth + 1)} [IS AWAITING] #{name}.")
    await_computing!
    logger.log("#{'>' * (depth + 1)} [IS DONE AWAITING] #{name}.")
  end

rescue Errors::RemoteExecutionError => e
  on_computing_finished(state: 'error', error: e) if has_compute_lock
  logger.error(error: e, custom_message: "#{name} failed computing remotely.")
rescue StandardError => e
  on_computing_finished(state: 'error', error: e) if has_compute_lock
  logger.error(error: e, custom_message: "#{name} failed computing.")
  raise
ensure
  release_computing_lock! if has_compute_lock
  true
end
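
The lock discipline above reduces to a try-lock / compute / always-release pattern. A minimal sketch with an in-memory Mutex (the real node persists its lock state and can await the other holder):

```ruby
# Try-lock, run the computation if acquired, always release in an ensure block.
class LockedComputation
  def initialize
    @mutex = Mutex.new
  end

  # Runs the block only when the lock is free; returns whether it ran.
  def compute
    has_lock = @mutex.try_lock
    if has_lock
      yield
      true
    else
      # The real node would await the lock holder here instead of giving up.
      false
    end
  ensure
    @mutex.unlock if has_lock
  end
end
```

Releasing in `ensure` mirrors the method above: the lock is freed even when the computation raises, so a failed run cannot leave the node permanently locked.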

#data_node ⇒ Object

Fetch the data node if it is set



# File 'lib/dataflow/nodes/compute_node.rb', line 136

def data_node
  @data_node ||= Dataflow::Nodes::DataNode.find(data_node_id) if data_node_id.present?
end

#dependencies(reload: false) ⇒ Object

Override the relation because self.dependencies is not ordered.



# File 'lib/dataflow/nodes/compute_node.rb', line 141

def dependencies(reload: false)
  return @dependencies if @dependencies.present? && !reload
  @dependencies = dependency_ids.map do |x|
    Dataflow::Node.find(x)
  end
end

#execute_local_batch_computation(batch_params) ⇒ Object

Interface to execute a part (batch) of this node locally. This method is called when the framework needs to execute a batch on a worker. Override when needed to execute a batch depending on the params. If you override this, you may want to override make_batch_params as well.



# File 'lib/dataflow/nodes/compute_node.rb', line 352

def execute_local_batch_computation(batch_params)
  records = dependencies.first.all(where: batch_params)
  compute_batch(records: records)
end

#execute_local_computation ⇒ Object

Interface to execute this node locally



# File 'lib/dataflow/nodes/compute_node.rb', line 344

def execute_local_computation
  compute_impl
end

#execution_valid?(uuid) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/dataflow/nodes/compute_node.rb', line 334

def execution_valid?(uuid)
  execution_uuid.to_s == uuid.to_s
end

#explain_update(depth: 0, verbose: false) ⇒ Object

Logs the dependency tree's update times and whether each node should be updated. Useful to understand why a given node had to be recomputed.



# File 'lib/dataflow/nodes/compute_node.rb', line 172

def explain_update(depth: 0, verbose: false)
  if depth == 0 || !updated? || verbose
    logger.log("#{'>' * (depth + 1)} #{name} [COMPUTE] | #{updated? ? 'UPDATED' : 'OLD'} = #{updated_at}")
  end

  return if updated? && !verbose

  dependencies.each do |dependency|
    dependency.explain_update(depth: depth + 1, verbose: verbose)
  end
  true
end

#force_computing_lock_release! ⇒ Object

Force the release of this node’s computing lock. Do not use unless there is a problem with the lock.



# File 'lib/dataflow/nodes/compute_node.rb', line 330

def force_computing_lock_release!
  release_computing_lock!
end

#locked_for_computing? ⇒ Boolean

Check this node’s locking status.

Returns:

  • (Boolean)

Whether this node is locked or not.



# File 'lib/dataflow/nodes/compute_node.rb', line 324

def locked_for_computing?
  computing_state == 'computing'
end

#make_batch_params ⇒ Array

Interface used to retrieve the params for scheduled batches. Override when needed. The default implementation is to make queries that would ensure the full processing of the first dependency's records.

Returns:

  • (Array)

    of params that are passed to scheduled batches.



# File 'lib/dataflow/nodes/compute_node.rb', line 361

def make_batch_params
  make_batch_queries(node: dependencies.first)
end

#needs_automatic_recomputing? ⇒ Boolean

Checks whether automatic recomputing is needed.

Returns:

  • (Boolean)


# File 'lib/dataflow/nodes/compute_node.rb', line 196

def needs_automatic_recomputing?
  interval = recompute_interval.to_i
  return false if interval <= 0
  return false if updated?
  return false if locked_for_computing?
  return true if updated_at.blank?

  updated_at + interval.seconds < Time.now
end
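
The same staleness test can be written as a standalone function, with the node's state passed in explicitly instead of being read from its fields (parameter names are illustrative):

```ruby
# Standalone version of the automatic-recompute check.
def needs_automatic_recomputing?(interval_seconds, updated:, locked:, updated_at:, now: Time.now)
  interval = interval_seconds.to_i
  return false if interval <= 0   # automatic recomputing disabled
  return false if updated         # nothing to do
  return false if locked          # someone is already computing
  return true  if updated_at.nil? # never computed: always stale

  updated_at + interval < now
end
```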

#recompute(depth: 0, force_recompute: false) ⇒ Object

Update the dependencies that need to be updated, then compute this node's own data.

Parameters:

  • force_recompute (Boolean) (defaults to: false)

    if true, computes even if the node is already up to date.



# File 'lib/dataflow/nodes/compute_node.rb', line 210

def recompute(depth: 0, force_recompute: false)
  send_heartbeat
  logger.log("#{'>' * (depth + 1)} #{name} started recomputing...")
  start_time = Time.now

  parallel_each(dependencies) do |dependency|
    logger.log("#{'>' * (depth + 1)} #{name} checking deps: #{dependency.name}...")
    if !dependency.updated? || force_recompute
      dependency.recompute(depth: depth + 1, force_recompute: force_recompute)
    end
    send_heartbeat
  end

  # Dependencies data may have changed in a child process.
  # Reload to make sure we have the latest metadata.
  logger.log("#{'>' * (depth + 1)} #{name} reloading dependencies...")
  dependencies(reload: true)

  compute(depth: depth, force_compute: force_recompute)
  logger.log("#{'>' * (depth + 1)} #{name} took #{Time.now - start_time} seconds to recompute.")

  true
end

#schema ⇒ Object

Keep a compatible interface with the data node



# File 'lib/dataflow/nodes/compute_node.rb', line 339

def schema
  required_schema
end

#set_defaults ⇒ Object

Sets the default parameters before creating the object.



# File 'lib/dataflow/nodes/compute_node.rb', line 110

def set_defaults
  # support setting the fields with a Document rather
  # than an ObjectId. Handle the transformations here:
  if data_node_id.present?
    self.data_node_id = data_node_id._id unless data_node_id.is_a?(BSON::ObjectId)

    # the data node use_double_buffering setting
    # must match clear_data_on_compute:
    if data_node.use_double_buffering != clear_data_on_compute
      data_node.use_double_buffering = clear_data_on_compute
      data_node.save
    end
  end

  # Again support having an ObjectId or a document.
  self.dependency_ids = dependency_ids.map { |dep|
    next dep if dep.is_a? BSON::ObjectId
    dep._id
  }

  # Update the data node schema with the required schema
  # for this computed node.
  data_node&.update_schema(required_schema)
end

#updated? ⇒ Boolean

Returns false if any of our dependencies has been updated after our last update. We define a computed node's last update as the time it started its last successful update (instead of the time it completed it, as dependencies may have changed in the meantime).

Returns:

  • (Boolean)


# File 'lib/dataflow/nodes/compute_node.rb', line 159

def updated?
  return false if updated_at.blank?

  dependencies.each do |dependency|
    return false unless dependency.updated?
    return false if dependency.updated_at > updated_at
  end
  true
end
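
A simplified model of this freshness rule, with a hypothetical `DepStamp` struct standing in for a dependency node: a compute node is fresh only if it has run at least once and every dependency is itself fresh and no newer than the node's last compute start time.

```ruby
# Stand-in dependency: a freshness flag plus a timestamp.
DepStamp = Struct.new(:updated, :updated_at) do
  def updated?
    updated
  end
end

def node_updated?(last_compute_starting_time, dependencies)
  return false if last_compute_starting_time.nil?

  dependencies.all? do |dep|
    dep.updated? && dep.updated_at <= last_compute_starting_time
  end
end
```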

#updated_at ⇒ Object

Keep a uniform interface with a DataNode.



# File 'lib/dataflow/nodes/compute_node.rb', line 186

def updated_at
  last_compute_starting_time
end

#updated_at=(val) ⇒ Object



# File 'lib/dataflow/nodes/compute_node.rb', line 190

def updated_at=(val)
  self.last_compute_starting_time = val
end

#valid_for_computation? ⇒ Boolean

Check whether or not this node can compute. Errors are added to the active model errors.

Returns:

  • (Boolean)

true if this node has no errors and can be computed.



# File 'lib/dataflow/nodes/compute_node.rb', line 309

def valid_for_computation?
  # Perform additional checks: also add errors to "self.errors"
  opts = self.class.dependency_opts
  ensure_exact_dependencies(count: opts[:exactly]) if opts.key?(:exactly)
  ensure_at_most_dependencies(count: opts[:max])   if opts.key?(:max)
  ensure_at_least_dependencies(count: opts[:min])  if opts.key?(:min)
  ensure_no_cyclic_dependencies
  ensure_keys_are_set
  ensure_data_node_exists if self.class.data_node_opts[:ensure_exists]

  errors.count == 0
end
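
An illustrative counterpart of the `ensure_exact/at_least/at_most` checks invoked above (a sketch, not the library's implementation): compare a dependency count against the constraints and collect error messages instead of writing to model errors.

```ruby
# Collect validation messages for a dependency count against the constraints.
def dependency_count_errors(count, exactly: nil, min: nil, max: nil)
  errors = []
  errors << "expected exactly #{exactly} dependencies, got #{count}" if exactly && count != exactly
  errors << "expected at least #{min} dependencies, got #{count}"    if min && count < min
  errors << "expected at most #{max} dependencies, got #{count}"     if max && count > max
  errors
end
```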