Class: Dataflow::Nodes::DataNode
- Inherits: Object
- Includes: EventMixin, Dataflow::Node, PropertiesMixin, SchemaMixin, Mongoid::Document
- Defined in: lib/dataflow/nodes/data_node.rb
Overview
Data nodes are used to build a data computing/transformation graph. At each step we can save the results to a (temp) table.
Nodes::DataNode represents one of the data nodes. It is meant to be treated as an interface and should not be used directly.
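For orientation, here is a minimal sketch of the read/write interface a data node exposes. The node variable and the record contents are hypothetical; only the documented methods (#add, #count, #all) are assumed:

  # Assumes `node` is a concrete data node instance that is already
  # configured with a backend and a dataset name.
  node.add(records: [{ 'id' => 1, 'name' => 'alice' }])
  node.count                      # => 1
  node.all(where: { 'id' => 1 })  # => [{ 'id' => 1, 'name' => 'alice' }]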
Direct Known Subclasses
Constant Summary
Constants included from SchemaMixin
SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR
Instance Method Summary
- #add(records:) ⇒ Object
  Adds the given records to the dataset and updates the updated_at time.
- #all(where: {}, fields: [], sort: {}, limit: 0, offset: 0) {|db_client| ... } ⇒ Object
  Returns all the records from a dataset that match the options.
- #all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash
  Supports paginating efficiently through the dataset.
- #clear(where: {}) ⇒ Object
  Clear the data that matches the options.
- #count(where: {}) ⇒ Integer
  Counts how many records match the condition, or all records if no condition is given.
- #create_non_unique_indexes(dataset_type: :read) ⇒ Object
  Applies non-unique indexes on the dataset.
- #create_unique_indexes(dataset_type: :read) ⇒ Object
  Applies unique indexes on the dataset.
- #export(connection_opts: { db_backend: :csv }, keys: nil, where: {}) ⇒ Object
- #find(where: {}) ⇒ Hash
  Finds and returns a record from the dataset, based on the given options.
- #handle_dataset_settings_changed ⇒ Object
  When the dataset properties change, notifies the adapter to handle the new settings.
- #import(connection_opts: {}, keys: nil) ⇒ Object
- #info(write_dataset: false) ⇒ Object
  Retrieves some information about this node and its usage.
- #ordered_system_id_queries(batch_size:) ⇒ Object
  Returns a list of ordered (ASC) system ID queries.
- #read_dataset_name ⇒ Object
- #read_dataset_name=(dataset) ⇒ Object
  Used to select which dataset to read from.
- #recreate_dataset(dataset_type: :read) ⇒ Object
  Recreates a dataset.
- #set_defaults ⇒ Object
  Sets the default parameters before creating the object.
- #swap_read_write_datasets! ⇒ Object
- #update_schema(sch) ⇒ Object
  Update this node’s schema.
- #use_symbols? ⇒ Boolean
- #write_dataset_name ⇒ Object
Methods included from SchemaMixin
#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer
Methods included from Dataflow::Node
find, #recompute, #updated?, #valid_for_computation?, #validate!
Instance Method Details
#add(records:) ⇒ Object
Adds the given records to the dataset and updates the updated_at time.
# File 'lib/dataflow/nodes/data_node.rb', line 171

def add(records:)
  return if records.blank?
  db_adapter.save(records: records)
  self.updated_at = Time.now
  save!
end
#all(where: {}, fields: [], sort: {}, limit: 0, offset: 0) {|db_client| ... } ⇒ Object
Returns all the records from a dataset that match the options.
# File 'lib/dataflow/nodes/data_node.rb', line 131

def all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, &block)
  db_adapter.all(where: where, fields: fields, sort: sort, limit: limit, offset: offset, &block)
end
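A hedged usage sketch of the query options; the field names and the Mongo-style sort value are illustrative assumptions, not part of the documented contract:

  # Fetch up to 100 matching records, newest first, returning only a few fields.
  records = node.all(
    where:  { 'status' => 'active' },
    fields: ['id', 'status', 'updated_at'],
    sort:   { 'updated_at' => -1 },   # assumed Mongo-style descending sort
    limit:  100
  )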
#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash
Supports paginating efficiently through the dataset.
# File 'lib/dataflow/nodes/data_node.rb', line 148

def all_paginated(where: {}, fields: [], cursor: nil)
  db_adapter.all_paginated(where: where, fields: fields, cursor: cursor)
end
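The exact shape of the returned Hash is not documented here; the loop below assumes it exposes the records under 'data' and a continuation token under 'next_cursor', and process is a hypothetical helper. Verify the keys against the adapter before relying on them:

  cursor = nil
  loop do
    page = node.all_paginated(where: { 'status' => 'active' }, cursor: cursor)
    process(page['data'])          # assumed key holding the current page of records
    cursor = page['next_cursor']   # assumed key holding the next cursor
    break if cursor.nil? || cursor.to_s.empty?
  end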
#clear(where: {}) ⇒ Object
Clear the data that matches the options.
# File 'lib/dataflow/nodes/data_node.rb', line 179

def clear(where: {})
  db_adapter.delete(where: where)
end
#count(where: {}) ⇒ Integer
Counts how many records match the condition, or all records if no condition is given.
# File 'lib/dataflow/nodes/data_node.rb', line 165

def count(where: {})
  db_adapter.count(where: where)
end
#create_non_unique_indexes(dataset_type: :read) ⇒ Object
Applies non-unique indexes on the dataset. For performance reasons, these indexes are best applied after adding data (especially on large import operations).
# File 'lib/dataflow/nodes/data_node.rb', line 211

def create_non_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :non_unique_only)
end
#create_unique_indexes(dataset_type: :read) ⇒ Object
Applies unique indexes on the dataset. As this will be enforcing constraints, it is best applied before adding any data.
# File 'lib/dataflow/nodes/data_node.rb', line 203

def create_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :unique_only)
end
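Taken together with #recreate_dataset and #add, the two index methods suggest the following ordering for a large (re)load. This is only a sketch of that ordering, using the default :read dataset; fetch_records is a hypothetical helper:

  node.recreate_dataset(dataset_type: :read)
  node.create_unique_indexes(dataset_type: :read)       # enforce constraints before adding data
  node.add(records: fetch_records)
  node.create_non_unique_indexes(dataset_type: :read)   # cheaper to build after a large import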
#export(connection_opts: { db_backend: :csv }, keys: nil, where: {}) ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 260

def export(connection_opts: { db_backend: :csv }, keys: nil, where: {})
  on_export_started(connection_opts: connection_opts, keys: keys)
  # instantiate and export without saving anything
  Export::ToCsvNode.new(dependency_ids: [self], query: where.to_json).compute_impl
  on_export_finished
end
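A short usage sketch: the default connection_opts already select the CSV backend, so only a filter is passed here (the filter itself is illustrative):

  # Export the records matching the filter through the default CSV backend.
  node.export(where: { 'status' => 'active' })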
#find(where: {}) ⇒ Hash
Finds and returns a record from the dataset, based on the given options.
# File 'lib/dataflow/nodes/data_node.rb', line 111

def find(where: {})
  db_adapter.find(where: where)
end
#handle_dataset_settings_changed ⇒ Object
When the dataset properties change, notifies the adapter to handle the new settings.
# File 'lib/dataflow/nodes/data_node.rb', line 95

def handle_dataset_settings_changed
  db_adapter.update_settings(data_node: self)

  # recreate the dataset if there is no data
  if db_adapter.count.zero?
    db_adapter.recreate_dataset(dataset: read_dataset_name)
  end

  db_adapter.create_indexes(dataset: read_dataset_name)
end
#import(connection_opts: {}, keys: nil) ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 254

def import(connection_opts: {}, keys: nil)
  importer = db_adapter(connection_opts)
  records = importer.all
  add(records: records)
end
#info(write_dataset: false) ⇒ Object
Retrieves some information about this node and its usage.
# File 'lib/dataflow/nodes/data_node.rb', line 268

def info(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  usage = db_adapter.usage(dataset: dataset)
  {
    name: name,
    type: self.class.to_s,
    dataset: dataset,
    db_backend: db_backend,
    updated_at: updated_at,
    record_count: count,
    indexes: indexes,
    effective_indexes: usage[:effective_indexes],
    mem_usage: usage[:memory],
    storage_usage: usage[:storage]
  }
end
#ordered_system_id_queries(batch_size:) ⇒ Object
Returns a list of ordered (ASC) system ID queries. These can be used to process the dataset in parallel by querying on a sub-section:

  queries = node.ordered_system_id_queries(batch_size: 1000)
  Parallel.each(queries) do |query|
    process(node.all(where: query))
  end

# File 'lib/dataflow/nodes/data_node.rb', line 159

def ordered_system_id_queries(batch_size:)
  db_adapter.ordered_system_id_queries(batch_size: batch_size)
end
#read_dataset_name ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 216

def read_dataset_name
  return @temporary_read_dataset if @temporary_read_dataset

  if use_double_buffering
    "#{name}_buffer#{read_dataset_idx}"
  else
    name
  end
end
#read_dataset_name=(dataset) ⇒ Object
Used to select which dataset to read from. A possible use case is reading from an old dataset name.
# File 'lib/dataflow/nodes/data_node.rb', line 238

def read_dataset_name=(dataset)
  return unless valid_dataset_names.include?(dataset)
  @temporary_read_dataset = dataset
  db_adapter.update_settings(data_node: self)
  dataset
end
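A sketch of the "read from an old dataset" use case mentioned above. The buffer name is illustrative (it mirrors the naming used by #read_dataset_name) and must be included in the node's valid dataset names, otherwise the assignment is silently ignored:

  node.read_dataset_name = "#{node.name}_buffer1"  # illustrative buffer name
  node.all(limit: 10)                              # subsequent reads use the selected dataset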
#recreate_dataset(dataset_type: :read) ⇒ Object
Recreates a dataset.
# File 'lib/dataflow/nodes/data_node.rb', line 192

def recreate_dataset(dataset_type: :read)
  # fetch the proper dataset name
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.recreate_dataset(dataset: dataset)
end
#set_defaults ⇒ Object
Sets the default parameters before creating the object.
# File 'lib/dataflow/nodes/data_node.rb', line 73

def set_defaults
  self.schema = schema || {}

  # Use the schema as the inferred schema if none is provided.
  # This is useful when there is no need to infer schemas (e.g. in SQL)
  self.inferred_schema ||= schema
end
#swap_read_write_datasets! ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 245

def swap_read_write_datasets!
  raise Dataflow::Errors::InvalidConfigurationError, "#swap_read_write_datasets! called on \"#{name}\" but \"use_double_buffering\" is not activated." unless use_double_buffering
  tmp = read_dataset_idx
  self.read_dataset_idx = write_dataset_idx
  self.write_dataset_idx = tmp
  db_adapter.update_settings(data_node: self)
  save!
end
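A sketch of how the swap closes a double-buffered write cycle, assuming use_double_buffering is enabled and the write dataset has just been populated (see the index/add ordering sketch above):

  # Make the freshly written buffer the new read dataset.
  node.swap_read_write_datasets!
  node.read_dataset_name  # now points at the buffer that was just written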
#update_schema(sch) ⇒ Object
Update this node’s schema.
# File 'lib/dataflow/nodes/data_node.rb', line 184

def update_schema(sch)
  self.schema = sch
  db_adapter.update_settings(data_node: self)
end
#use_symbols? ⇒ Boolean
# File 'lib/dataflow/nodes/data_node.rb', line 285

def use_symbols?
  (db_backend.to_s =~ /sql/).present?
end
#write_dataset_name ⇒ Object
# File 'lib/dataflow/nodes/data_node.rb', line 226

def write_dataset_name
  if use_double_buffering
    "#{name}_buffer#{write_dataset_idx}"
  else
    name
  end
end