Class: Dataflow::Nodes::DataNode

Inherits:
Object
Includes:
EventMixin, Dataflow::Node, PropertiesMixin, SchemaMixin, Mongoid::Document
Defined in:
lib/dataflow/nodes/data_node.rb

Overview

Data nodes are used to build a data computing/transformation graph. At each step we can save the results to a (temp) table.

Nodes::DataNode represents one of the data nodes. It is meant to be treated as an interface and should not be used directly.

Direct Known Subclasses

SnapshotNode, UpsertNode

Constant Summary

Constants included from SchemaMixin

SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR

Instance Method Summary

Methods included from SchemaMixin

#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer

Methods included from Dataflow::Node

find, #recompute, #updated?, #valid_for_computation?, #validate!

Instance Method Details

#add(records:) ⇒ Object

Adds the given records to the dataset and updates the updated_at time.

Parameters:

  • records (Array)

    an array of the records to be added.



# File 'lib/dataflow/nodes/data_node.rb', line 178

def add(records:)
  return if records.blank?
  db_adapter.save(records: records)
  self.updated_at = Time.now
  save!
end
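
A minimal usage sketch (the node lookup and record fields are illustrative, not prescribed by this API):

node = Dataflow::Nodes::DataNode.find_by(name: 'users')
node.add(records: [
  { 'id' => 1, 'email' => 'a@example.com' },
  { 'id' => 2, 'email' => 'b@example.com' }
])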

#all(where: {}, fields: [], sort: {}, limit: 0, offset: 0) {|db_client| ... } ⇒ Object

Returns all the records from a dataset that match the options.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record.

  • fields (Array) (defaults to: [])

    Array of strings representing which fields to include. e.g.: ['id', 'updated_at'] will only return these two fields.

  • sort (Hash) (defaults to: {})

    represents the sorting of the returned dataset. e.g. { 'id' => 1, 'updated_at' => -1 } will sort by id ASC and by updated_at DESC.

  • limit (Integer) (defaults to: 0)

    limits the amount of records returned.

  • offset (Integer) (defaults to: 0)

    starting offset of the records returned. Use with limit to implement pagination.

Yields:

  • (db_client)

    When a block is passed, yields the db client on which .each can be called to stream the results rather than load everything in memory. Other methods can also be called depending on the backend, at the cost of back-end portability (use at your own risk).



# File 'lib/dataflow/nodes/data_node.rb', line 138

def all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, &block)
  db_adapter.all(where: where, fields: fields, sort: sort, limit: limit, offset: offset, &block)
end
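
For illustration, assuming records with an 'id' field (the query operator syntax depends on the configured db_backend, and process_record is a hypothetical handler):

# Fetch a filtered, sorted slice into memory:
records = node.all(where: { 'id' => { '$gte' => 100 } }, fields: ['id'], sort: { 'id' => 1 }, limit: 50)

# Or pass a block to stream through the underlying db client instead of
# loading everything in memory:
node.all(where: {}) do |db_client|
  db_client.each { |record| process_record(record) }
end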

#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash

Supports paginating efficiently through the dataset.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record. IMPORTANT: do not use the system id in the query. It will be overwritten.

  • fields (Array) (defaults to: [])

    Array of strings representing which fields to include. e.g.: ['id', 'updated_at'] will only return these two fields.

  • limit (Integer)

    limits the amount of records returned.

  • cursor (String) (defaults to: nil)

    indicates from which page the results should be returned.

Returns:

  • (Hash)

    with 2 fields:

    • data [Array] that contains the fetched records

    • next_cursor [String] a string to pass into subsequent calls to fetch the next page of the data


# File 'lib/dataflow/nodes/data_node.rb', line 155

def all_paginated(where: {}, fields: [], cursor: nil)
  db_adapter.all_paginated(where: where, fields: fields, cursor: cursor)
end
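
A sketch of paginating through a whole dataset; the string hash keys ('data' / 'next_cursor') follow the field names above but may differ by adapter, and process_record is a hypothetical handler:

cursor = nil
loop do
  page = node.all_paginated(fields: %w(id email), cursor: cursor)
  page['data'].each { |record| process_record(record) }
  cursor = page['next_cursor']
  break if page['data'].empty?
end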

#clear(where: {}) ⇒ Object

Clears the data that matches the given condition.



# File 'lib/dataflow/nodes/data_node.rb', line 186

def clear(where: {})
  db_adapter.delete(where: where)
end

#count(where: {}) ⇒ Integer

Counts how many records match the condition, or all records if no condition is given.

Returns:

  • (Integer)

    the record count.



# File 'lib/dataflow/nodes/data_node.rb', line 172

def count(where: {})
  db_adapter.count(where: where)
end

#create_non_unique_indexes(dataset_type: :read) ⇒ Object

Applies non-unique indexes on the dataset. For performance reasons, these indexes are best applied after adding data (especially on large import operations).



# File 'lib/dataflow/nodes/data_node.rb', line 218

def create_non_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :non_unique_only)
end

#create_unique_indexes(dataset_type: :read) ⇒ Object

Applies unique indexes on the dataset. As this will be enforcing constraints, it is best applied before adding any data.

Parameters:

  • dataset_type (Symbol) (defaults to: :read)

    selects which dataset to apply the indexes to. Can be :read or :write.



# File 'lib/dataflow/nodes/data_node.rb', line 210

def create_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :unique_only)
end
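
Combining the two index methods with #recreate_dataset and #add, a plausible bulk-load sequence (a sketch, not a prescribed workflow) enforces unique constraints before the load and defers the non-unique indexes until after it. Shown for a node without double buffering, where every call below targets the single dataset named after the node:

node.recreate_dataset
node.create_unique_indexes              # constraints enforced from the start
node.add(records: records)
node.create_non_unique_indexes          # cheaper to build after the bulk load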

#export(connection_opts: { db_backend: :csv }, keys: [], where: {}) ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 267

def export(connection_opts: { db_backend: :csv }, keys: [], where: {})
  on_export_started(connection_opts: connection_opts, keys: keys)
  # instantiate and export without saving anything
  Export::ToCsvNode.new(
    dependency_ids: [self],
    query: where.to_json,
    keys: keys
  ).compute_impl
  on_export_finished
end
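
A short usage sketch; with the default connection_opts the records are written out through an Export::ToCsvNode (the key names and query operator syntax below are illustrative and backend-dependent):

node.export(keys: %w(id email), where: { 'id' => { '$lte' => 1000 } })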

#find(where: {}) ⇒ Hash

Finds and returns a single record from the dataset, based on the given options.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { 'id' => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record.

Returns:

  • (Hash)

    returns a single record from the dataset.



# File 'lib/dataflow/nodes/data_node.rb', line 118

def find(where: {})
  db_adapter.find(where: where)
end
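
For example (field names are illustrative):

record = node.find(where: { 'id' => 1 })
record['email'] if record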

#handle_dataset_settings_changedObject

When the dataset properties change, notifies the adapter to handle the new settings.



# File 'lib/dataflow/nodes/data_node.rb', line 98

def handle_dataset_settings_changed
  db_adapter.update_settings(data_node: self)

  # if we're using double buffering, just wait for the next buffer
  # to be created to apply the changes.
  return if use_double_buffering

  # recreate the dataset if there is no data
  if db_adapter.count.zero?
    db_adapter.recreate_dataset(dataset: read_dataset_name)
  end

  db_adapter.create_indexes(dataset: read_dataset_name)
end

#import(connection_opts: {}, keys: nil) ⇒ Object



# File 'lib/dataflow/nodes/data_node.rb', line 261

def import(connection_opts: {}, keys: nil)
  importer = db_adapter(connection_opts)
  records = importer.all
  add(records: records)
end
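
A hedged usage sketch: #import reads every record from the connection described by connection_opts and adds it to this node. Only db_backend is shown because the remaining option keys are adapter-specific:

node.import(connection_opts: { db_backend: :mongodb })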

#info(write_dataset: false) ⇒ Object

Retrieves some information about this node and its usage.



# File 'lib/dataflow/nodes/data_node.rb', line 279

def info(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  usage = db_adapter.usage(dataset: dataset)
  {
    name: name,
    type: self.class.to_s,
    dataset: dataset,
    db_backend: db_backend,
    updated_at: updated_at,
    record_count: count,
    indexes: indexes,
    effective_indexes: usage[:effective_indexes],
    mem_usage: usage[:memory],
    storage_usage: usage[:storage]
  }
end

#ordered_system_id_queries(batch_size:) ⇒ Object

Returns a list of ordered (ASC) system ID queries. These can be used to process the dataset in parallel by querying on a sub-section:

queries = node.ordered_system_id_queries(batch_size: 1000)
Parallel.each(queries) do |query|
  process(node.all(where: query))
end

Parameters:

  • batch_size (Integer)

    how many IDs to select per query.



# File 'lib/dataflow/nodes/data_node.rb', line 166

def ordered_system_id_queries(batch_size:)
  db_adapter.ordered_system_id_queries(batch_size: batch_size)
end

#read_dataset_nameObject



# File 'lib/dataflow/nodes/data_node.rb', line 223

def read_dataset_name
  return @temporary_read_dataset if @temporary_read_dataset

  if use_double_buffering
    "#{name}_buffer#{read_dataset_idx}"
  else
    name
  end
end

#read_dataset_name=(dataset) ⇒ Object

Used to select which dataset you want to read from. A possible use case is to read from an old dataset name.

Parameters:

  • dataset (String)

    the dataset name to read from. It must be a valid dataset name for the current settings.



# File 'lib/dataflow/nodes/data_node.rb', line 245

def read_dataset_name=(dataset)
  return unless valid_dataset_names.include?(dataset)
  @temporary_read_dataset = dataset
  db_adapter.update_settings(data_node: self)
  dataset
end
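
For example, to temporarily read from the other buffer of a double-buffered node (the buffer name shown is illustrative; it must be one of the node's valid dataset names):

node.read_dataset_name = "#{node.name}_buffer2"
previous_records = node.all(limit: 10)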

#recreate_dataset(dataset_type: :read) ⇒ Object

Recreates a dataset.

Parameters:

  • dataset_type (Symbol) (defaults to: :read)

    selects which dataset to recreate. Can be :read or :write.



# File 'lib/dataflow/nodes/data_node.rb', line 199

def recreate_dataset(dataset_type: :read)
  # fetch the proper dataset name
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.recreate_dataset(dataset: dataset)
end

#set_defaultsObject

Sets the default parameters before creating the object.



# File 'lib/dataflow/nodes/data_node.rb', line 73

def set_defaults
  self.schema = schema || {}

  # Use the schema as the inferred schema if none is provided.
  # This is useful when there is no need to infer schemas (e.g. in SQL)
  self.inferred_schema ||= schema

  # This is needed for the flow to compute properly
  self.updated_at = Time.now
end

#swap_read_write_datasets!Object



259
# File 'lib/dataflow/nodes/data_node.rb', line 252

def swap_read_write_datasets!
  raise Dataflow::Errors::InvalidConfigurationError, "#swap_read_write_datasets! called on \"#{name}\" but \"use_double_buffering\" is not activated." unless use_double_buffering
  tmp = read_dataset_idx
  self.read_dataset_idx = write_dataset_idx
  self.write_dataset_idx = tmp
  db_adapter.update_settings(data_node: self)
  save!
end
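
A sketch of a double-buffered refresh: write the new data into the inactive buffer, then flip so readers switch to it in one step (assumes use_double_buffering is set on the node; whether #add targets the write dataset is an assumption noted below):

node.recreate_dataset(dataset_type: :write)
node.create_unique_indexes(dataset_type: :write)
node.add(records: fresh_records)            # assumes #add writes into the write dataset
node.create_non_unique_indexes(dataset_type: :write)
node.swap_read_write_datasets!
node.read_dataset_name                      # now points at the freshly written buffer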

#update_schema(sch) ⇒ Object

Update this node’s schema.



# File 'lib/dataflow/nodes/data_node.rb', line 191

def update_schema(sch)
  self.schema = sch
  db_adapter.update_settings(data_node: self)
end
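
A usage sketch; the exact schema format is adapter-dependent, so the field/type hash below is illustrative only:

node.update_schema(
  'id' => { type: 'integer' },
  'email' => { type: 'string' }
)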

#use_symbols?Boolean

Returns:

  • (Boolean)


# File 'lib/dataflow/nodes/data_node.rb', line 296

def use_symbols?
  (db_backend.to_s =~ /sql/).present?
end

#write_dataset_nameObject



# File 'lib/dataflow/nodes/data_node.rb', line 233

def write_dataset_name
  if use_double_buffering
    "#{name}_buffer#{write_dataset_idx}"
  else
    name
  end
end