Class: Dataflow::Nodes::DataNode

Inherits:
Object
Includes:
EventMixin, Dataflow::Node, PropertiesMixin, SchemaMixin, Mongoid::Document
Defined in:
lib/dataflow/nodes/data_node.rb

Overview

Data nodes are used to build a data computing/transformation graph. At each step we can save the results to a (temp) table.

Nodes::DataNode represents one of the data nodes. It is meant to be treated as an interface and should not be used directly.

Direct Known Subclasses

SnapshotNode, UpsertNode

Constant Summary

Constants included from SchemaMixin

SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR

Instance Method Summary

Methods included from SchemaMixin

#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer

Methods included from Dataflow::Node

find, #recompute, #updated?, #valid_for_computation?, #validate!

Instance Method Details

#add(records:) ⇒ Object

Adds the given records to the dataset and updates the updated_at time.

Parameters:

  • records (Array)

    an array of the records to be added.



# File 'lib/dataflow/nodes/data_node.rb', line 171

def add(records:)
  return if records.blank?
  db_adapter.save(records: records)
  self.updated_at = Time.now
  save!
end
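
A minimal usage sketch (node is assumed to be an existing data node; the record fields are illustrative):

node.add(records: [
  { 'id' => 1, 'name' => 'alice' },
  { 'id' => 2, 'name' => 'bob' }
])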

#all(where: {}, fields: [], sort: {}, limit: 0, offset: 0) {|db_client| ... } ⇒ Object

Returns all the records from a dataset that match the options.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record.

  • fields (Array) (defaults to: [])

    Array of strings representing which fields to include. e.g.: ['id', 'updated_at'] will only return these two fields.

  • sort (Hash) (defaults to: {})

    represents the sorting of the returned dataset. e.g. { 'id' => 1, 'updated_at' => -1 } will sort by id ASC and by updated_at DESC.

  • limit (Integer) (defaults to: 0)

    limits the number of records returned.

  • offset (Integer) (defaults to: 0)

    starting offset of the records returned. Use with limit to implement pagination.

Yields:

  • (db_client)

    When a block is passed, yields the db client, on which .each can be called to stream the results rather than loading everything in memory. Other methods may also be available depending on the backend, at the cost of back-end portability (use at your own risk).



# File 'lib/dataflow/nodes/data_node.rb', line 131

def all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, &block)
  db_adapter.all(where: where, fields: fields, sort: sort, limit: limit, offset: offset, &block)
end
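
A usage sketch, assuming node is an existing data node (process is a placeholder for your own logic):

# Fetch up to 10 records with id 1, keeping two fields, most recently updated first.
records = node.all(
  where: { 'id' => 1 },
  fields: ['id', 'updated_at'],
  sort: { 'updated_at' => -1 },
  limit: 10
)

# Stream the whole dataset instead of loading it in memory.
node.all do |db_client|
  db_client.each { |record| process(record) }
end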

#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash

Supports paginating efficiently through the dataset.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record. IMPORTANT: do not use the system id in the query. It will be overwritten.

  • fields (Array) (defaults to: [])

    Array of strings representing which fields to include. e.g.: ['id', 'updated_at'] will only return these two fields.

  • limit (Integer)

    limits the number of records returned.

  • cursor (String) (defaults to: nil)

    indicates from which page the results should be returned.

Returns:

  • (Hash)

    with 2 fields:

    • data [Array] that contains the fetched records

    • next_cursor [String] a string to pass into subsequent
      calls to fetch the next page of the data
      


# File 'lib/dataflow/nodes/data_node.rb', line 148

def all_paginated(where: {}, fields: [], cursor: nil)
  db_adapter.all_paginated(where: where, fields: fields, cursor: cursor)
end
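
A sketch of walking through every page (node is assumed to exist; process is a placeholder, and the string keys and blank-cursor stop condition are assumptions about the adapter's return format):

cursor = nil
loop do
  page = node.all_paginated(where: {}, fields: ['id'], cursor: cursor)
  page['data'].each { |record| process(record) }
  cursor = page['next_cursor']
  break if cursor.blank?
end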

#clear(where: {}) ⇒ Object

Clears the data that matches the options.



# File 'lib/dataflow/nodes/data_node.rb', line 179

def clear(where: {})
  db_adapter.delete(where: where)
end
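
For example (a sketch; node is assumed to exist):

node.clear(where: { 'id' => 1 })  # deletes the matching records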

#count(where: {}) ⇒ Integer

Counts how many records match the condition, or all records if no condition is given.

Returns:

  • (Integer)

    the record count.



# File 'lib/dataflow/nodes/data_node.rb', line 165

def count(where: {})
  db_adapter.count(where: where)
end

#create_non_unique_indexes(dataset_type: :read) ⇒ Object

Applies non-unique indexes on the dataset. For performance reasons, these indexes are best applied after adding data (especially on large import operations).



# File 'lib/dataflow/nodes/data_node.rb', line 211

def create_non_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :non_unique_only)
end

#create_unique_indexes(dataset_type: :read) ⇒ Object

Applies unique indexes on the dataset. As this will be enforcing constraints, it is best applied before adding any data.

Parameters:

  • dataset_type (Symbol) (defaults to: :read)

    selects which dataset to apply the indexes to. Can be :read or :write.



# File 'lib/dataflow/nodes/data_node.rb', line 203

def create_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :unique_only)
end
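
Putting the two index methods together, a sketch of one possible bulk-load ordering, using the default (:read) dataset (node and records are assumed to exist):

node.recreate_dataset              # start from an empty dataset
node.create_unique_indexes         # constraints first, before any data
node.add(records: records)         # bulk-load the records
node.create_non_unique_indexes     # performance indexes once the data is in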

#export(connection_opts: { db_backend: :csv }, keys: nil, where: {}) ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 260

def export(connection_opts: { db_backend: :csv }, keys: nil, where: {})
  on_export_started(connection_opts: connection_opts, keys: keys)
  # instantiate and export without saving anything
  Export::ToCsvNode.new(dependency_ids: [self], query: where.to_json).compute_impl
  on_export_finished
end

#find(where: {}) ⇒ Hash

Finds and returns a single record from the dataset, based on the given options.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record.

Returns:

  • (Hash)

    returns a single record from the dataset.



# File 'lib/dataflow/nodes/data_node.rb', line 111

def find(where: {})
  db_adapter.find(where: where)
end

#handle_dataset_settings_changed ⇒ Object

When the dataset properties change, notify the adapter to handle the new settings.



# File 'lib/dataflow/nodes/data_node.rb', line 95

def handle_dataset_settings_changed
  db_adapter.update_settings(data_node: self)

  # recreate the dataset if there is no data
  if db_adapter.count.zero?
    db_adapter.recreate_dataset(dataset: read_dataset_name)
  end

  db_adapter.create_indexes(dataset: read_dataset_name)
end

#import(connection_opts: {}, keys: nil) ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 254

def import(connection_opts: {}, keys: nil)
  importer = db_adapter(connection_opts)
  records = importer.all
  add(records: records)
end

#info(write_dataset: false) ⇒ Object

Retrieves some information about this node and its usage.



# File 'lib/dataflow/nodes/data_node.rb', line 268

def info(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  usage = db_adapter.usage(dataset: dataset)
  {
    name: name,
    type: self.class.to_s,
    dataset: dataset,
    db_backend: db_backend,
    updated_at: updated_at,
    record_count: count,
    indexes: indexes,
    effective_indexes: usage[:effective_indexes],
    mem_usage: usage[:memory],
    storage_usage: usage[:storage]
  }
end
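
For example (a sketch; the actual values depend on the node and its backend):

stats = node.info
stats[:record_count]   # number of records in the read dataset
stats[:storage_usage]  # storage used, as reported by the db adapter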

#ordered_system_id_queries(batch_size:) ⇒ Object

Returns a list of ordered (ASC) system id queries. These can be used to process the dataset in parallel by querying on a sub-section:

queries = node.ordered_system_id_queries(batch_size: 1000)
Parallel.each(queries) do |query|
  process(node.all(where: query))
end

Parameters:

  • batch_size (Integer)

    how many IDs to select per query.



# File 'lib/dataflow/nodes/data_node.rb', line 159

def ordered_system_id_queries(batch_size:)
  db_adapter.ordered_system_id_queries(batch_size: batch_size)
end

#read_dataset_name ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 216

def read_dataset_name
  return @temporary_read_dataset if @temporary_read_dataset

  if use_double_buffering
    "#{name}_buffer#{read_dataset_idx}"
  else
    name
  end
end

#read_dataset_name=(dataset) ⇒ Object

Used to select which dataset you want to read from. A possible use case is reading from an old dataset name.

Parameters:

  • dataset (String)

    the dataset name to read from. It must be a valid dataset name for the current settings.



# File 'lib/dataflow/nodes/data_node.rb', line 238

def read_dataset_name=(dataset)
  return unless valid_dataset_names.include?(dataset)
  @temporary_read_dataset = dataset
  db_adapter.update_settings(data_node: self)
  dataset
end
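
A sketch, assuming double buffering is enabled so that valid dataset names follow the "#{name}_buffer#{idx}" pattern shown in #read_dataset_name (the concrete name below is illustrative):

node.read_dataset_name = 'my_node_buffer1'  # silently ignored if not valid for the current settings
node.all(where: {})                          # now reads from the selected dataset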

#recreate_dataset(dataset_type: :read) ⇒ Object

Recreates a dataset.

Parameters:

  • dataset_type (Symbol) (defaults to: :read)

    selects which dataset to recreate. Can be :read or :write.



# File 'lib/dataflow/nodes/data_node.rb', line 192

def recreate_dataset(dataset_type: :read)
  # fetch the proper dataset name
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.recreate_dataset(dataset: dataset)
end

#set_defaults ⇒ Object

Sets the default parameters before creating the object.



# File 'lib/dataflow/nodes/data_node.rb', line 73

def set_defaults
  self.schema = schema || {}

  # Use the schema as the inferred schema if none is provided.
  # This is useful when there is no need to infer schemas (e.g. in SQL).
  self.inferred_schema ||= schema
end

#swap_read_write_datasets! ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 245

def swap_read_write_datasets!
  raise Dataflow::Errors::InvalidConfigurationError, "#swap_read_write_datasets! called on \"#{name}\" but \"use_double_buffering\" is not activated." unless use_double_buffering
  tmp = read_dataset_idx
  self.read_dataset_idx = write_dataset_idx
  self.write_dataset_idx = tmp
  db_adapter.update_settings(data_node: self)
  save!
end
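
A sketch of the double-buffering workflow this enables (assuming use_double_buffering is activated and new_records is an array of record hashes):

node.recreate_dataset(dataset_type: :write)  # prepare a fresh write buffer
node.add(records: new_records)               # load the new data (assumed to target the write dataset)
node.swap_read_write_datasets!               # readers now query the freshly written buffer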

#update_schema(sch) ⇒ Object

Update this node’s schema.



# File 'lib/dataflow/nodes/data_node.rb', line 184

def update_schema(sch)
  self.schema = sch
  db_adapter.update_settings(data_node: self)
end

#use_symbols? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/dataflow/nodes/data_node.rb', line 285

def use_symbols?
  (db_backend.to_s =~ /sql/).present?
end

#write_dataset_name ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 226

def write_dataset_name
  if use_double_buffering
    "#{name}_buffer#{write_dataset_idx}"
  else
    name
  end
end