Class: Dataflow::Nodes::DataNode
- Inherits: Object
- Includes: EventMixin, Dataflow::Node, PropertiesMixin, SchemaMixin, Mongoid::Document
- Defined in: lib/dataflow/nodes/data_node.rb
Overview
Data nodes are used to build a data computing/transformation graph. At each step we can save the results to a (temp) table.
Nodes::DataNode represents one such data node. It is meant to be treated as an interface and should not be used directly.
Direct Known Subclasses
Constant Summary
Constants included from SchemaMixin
SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR
Instance Method Summary
- #add(records:) ⇒ Object
  Adds the given records to the dataset and updates the updated_at time.
- #all(where: {}, fields: [], sort: {}, limit: 0, offset: 0) {|db_client| ... } ⇒ Object
  Returns all the records from a dataset that match the options.
- #all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash
  Supports paginating efficiently through the dataset.
- #clear(where: {}) ⇒ Object
  Clears the data that matches the options.
- #count(where: {}) ⇒ Integer
  Counts how many records match the condition, or all records if no condition is given.
- #create_non_unique_indexes(dataset_type: :read) ⇒ Object
  Applies non-unique indexes on the dataset.
- #create_unique_indexes(dataset_type: :read) ⇒ Object
  Applies unique indexes on the dataset.
- #export(connection_opts: { db_backend: :csv }, keys: [], where: {}) ⇒ Object
- #find(where: {}) ⇒ Hash
  Finds and returns a single record from the dataset, based on the given options.
- #handle_dataset_settings_changed ⇒ Object
  When the dataset properties change, notifies the adapter to handle the new settings.
- #import(connection_opts: {}, keys: nil) ⇒ Object
- #info(write_dataset: false) ⇒ Object
  Retrieves some information about this node and its usage.
- #ordered_system_id_queries(batch_size:) ⇒ Object
  Returns a list of queries over ordered (ASC) system IDs.
- #read_dataset_name ⇒ Object
- #read_dataset_name=(dataset) ⇒ Object
  Used to select which dataset to read from.
- #recreate_dataset(dataset_type: :read) ⇒ Object
  Recreates a dataset.
- #set_defaults ⇒ Object
  Sets the default parameters before creating the object.
- #swap_read_write_datasets! ⇒ Object
- #update_schema(sch) ⇒ Object
  Updates this node's schema.
- #use_symbols? ⇒ Boolean
- #write_dataset_name ⇒ Object
Methods included from SchemaMixin
#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer
Methods included from Dataflow::Node
find, #recompute, #updated?, #valid_for_computation?, #validate!
Instance Method Details
#add(records:) ⇒ Object
Adds the given records to the dataset and updates the updated_at time.
# File 'lib/dataflow/nodes/data_node.rb', line 178

def add(records:)
  return if records.blank?
  db_adapter.save(records: records)
  self.updated_at = Time.now
  save!
end
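A minimal usage sketch; the node name 'users' and the record fields are hypothetical, and the node is looked up with Mongoid's find_by (available because the class includes Mongoid::Document):

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    node.add(records: [
      { 'id' => 1, 'email' => 'alice@example.com' },
      { 'id' => 2, 'email' => 'bob@example.com' }
    ])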
#all(where: {}, fields: [], sort: {}, limit: 0, offset: 0) {|db_client| ... } ⇒ Object
Returns all the records from a dataset that match the options.
# File 'lib/dataflow/nodes/data_node.rb', line 138

def all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, &block)
  db_adapter.all(where: where, fields: fields, sort: sort, limit: limit, offset: offset, &block)
end
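A sketch of a filtered query; the field names and the exact operator syntax accepted by the where clause depend on the underlying db adapter, so treat these values as placeholders:

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    records = node.all(where: { 'status' => 'active' }, fields: ['id', 'email'], limit: 100)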
#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash
Supports paginating efficiently through the dataset.
# File 'lib/dataflow/nodes/data_node.rb', line 155

def all_paginated(where: {}, fields: [], cursor: nil)
  db_adapter.all_paginated(where: where, fields: fields, cursor: cursor)
end
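A sketch of paging through the whole dataset. The keys read from the returned Hash ('data' and 'next_cursor') are assumptions to verify against the adapter in use, and process is a hypothetical per-page callback:

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    cursor = nil
    loop do
      page = node.all_paginated(where: {}, fields: ['id', 'email'], cursor: cursor)
      process(page['data'])          # hypothetical per-page processing
      cursor = page['next_cursor']
      break if cursor.blank?
    end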
#clear(where: {}) ⇒ Object
Clear the data that matches the options.
# File 'lib/dataflow/nodes/data_node.rb', line 186

def clear(where: {})
  db_adapter.delete(where: where)
end
#count(where: {}) ⇒ Integer
Counts how many records match the condition, or all records if no condition is given.
# File 'lib/dataflow/nodes/data_node.rb', line 172

def count(where: {})
  db_adapter.count(where: where)
end
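For example (the 'status' field is hypothetical):

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    node.count                                   # all records
    node.count(where: { 'status' => 'active' })  # matching records only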
#create_non_unique_indexes(dataset_type: :read) ⇒ Object
Applies non-unique indexes on the dataset. For performance reasons, these indexes are best applied after adding data (especially on large import operations).
# File 'lib/dataflow/nodes/data_node.rb', line 218

def create_non_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :non_unique_only)
end
#create_unique_indexes(dataset_type: :read) ⇒ Object
Applies unique indexes on the dataset. As this will be enforcing constraints, it is best applied before adding any data.
# File 'lib/dataflow/nodes/data_node.rb', line 210

def create_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :unique_only)
end
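A sketch of the ordering suggested by these two methods: enforce unique constraints before loading data, and add the non-unique (performance) indexes afterwards. It assumes a node without double buffering, so the default (read) dataset is the one being written to, and records is a hypothetical array of hashes:

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    node.recreate_dataset            # start from an empty dataset
    node.create_unique_indexes       # constraints are enforced from the start
    node.add(records: records)       # bulk load
    node.create_non_unique_indexes   # performance indexes applied last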
#export(connection_opts: { db_backend: :csv }, keys: [], where: {}) ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 267

def export(connection_opts: { db_backend: :csv }, keys: [], where: {})
  on_export_started(connection_opts: connection_opts, keys: keys)
  # instanciate and export without saving anything
  Export::ToCsvNode.new(
    dependency_ids: [self],
    query: where.to_json,
    keys: keys
  ).compute_impl
  on_export_finished
end
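A sketch of exporting part of the dataset with the default CSV backend; the keys and the where filter are placeholders:

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    node.export(keys: ['id', 'email'], where: { 'status' => 'active' })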
#find(where: {}) ⇒ Hash
Finds and returns a single record from the dataset, based on the given options.
# File 'lib/dataflow/nodes/data_node.rb', line 118

def find(where: {})
  db_adapter.find(where: where)
end
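For example (field and value are hypothetical):

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    record = node.find(where: { 'id' => 42 })   # a single matching record as a Hash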
#handle_dataset_settings_changed ⇒ Object
When the dataset properties change, notifies the adapter to handle the new settings.
# File 'lib/dataflow/nodes/data_node.rb', line 98

def handle_dataset_settings_changed
  db_adapter.update_settings(data_node: self)

  # if we're using double buffering, just wait for the next buffer
  # to be created to apply the changes.
  return if use_double_buffering

  # recreate the dataset if there is no data
  if db_adapter.count.zero?
    db_adapter.recreate_dataset(dataset: read_dataset_name)
  end

  db_adapter.create_indexes(dataset: read_dataset_name)
end
#import(connection_opts: {}, keys: nil) ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 261

def import(connection_opts: {}, keys: nil)
  importer = db_adapter(connection_opts)
  records = importer.all
  add(records: records)
end
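A sketch of pulling records from another source into this node. The only connection option shown, db_backend, mirrors the default used by #export; any further options (paths, credentials, etc.) are adapter-specific and omitted here:

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    node.import(connection_opts: { db_backend: :csv })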
#info(write_dataset: false) ⇒ Object
Retrieves some information about this node and its usage.
# File 'lib/dataflow/nodes/data_node.rb', line 279

def info(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  usage = db_adapter.usage(dataset: dataset)
  {
    name: name,
    type: self.class.to_s,
    dataset: dataset,
    db_backend: db_backend,
    updated_at: updated_at,
    record_count: count,
    indexes: indexes,
    effective_indexes: usage[:effective_indexes],
    mem_usage: usage[:memory],
    storage_usage: usage[:storage]
  }
end
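The returned hash can be used for quick reporting; the keys shown below come from the listing above, while the node name is hypothetical:

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    stats = node.info
    puts "#{stats[:name]}: #{stats[:record_count]} records in '#{stats[:dataset]}' (#{stats[:db_backend]})"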
#ordered_system_id_queries(batch_size:) ⇒ Object
Returns a list of queries over ordered (ASC) system IDs. These can be used to process the dataset in parallel by querying on a sub-section:

    queries = node.ordered_system_id_queries(batch_size: 1000)
    Parallel.each(queries) do |query|
      process(node.all(where: query))
    end

# File 'lib/dataflow/nodes/data_node.rb', line 166

def ordered_system_id_queries(batch_size:)
  db_adapter.ordered_system_id_queries(batch_size: batch_size)
end
#read_dataset_name ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 223

def read_dataset_name
  return @temporary_read_dataset if @temporary_read_dataset

  if use_double_buffering
    "#{name}_buffer#{read_dataset_idx}"
  else
    name
  end
end
#read_dataset_name=(dataset) ⇒ Object
Used to select which dataset to read from. A possible use case is reading from an old dataset name.
# File 'lib/dataflow/nodes/data_node.rb', line 245

def read_dataset_name=(dataset)
  return unless valid_dataset_names.include?(dataset)
  @temporary_read_dataset = dataset
  db_adapter.update_settings(data_node: self)
  dataset
end
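A sketch of temporarily pointing reads at an older buffer. The buffer name follows the "#{name}_buffer#{idx}" pattern shown in #read_dataset_name and must be one of the node's valid dataset names, otherwise the assignment is ignored; the concrete name used here is hypothetical:

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    node.read_dataset_name = 'users_buffer1'   # hypothetical previous buffer
    old_records = node.all(limit: 10)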
#recreate_dataset(dataset_type: :read) ⇒ Object
Recreates a dataset.
# File 'lib/dataflow/nodes/data_node.rb', line 199

def recreate_dataset(dataset_type: :read)
  # fetch the proper dataset name
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.recreate_dataset(dataset: dataset)
end
#set_defaults ⇒ Object
Sets the default parameters before creating the object.
# File 'lib/dataflow/nodes/data_node.rb', line 73

def set_defaults
  self.schema = schema || {}

  # Use the schema as the inferred schema if none is provided.
  # This useful when there is no need to infer schemas (e.g. in SQL)
  self.inferred_schema ||= schema

  # This is needed for the flow to compute properly
  self.updated_at = Time.now
end
#swap_read_write_datasets! ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 252

def swap_read_write_datasets!
  raise Dataflow::Errors::InvalidConfigurationError, '#swap_read_write_dataset_names! called on "#{self.name}" but "use_double_buffering" is not activated.' unless use_double_buffering
  tmp = read_dataset_idx
  self.read_dataset_idx = write_dataset_idx
  self.write_dataset_idx = tmp
  db_adapter.update_settings(data_node: self)
  save!
end
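A sketch of a full double-buffered refresh, assuming use_double_buffering is enabled on the node and that records added via #add land in the write dataset (an assumption); new_records is hypothetical:

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    node.recreate_dataset(dataset_type: :write)           # prepare the next buffer
    node.create_unique_indexes(dataset_type: :write)
    node.add(records: new_records)
    node.create_non_unique_indexes(dataset_type: :write)
    node.swap_read_write_datasets!                        # readers now see the new buffer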
#update_schema(sch) ⇒ Object
Update this node’s schema.
# File 'lib/dataflow/nodes/data_node.rb', line 191

def update_schema(sch)
  self.schema = sch
  db_adapter.update_settings(data_node: self)
end
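A sketch of setting a schema explicitly; the shape of the schema hash shown here is illustrative and should be checked against the schema inferrer or SQL adapter in use:

    node = Dataflow::Nodes::DataNode.find_by(name: 'users')
    node.update_schema({
      'id'    => { type: 'integer' },
      'email' => { type: 'string' }
    })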
#use_symbols? ⇒ Boolean
# File 'lib/dataflow/nodes/data_node.rb', line 296

def use_symbols?
  (db_backend.to_s =~ /sql/).present?
end
#write_dataset_name ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 233

def write_dataset_name
  if use_double_buffering
    "#{name}_buffer#{write_dataset_idx}"
  else
    name
  end
end