Class: Dataflow::Nodes::DataNode

Inherits:
Object
Includes:
EventMixin, Dataflow::Node, PropertiesMixin, SchemaMixin, Mongoid::Document
Defined in:
lib/dataflow/nodes/data_node.rb

Overview

Data nodes are used to build a data computing/transformation graph. At each step we can save the results to a (temp) table.

Nodes::DataNode represents one of the data nodes. It is meant to be treated as an interface and should not be used directly.

Constant Summary

Constants included from SchemaMixin

SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR

Instance Method Summary

Methods included from SchemaMixin

#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer

Methods included from Dataflow::Node

find, #recompute, #valid_for_computation?, #validate!

Instance Method Details

#add(records:) ⇒ Object

Adds the given records to the dataset and updates the updated_at time.

Parameters:

  • records (Array)

    an array of the records to be added.



# File 'lib/dataflow/nodes/data_node.rb', line 186

def add(records:)
  return if records.blank?
  db_adapter.save(records: records)
  self.updated_at = Time.now
  save!
end
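
A minimal usage sketch, assuming node is an existing DataNode instance; the record fields shown are illustrative:

node.add(records: [
  { 'id' => 1, 'name' => 'alice' },
  { 'id' => 2, 'name' => 'bob' }
])
node.count # => 2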

#all(where: {}, fields: [], sort: {}, limit: 0, offset: 0) {|db_client| ... } ⇒ Object

Returns all the records from a dataset that match the options.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record.

  • fields (Array) (defaults to: [])

    Array of strings representing which fields to include. e.g.: ['id', 'updated_at'] will only return these two fields.

  • sort (Hash) (defaults to: {})

    represents the sorting of the returned dataset. e.g. { 'id' => 1, 'updated_at' => -1 } will sort by id ASC and by updated_at DESC.

  • limit (Integer) (defaults to: 0)

    limits the number of records returned.

  • offset (Integer) (defaults to: 0)

    starting offset of the records returned. Use with limit to implement pagination.

Yields:

  • (db_client)

    When a block is passed, yields the db client on which .each can be called to stream the results rather than loading everything in memory. Other methods can also be called depending on the backend, at the cost of back-end portability (use at your own risk).



# File 'lib/dataflow/nodes/data_node.rb', line 146

def all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, &block)
  db_adapter.all(where: where, fields: fields, sort: sort, limit: limit, offset: offset, &block)
end
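
An illustrative sketch, assuming node is a DataNode; the condition and field names are hypothetical, and handle stands in for application code:

# Fetch a filtered, sorted page of records.
records = node.all(
  where: { 'status' => 'active' },
  fields: ['id', 'updated_at'],
  sort: { 'updated_at' => -1 },
  limit: 100
)

# Or stream the results instead of loading everything in memory.
node.all(where: { 'status' => 'active' }) do |client|
  client.each { |record| handle(record) }
end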

#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash

Supports paginating efficiently through the dataset.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record. IMPORTANT: do not use the system id in the query. It will be overwritten.

  • fields (Array) (defaults to: [])

    Array of strings representing which fields to include. e.g.: ['id', 'updated_at'] will only return these two fields.

  • cursor (String) (defaults to: nil)

    indicates from which page the results should be returned.

Returns:

  • (Hash)

    with 2 fields:

    • data [Array] that contains the fetched records

    • next_cursor [String] a string to pass into the subsequent
      calls to fetch the next page of the data


# File 'lib/dataflow/nodes/data_node.rb', line 163

def all_paginated(where: {}, fields: [], cursor: nil)
  db_adapter.all_paginated(where: where, fields: fields, cursor: cursor)
end
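
A pagination loop sketch, assuming node is a DataNode; the result keys are shown as strings following the return description above (adjust to the adapter's actual key type), and handle is a hypothetical processing method:

cursor = nil
loop do
  page = node.all_paginated(fields: ['id', 'updated_at'], cursor: cursor)
  page['data'].each { |record| handle(record) }
  cursor = page['next_cursor']
  break if cursor.blank?
end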

#clear(where: {}) ⇒ Object

Clears the data that matches the options.



# File 'lib/dataflow/nodes/data_node.rb', line 194

def clear(where: {})
  db_adapter.delete(where: where)
end

#count(where: {}) ⇒ Integer

Counts how many records match the condition, or all records if no condition is given.

Returns:

  • (Integer)

    the record count.



# File 'lib/dataflow/nodes/data_node.rb', line 180

def count(where: {})
  db_adapter.count(where: where)
end
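
For illustration, assuming node is a DataNode; the field names and values are hypothetical:

node.count                                   # all records
node.count(where: { 'status' => 'active' })  # only matching records
node.clear(where: { 'status' => 'stale' })   # delete only matching records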

#create_non_unique_indexes(dataset_type: :read) ⇒ Object

Applies non-unique indexes on the dataset. For performance reasons, these indexes are best applied after adding data (especially on large import operations).



# File 'lib/dataflow/nodes/data_node.rb', line 226

def create_non_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :non_unique_only)
end

#create_unique_indexes(dataset_type: :read) ⇒ Object

Applies unique indexes on the dataset. As this will be enforcing constraints, it is best applied before adding any data.

Parameters:

  • dataset_type (Symbol) (defaults to: :read)

    selects the dataset on which to apply the indexes. Can be :read or :write.



# File 'lib/dataflow/nodes/data_node.rb', line 218

def create_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :unique_only)
end
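
A sketch of a common bulk-load ordering, following the guidance above; it assumes node is a DataNode, records holds the data to import, and that writes land in the write dataset:

node.create_unique_indexes(dataset_type: :write)     # constraints first
node.add(records: records)                           # then the bulk of the data
node.create_non_unique_indexes(dataset_type: :write) # performance indexes last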

#drop_dataset! ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 335

def drop_dataset!
  db_adapter.drop_dataset(write_dataset_name)
  return unless use_double_buffering
  db_adapter.drop_dataset(read_dataset_name)
end

#dump_dataset(base_folder: './dump') ⇒ String

Dump a backup of this dataset to a file.

Returns:

  • (String)

    the filepath to the dump file.



# File 'lib/dataflow/nodes/data_node.rb', line 343

def dump_dataset(base_folder: './dump')
  db_adapter.dump(base_folder: base_folder)
end
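
A backup/restore sketch, assuming node is a DataNode and the dump folder exists; the restore step uses #restore_dataset, documented below:

filepath = node.dump_dataset(base_folder: './dump')
# ... later, restore from that same dump file:
node.restore_dataset(filepath: filepath)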

#explain_update(depth: 0, verbose: false) ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 312

def explain_update(depth: 0, verbose: false)
  logger.log("#{'>' * (depth + 1)} #{name} [Dataset] | UPDATED = #{updated_at}")
end

#export(connection_opts: { db_backend: :csv }, keys: [], where: {}) ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 275

def export(connection_opts: { db_backend: :csv }, keys: [], where: {})
  on_export_started(connection_opts: connection_opts, keys: keys)
  # instantiate and export without saving anything
  Export::ToCsvNode.new(
    dependency_ids: [self],
    query: where.to_json,
    keys: keys
  ).compute_impl
  on_export_finished
end
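
An export sketch, assuming node is a DataNode; the keys and filter values are illustrative:

node.export(
  connection_opts: { db_backend: :csv },
  keys: ['id', 'updated_at'],
  where: { 'status' => 'active' }
)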

#find(where: {}) ⇒ Hash

Finds and returns a single record from the dataset, based on the given options.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record.

Returns:

  • (Hash)

    returns a single record from the dataset.



# File 'lib/dataflow/nodes/data_node.rb', line 126

def find(where: {})
  db_adapter.find(where: where)
end
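
For illustration, assuming node is a DataNode:

record = node.find(where: { 'id' => 1 })  # a single matching record
any    = node.find                        # any record when no condition is given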

#handle_dataset_settings_changed ⇒ Object

When the dataset properties change, notify the adapter so it can handle the new settings.



# File 'lib/dataflow/nodes/data_node.rb', line 106

def handle_dataset_settings_changed
  db_adapter.update_settings(data_node: self)

  # if we're using double buffering, just wait for the next buffer
  # to be created to apply the changes.
  return if use_double_buffering

  # recreate the dataset if there is no data
  if db_adapter.count.zero?
    db_adapter.recreate_dataset(dataset: read_dataset_name)
  end

  db_adapter.create_indexes(dataset: read_dataset_name)
end

#import(connection_opts: {}, keys: nil) ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 269

def import(connection_opts: {}, keys: nil)
  importer = db_adapter(connection_opts)
  records = importer.all
  add(records: records)
end
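
An import sketch, assuming node is a DataNode; the only connection option shown is db_backend (mirroring the export default), since any additional options depend on the configured adapter:

node.import(connection_opts: { db_backend: :csv }, keys: ['id'])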

#info(write_dataset: false) ⇒ Object

Retrieves some information about this node and its usage.



# File 'lib/dataflow/nodes/data_node.rb', line 287

def info(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  usage = db_adapter.usage(dataset: dataset)
  {
    name: name,
    type: self.class.to_s,
    dataset: dataset,
    db_backend: db_backend,
    updated_at: updated_at,
    record_count: count,
    indexes: indexes,
    effective_indexes: usage[:effective_indexes],
    mem_usage: usage[:memory],
    storage_usage: usage[:storage]
  }
end
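
An illustrative call, assuming node is a DataNode; the values shown in the comments are made up, and db_backend will reflect the node's actual configuration:

node.info
# => {
#      name: 'users',
#      type: 'Dataflow::Nodes::DataNode',
#      dataset: 'users',
#      db_backend: :mongodb,
#      updated_at: ...,
#      record_count: 1234,
#      indexes: [...],
#      effective_indexes: [...],
#      mem_usage: ...,
#      storage_usage: ...
#    }

node.info(write_dataset: true)  # same report, computed on the write dataset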

#ordered_system_id_queries(batch_size:, where: {}) ⇒ Object

Return a list of ordered (ASC) system ID queries. These can be used to process the dataset in parallel by querying on a sub-section:

queries = node.ordered_system_id_queries(batch_size: 1000)
Parallel.each(queries) do |query|
  process(node.all(where: query))
end

Parameters:

  • batch_size (Integer)

    how many IDs to select per query.



# File 'lib/dataflow/nodes/data_node.rb', line 174

def ordered_system_id_queries(batch_size:, where: {})
  db_adapter.ordered_system_id_queries(batch_size: batch_size, where: where)
end

#read_dataset_name ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 231

def read_dataset_name
  return @temporary_read_dataset if @temporary_read_dataset

  if use_double_buffering
    "#{name}_buffer#{read_dataset_idx}"
  else
    name
  end
end

#read_dataset_name=(dataset) ⇒ Object

Use this to select which dataset to read from. A possible use case is reading from an old dataset name.

Parameters:

  • dataset (String)

    the dataset name to read from. It must be a valid dataset name for the current settings.



# File 'lib/dataflow/nodes/data_node.rb', line 253

def read_dataset_name=(dataset)
  return unless valid_dataset_names.include?(dataset)
  @temporary_read_dataset = dataset
  db_adapter.update_settings(data_node: self)
  dataset
end
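
For illustration, assuming node uses double buffering and 'users_buffer_1' is a hypothetical older dataset name that is valid for the current settings:

node.read_dataset_name = 'users_buffer_1'
node.all(limit: 10)  # now reads from the selected dataset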

#recreate_dataset(dataset_type: :read) ⇒ Object

Recreates a dataset.

Parameters:

  • dataset_type (Symbol) (defaults to: :read)

    selects which dataset to recreate. Can be :read or :write.



# File 'lib/dataflow/nodes/data_node.rb', line 207

def recreate_dataset(dataset_type: :read)
  # fetch the proper dataset name
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.recreate_dataset(dataset: dataset)
end

#required_by ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 316

def required_by
  super + Dataflow::Nodes::ComputeNode.where(data_node_id: _id).map { |node|
    { node: node, type: 'dataset' }
  }
end

#restore_dataset(filepath:) ⇒ Object

Restores a dump of this dataset.

Parameters:

  • filepath (String)

    the filepath to the dump file.



# File 'lib/dataflow/nodes/data_node.rb', line 349

def restore_dataset(filepath:)
  db_adapter.restore(filepath: filepath)
end

#safely_clear_write_dataset ⇒ Object

This is not safe if there is some parallel processing going on.



# File 'lib/dataflow/nodes/data_node.rb', line 323

def safely_clear_write_dataset
  # we can only clear the write dataset if we're using double buffering
  return unless use_double_buffering
  # check if there is any node that is currently computing to this dataset
  used_by = required_by.select { |x| x[:type] == 'dataset' && x[:node].locked_for_computing? }
  return if used_by.present?

  logger.log("Dropping #{db_name}.#{write_dataset_name} on #{db_backend}.")
  # TODO: lock the node?
  db_adapter.drop_dataset(write_dataset_name)
end

#set_defaults ⇒ Object

Sets the default parameters before creating the object.



# File 'lib/dataflow/nodes/data_node.rb', line 81

def set_defaults
  self.schema = schema || {}

  # Use the schema as the inferred schema if none is provided.
  # This is useful when there is no need to infer schemas (e.g. in SQL)
  self.inferred_schema ||= schema

  # This is needed for the flow to compute properly
  self.updated_at = Time.now
end

#swap_read_write_datasets! ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 260

def swap_read_write_datasets!
  raise Dataflow::Errors::InvalidConfigurationError, "#swap_read_write_datasets! called on \"#{self.name}\" but \"use_double_buffering\" is not activated." unless use_double_buffering
  tmp = read_dataset_idx
  self.read_dataset_idx = write_dataset_idx
  self.write_dataset_idx = tmp
  db_adapter.update_settings(data_node: self)
  save!
end
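
A double-buffering sketch; it assumes use_double_buffering is enabled, fresh_records holds rebuilt data, and that writes land in the write dataset (the exact write path depends on the adapter):

node.recreate_dataset(dataset_type: :write)   # start from an empty write buffer
node.add(records: fresh_records)
node.swap_read_write_datasets!                # readers now see the new data
node.safely_clear_write_dataset               # optionally drop the stale buffer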

#update_schema(sch) ⇒ Object

Update this node’s schema.



# File 'lib/dataflow/nodes/data_node.rb', line 199

def update_schema(sch)
  self.schema = sch
  db_adapter.update_settings(data_node: self)
end
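
An illustrative call; the exact schema structure expected by the adapter is not specified here, so the hash below is only an assumption:

node.update_schema(
  'id'         => { type: 'integer' },
  'updated_at' => { type: 'datetime' }
)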

#updated? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/dataflow/nodes/data_node.rb', line 308

def updated?
  true
end

#use_symbols? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/dataflow/nodes/data_node.rb', line 304

def use_symbols?
  (db_backend.to_s =~ /sql/).present?
end

#write_dataset_name ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 241

def write_dataset_name
  if use_double_buffering
    "#{name}_buffer#{write_dataset_idx}"
  else
    name
  end
end