Module: Dataflow::SchemaMixin
- Included in:
- Adapters::CsvAdapter, Nodes::ComputeNode, Nodes::DataNode
- Defined in:
- lib/dataflow/schema_mixin.rb
Constant Summary
- SEPARATOR =
  If this changes, update the regexes that use the character directly in this mixin.
  '|'
- SAMPLE_DATA_OUTPUT =
  %w(raw tabular).freeze
Instance Method Summary
- #infer_partial_schema(where:, extended: false) ⇒ Object
- #infer_schema(samples_count: 0, extended: false) ⇒ Hash
  Generate a schema based on this collection’s records.
- #sample_data(count: 5, mode: 'tabular') ⇒ Object
  Outputs sample data.
- #schema_inferrer ⇒ Object
Instance Method Details
#infer_partial_schema(where:, extended: false) ⇒ Object
# File 'lib/dataflow/schema_mixin.rb', line 44

def infer_partial_schema(where:, extended: false)
  data_count = count(where: where)
  return {} if data_count == 0

  max_per_process = 250
  max_per_process = limit_per_process if respond_to? :limit_per_process

  equal_split_per_process = (data_count / Parallel.processor_count.to_f).ceil
  count_per_process = [max_per_process, equal_split_per_process].min

  queries = ordered_system_id_queries(batch_size: count_per_process)

  sch = schema_inferrer.infer_schema(batch_count: queries.count, extended: extended) do |idx|
    all(where: queries[idx].merge(where))
  end
end
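Each batch in the listing above is fetched with `queries[idx].merge(where)`, so the caller's filter is layered on top of the batch's own query. The following is a minimal sketch of what `Hash#merge` does in that position. The shape of `batch_query` is hypothetical: the actual format returned by `ordered_system_id_queries` is not shown on this page.

```ruby
# Hypothetical batch query; the real structure produced by
# ordered_system_id_queries is an assumption made for illustration.
batch_query = { '_id' => { '>=' => 1, '<' => 251 } }

# A caller-supplied filter on a different key is simply added.
where = { 'status' => 'active' }
combined = batch_query.merge(where)

# A caller-supplied filter on the same key replaces the batch range,
# because Hash#merge lets the argument win on conflicting keys.
conflict = batch_query.merge('_id' => 42)

p combined
p conflict
```

If the real batch queries are keyed this way, a `where:` clause that targets the same key as the batch range would override the batching, so it is probably safest to filter on other fields.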
#infer_schema(samples_count: 0, extended: false) ⇒ Hash
Generate a schema based on this collection’s records. We evaluate the schema of each record and then merge all the information together.
# File 'lib/dataflow/schema_mixin.rb', line 13

def infer_schema(samples_count: 0, extended: false)
  data_count = samples_count == 0 ? count : samples_count # invoked in the base class
  return {} if data_count == 0

  # find out how many batches are needed
  max_per_process = 1000
  max_per_process = limit_per_process if respond_to? :limit_per_process

  equal_split_per_process = (data_count / Parallel.processor_count.to_f).ceil
  count_per_process = [max_per_process, equal_split_per_process].min

  queries = ordered_system_id_queries(batch_size: count_per_process)

  self.inferred_schema_at = Time.now
  self.inferred_schema_from = samples_count

  on_schema_inference_started

  sch = schema_inferrer.infer_schema(batch_count: queries.count, extended: extended) do |idx|
    progress = (idx / queries.count.to_f * 100).ceil
    on_schema_inference_progressed(pct_complete: progress)
    all(where: queries[idx])
  end

  self.inferred_schema = sch
  save

  on_schema_inference_finished

  sch
end
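The batch sizing and progress arithmetic in the listing above can be tried in isolation. This is only a sketch under stated assumptions: `count_per_process` and `progress_steps` are hypothetical helper names, `Etc.nprocessors` from the standard library stands in for `Parallel.processor_count` so the example runs without the parallel gem, and the block index is assumed to be zero-based.

```ruby
require 'etc'

# Hypothetical helper mirroring the batch-size arithmetic in #infer_schema.
# Etc.nprocessors is a stand-in for Parallel.processor_count (assumption).
def count_per_process(data_count, max_per_process: 1000, processors: Etc.nprocessors)
  equal_split = (data_count / processors.to_f).ceil
  [max_per_process, equal_split].min
end

# Hypothetical helper listing the pct_complete values reported for each
# batch, assuming the block receives a zero-based batch index.
def progress_steps(batch_count)
  (0...batch_count).map { |idx| (idx / batch_count.to_f * 100).ceil }
end

p count_per_process(10_000, processors: 4) # capped by max_per_process
p count_per_process(1_200, processors: 4)  # even split across processors
p progress_steps(4)
```

Under the zero-based assumption, the last progress callback reports less than 100, and completion is signalled separately by `on_schema_inference_finished`.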
#sample_data(count: 5, mode: 'tabular') ⇒ Object
Outputs sample data. Supports either raw output (records as-is) or tabular output.
# File 'lib/dataflow/schema_mixin.rb', line 70

def sample_data(count: 5, mode: 'tabular')
  mode = mode.to_s.downcase
  unless SAMPLE_DATA_OUTPUT.include?(mode)
    raise Errors::InvalidConfigurationError, "Mode must be one of '#{SAMPLE_DATA_OUTPUT.join(', ')}'. Given: #{mode}"
  end
  samples = all { |x| x.limit(count) }.to_a
  return samples if mode == 'raw'
  return {} if samples.count == 0

  # tabular output
  schm = schema_inferrer.infer_schema(dataset: samples, extended: true)
  keys = schm.keys

  res = samples.map do |sample|
    keys.map do |key|
      value = record_value(record: sample, key: key)
      next if value.nil?
      [key, value]
    end.compact.to_h
  end
  res
end
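The tabular branch above maps every sample onto the inferred schema keys and drops keys whose value is nil. Here is a minimal sketch of that shaping step with plain hashes. Two assumptions are made for illustration: the union of the samples' keys stands in for the keys of the inferred schema, and a plain hash lookup stands in for `record_value`.

```ruby
samples = [
  { 'id' => 1, 'name' => 'a', 'note' => nil },
  { 'id' => 2, 'email' => 'b@example.com' }
]

# Stand-in for schm.keys (assumption: the real keys come from the inferrer).
keys = samples.flat_map(&:keys).uniq

rows = samples.map do |sample|
  keys.map do |key|
    value = sample[key] # stand-in for record_value(record:, key:)
    next if value.nil?  # nil entries are removed by compact below
    [key, value]
  end.compact.to_h
end

p rows
```

Each row therefore contains only the keys that have a value for that record, listed in schema key order.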
#schema_inferrer ⇒ Object
# File 'lib/dataflow/schema_mixin.rb', line 61

def schema_inferrer
  Schema::Inference::SchemaInferrer.new(
    separator: SEPARATOR,
    convert_types_to_string: true
  )
end