Class: Dataflow::Nodes::DataNode
- Inherits: Object
- Includes: EventMixin, Dataflow::Node, PropertiesMixin, SchemaMixin, Mongoid::Document
- Defined in: lib/dataflow/nodes/data_node.rb
Overview
Data nodes are used to build a data computing/transformation graph. At each step we can save the results to a (temp) table.
Nodes::DataNode represents one of the data nodes. It is meant to be treated as an interface and should not be used directly.
Direct Known Subclasses
ReadOnlyDataNode, RuntimeQueryNode, SnapshotNode, UpsertNode
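As a rough illustration of the read/write API documented below (a hedged sketch: node is assumed to be an already-created instance of one of the subclasses above, and the record fields are invented):

  # node is assumed to be a concrete, already-created data node (e.g. an UpsertNode).
  node.add(records: [{ 'id' => 1, 'value' => 'hello' }])
  node.count                       # => 1
  node.all(where: { 'id' => 1 })   # => [{ 'id' => 1, 'value' => 'hello' }]
  node.find(where: { 'id' => 1 })  # => { 'id' => 1, 'value' => 'hello' }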
Constant Summary
Constants included from SchemaMixin
SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR
Instance Method Summary
- #add(records:) ⇒ Object
  Adds the given records to the dataset and updates the updated_at time.
- #all(where: {}, fields: [], sort: {}, limit: 0, offset: 0) {|db_client| ... } ⇒ Object
  Returns all the records from the dataset that match the given options.
- #all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash
  Supports paginating efficiently through the dataset.
- #clear(where: {}) ⇒ Object
  Clears the data that matches the given options.
- #count(where: {}) ⇒ Integer
  Counts how many records match the condition, or all records if no condition is given.
- #create_non_unique_indexes(dataset_type: :read) ⇒ Object
  Applies non-unique indexes on the dataset.
- #create_unique_indexes(dataset_type: :read) ⇒ Object
  Applies unique indexes on the dataset.
- #drop_dataset! ⇒ Object
- #dump_dataset(base_folder: './dump') ⇒ String
  Dumps a backup of this dataset to a file.
- #explain_update(depth: 0, verbose: false) ⇒ Object
- #export(connection_opts: { db_backend: :csv }, keys: [], where: {}) ⇒ Object
- #find(where: {}) ⇒ Hash
  Finds and returns a record from the dataset, based on the given options.
- #handle_dataset_settings_changed ⇒ Object
  When the dataset properties change, notifies the adapter to handle the new settings.
- #import(connection_opts: {}, keys: nil) ⇒ Object
- #info(write_dataset: false) ⇒ Object
  Retrieves some information about this node and its usage.
- #ordered_system_id_queries(batch_size:, where: {}) ⇒ Object
  Returns a list of ordered (ASC) system ID queries.
- #read_dataset_name ⇒ Object
- #read_dataset_name=(dataset) ⇒ Object
  Used to select which dataset to read from.
- #recreate_dataset(dataset_type: :read) ⇒ Object
  Recreates a dataset.
- #required_by ⇒ Object
- #restore_dataset(filepath:) ⇒ Object
  Restores a dump of this dataset.
- #safely_clear_write_dataset ⇒ Object
  Not safe if there is parallel processing going on.
- #set_defaults ⇒ Object
  Sets the default parameters before creating the object.
- #swap_read_write_datasets! ⇒ Object
- #update_schema(sch) ⇒ Object
  Updates this node’s schema.
- #updated? ⇒ Boolean
- #use_symbols? ⇒ Boolean
- #write_dataset_name ⇒ Object
Methods included from SchemaMixin
#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer
Methods included from Dataflow::Node
find, #recompute, #valid_for_computation?, #validate!
Instance Method Details
#add(records:) ⇒ Object
Adds the given records to the dataset and updates the updated_at time.
  # File 'lib/dataflow/nodes/data_node.rb', line 186

  def add(records:)
    return if records.blank?

    db_adapter.save(records: records)
    self.updated_at = Time.now
    save!
  end
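For instance (the records shown are arbitrary); note that a blank records array returns immediately without touching the dataset:

  node.add(records: [{ 'id' => 1 }, { 'id' => 2 }])  # saves and bumps updated_at
  node.add(records: [])                              # no-op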
#all(where: {}, fields: [], sort: {}, limit: 0, offset: 0) {|db_client| ... } ⇒ Object
Returns all the records from a dataset that match the options.
  # File 'lib/dataflow/nodes/data_node.rb', line 146

  def all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, &block)
    db_adapter.all(where: where, fields: fields, sort: sort, limit: limit, offset: offset, &block)
  end
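A hedged usage sketch; the field names are illustrative and the sort direction format depends on the underlying db adapter:

  node.all(
    where:  { 'status' => 'active' },
    fields: ['id', 'updated_at'],
    sort:   { 'updated_at' => -1 },
    limit:  100
  )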
#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash
Supports paginating efficiently through the dataset.
  # File 'lib/dataflow/nodes/data_node.rb', line 163

  def all_paginated(where: {}, fields: [], cursor: nil)
    db_adapter.all_paginated(where: where, fields: fields, cursor: cursor)
  end
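A possible pagination loop. The exact keys of the returned hash depend on the db adapter; 'data' and 'next_cursor' below are assumptions, and process is a hypothetical callback:

  cursor = nil
  loop do
    page = node.all_paginated(where: { 'status' => 'active' }, cursor: cursor)
    process(page['data'])          # assumed key holding the page of records
    cursor = page['next_cursor']   # assumed key holding the next cursor
    break if cursor.blank?
  end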
#clear(where: {}) ⇒ Object
Clear the data that matches the options.
  # File 'lib/dataflow/nodes/data_node.rb', line 194

  def clear(where: {})
    db_adapter.delete(where: where)
  end
#count(where: {}) ⇒ Integer
Counts how many records match the condition, or all records if no condition is given.
  # File 'lib/dataflow/nodes/data_node.rb', line 180

  def count(where: {})
    db_adapter.count(where: where)
  end
#create_non_unique_indexes(dataset_type: :read) ⇒ Object
Applies non-unique indexes on the dataset. For performance reasons, these indexes are best applied after adding data (especially on large import operations).
  # File 'lib/dataflow/nodes/data_node.rb', line 226

  def create_non_unique_indexes(dataset_type: :read)
    dataset = send("#{dataset_type}_dataset_name")
    db_adapter.create_indexes(dataset: dataset, type: :non_unique_only)
  end
#create_unique_indexes(dataset_type: :read) ⇒ Object
Applies unique indexes on the dataset. As this will be enforcing constraints, it is best applied before adding any data.
  # File 'lib/dataflow/nodes/data_node.rb', line 218

  def create_unique_indexes(dataset_type: :read)
    dataset = send("#{dataset_type}_dataset_name")
    db_adapter.create_indexes(dataset: dataset, type: :unique_only)
  end
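Putting the two index helpers together around a bulk load, the ordering suggested above looks roughly like this (a sketch on a node without double buffering, where the read and write datasets coincide; new_records is hypothetical):

  node.recreate_dataset
  node.create_unique_indexes       # constraints are in place before any data arrives
  node.add(records: new_records)   # hypothetical bulk load
  node.create_non_unique_indexes   # cheaper to build after the data is in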
#drop_dataset! ⇒ Object
  # File 'lib/dataflow/nodes/data_node.rb', line 335

  def drop_dataset!
    db_adapter.drop_dataset(write_dataset_name)
    return unless use_double_buffering

    db_adapter.drop_dataset(read_dataset_name)
  end
#dump_dataset(base_folder: './dump') ⇒ String
Dump a backup of this dataset to a file.
  # File 'lib/dataflow/nodes/data_node.rb', line 343

  def dump_dataset(base_folder: './dump')
    db_adapter.dump(base_folder: base_folder)
  end
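A hedged backup/restore round trip, using #restore_dataset (documented further down); the folder is illustrative:

  path = node.dump_dataset(base_folder: '/tmp/dataflow_dumps')  # returns the dump file path
  # ... later, on a node configured the same way:
  node.restore_dataset(filepath: path)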
#explain_update(depth: 0, verbose: false) ⇒ Object
  # File 'lib/dataflow/nodes/data_node.rb', line 312

  def explain_update(depth: 0, verbose: false)
    logger.log("#{'>' * (depth + 1)} #{name} [Dataset] | UPDATED = #{updated_at}")
  end
#export(connection_opts: { db_backend: :csv }, keys: [], where: {}) ⇒ Object
  # File 'lib/dataflow/nodes/data_node.rb', line 275

  def export(connection_opts: { db_backend: :csv }, keys: [], where: {})
    on_export_started(connection_opts: connection_opts, keys: keys)
    # instantiate and export without saving anything
    Export::ToCsvNode.new(
      dependency_ids: [self],
      query: where.to_json,
      keys: keys
    ).compute_impl
    on_export_finished
  end
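For example (keys and query are illustrative), exporting a filtered subset with the default CSV backend:

  node.export(
    keys:  ['id', 'updated_at'],
    where: { 'status' => 'active' }
  )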
#find(where: {}) ⇒ Hash
Finds and returns a record from the dataset, based on the given options.
  # File 'lib/dataflow/nodes/data_node.rb', line 126

  def find(where: {})
    db_adapter.find(where: where)
  end
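For instance (the field name and value are illustrative):

  node.find(where: { 'id' => 42 })   # => the matching record as a Hash, e.g. { 'id' => 42, ... }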
#handle_dataset_settings_changed ⇒ Object
When the dataset properties change, notify the adapter to handle the new settings.
  # File 'lib/dataflow/nodes/data_node.rb', line 106

  def handle_dataset_settings_changed
    db_adapter.update_settings(data_node: self)

    # if we're using double buffering, just wait for the next buffer
    # to be created to apply the changes.
    return if use_double_buffering

    # recreate the dataset if there is no data
    if db_adapter.count.zero?
      db_adapter.recreate_dataset(dataset: read_dataset_name)
    end

    db_adapter.create_indexes(dataset: read_dataset_name)
  end
#import(connection_opts: {}, keys: nil) ⇒ Object
  # File 'lib/dataflow/nodes/data_node.rb', line 269

  def import(connection_opts: {}, keys: nil)
    importer = db_adapter(connection_opts)
    records = importer.all
    add(records: records)
  end
#info(write_dataset: false) ⇒ Object
Retrieves some information about this node and its usage.
  # File 'lib/dataflow/nodes/data_node.rb', line 287

  def info(write_dataset: false)
    dataset = write_dataset ? write_dataset_name : read_dataset_name
    usage = db_adapter.usage(dataset: dataset)
    {
      name: name,
      type: self.class.to_s,
      dataset: dataset,
      db_backend: db_backend,
      updated_at: updated_at,
      record_count: count,
      indexes: indexes,
      effective_indexes: usage[:effective_indexes],
      mem_usage: usage[:memory],
      storage_usage: usage[:storage]
    }
  end
#ordered_system_id_queries(batch_size:, where: {}) ⇒ Object
Returns a list of ordered (ASC) system ID queries. These can be used to process the dataset in parallel by querying on a sub-section:

  queries = node.ordered_system_id_queries(batch_size: 1000)
  Parallel.each(queries) do |query|
    process(node.all(where: query))
  end
  # File 'lib/dataflow/nodes/data_node.rb', line 174

  def ordered_system_id_queries(batch_size:, where: {})
    db_adapter.ordered_system_id_queries(batch_size: batch_size, where: where)
  end
#read_dataset_name ⇒ Object
  # File 'lib/dataflow/nodes/data_node.rb', line 231

  def read_dataset_name
    return @temporary_read_dataset if @temporary_read_dataset

    if use_double_buffering
      "#{name}_buffer#{read_dataset_idx}"
    else
      name
    end
  end
#read_dataset_name=(dataset) ⇒ Object
Use this to select which dataset you want to read from. A possible use case is to read from an old dataset name.
  # File 'lib/dataflow/nodes/data_node.rb', line 253

  def read_dataset_name=(dataset)
    return unless valid_dataset_names.include?(dataset)

    @temporary_read_dataset = dataset
    db_adapter.update_settings(data_node: self)
    dataset
  end
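For example (the buffer name is illustrative and must be one of the node's valid dataset names):

  node.read_dataset_name = 'my_dataset_buffer1'  # point reads at an older buffer
  node.all(limit: 10)                            # now served from that dataset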
#recreate_dataset(dataset_type: :read) ⇒ Object
Recreates a dataset.
  # File 'lib/dataflow/nodes/data_node.rb', line 207

  def recreate_dataset(dataset_type: :read)
    # fetch the proper dataset name
    dataset = send("#{dataset_type}_dataset_name")
    db_adapter.recreate_dataset(dataset: dataset)
  end
#required_by ⇒ Object
  # File 'lib/dataflow/nodes/data_node.rb', line 316

  def required_by
    super + Dataflow::Nodes::ComputeNode.where(data_node_id: _id)
                                        .map { |node| { node: node, type: 'dataset' } }
  end
#restore_dataset(filepath:) ⇒ Object
Restore a dump of this dataset
  # File 'lib/dataflow/nodes/data_node.rb', line 349

  def restore_dataset(filepath:)
    db_adapter.restore(filepath: filepath)
  end
#safely_clear_write_dataset ⇒ Object
This is not safe if there is some parallel processing going on.
  # File 'lib/dataflow/nodes/data_node.rb', line 323

  def safely_clear_write_dataset
    # we can only clear the write dataset if we're using double buffering
    return unless use_double_buffering

    # check if there is any node that is currently computing to this dataset
    used_by = required_by.select { |x| x[:type] == 'dataset' && x[:node].locked_for_computing? }
    return if used_by.present?

    logger.log("Dropping #{db_name}.#{write_dataset_name} on #{db_backend}.")
    # TODO: lock the node?
    db_adapter.drop_dataset(write_dataset_name)
  end
#set_defaults ⇒ Object
Sets the default parameters before creating the object.
  # File 'lib/dataflow/nodes/data_node.rb', line 81

  def set_defaults
    self.schema = schema || {}

    # Use the schema as the inferred schema if none is provided.
    # This is useful when there is no need to infer schemas (e.g. in SQL).
    self.inferred_schema ||= schema

    # This is needed for the flow to compute properly
    self.updated_at = Time.now
  end
#swap_read_write_datasets! ⇒ Object
  # File 'lib/dataflow/nodes/data_node.rb', line 260

  def swap_read_write_datasets!
    unless use_double_buffering
      raise Dataflow::Errors::InvalidConfigurationError,
            "#swap_read_write_datasets! called on \"#{self.name}\" but \"use_double_buffering\" is not activated."
    end

    tmp = read_dataset_idx
    self.read_dataset_idx = write_dataset_idx
    self.write_dataset_idx = tmp
    db_adapter.update_settings(data_node: self)
    save!
  end
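With double buffering enabled, a full refresh typically writes into the hidden buffer and then swaps it in, roughly like this (a hedged sketch; new_records is hypothetical):

  node.recreate_dataset(dataset_type: :write)
  node.create_unique_indexes(dataset_type: :write)
  node.add(records: new_records)
  node.create_non_unique_indexes(dataset_type: :write)
  node.swap_read_write_datasets!   # readers now see the freshly written buffer

After the swap, #safely_clear_write_dataset can be used to drop the now-stale buffer.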
#update_schema(sch) ⇒ Object
Update this node’s schema.
  # File 'lib/dataflow/nodes/data_node.rb', line 199

  def update_schema(sch)
    self.schema = sch
    db_adapter.update_settings(data_node: self)
  end
#updated? ⇒ Boolean
  # File 'lib/dataflow/nodes/data_node.rb', line 308

  def updated?
    true
  end
#use_symbols? ⇒ Boolean
  # File 'lib/dataflow/nodes/data_node.rb', line 304

  def use_symbols?
    (db_backend.to_s =~ /sql/).present?
  end
#write_dataset_name ⇒ Object
  # File 'lib/dataflow/nodes/data_node.rb', line 241

  def write_dataset_name
    if use_double_buffering
      "#{name}_buffer#{write_dataset_idx}"
    else
      name
    end
  end