Module: Dataflow::SchemaMixin
- Included in:
- Adapters::CsvAdapter, Nodes::ComputeNode, Nodes::DataNode
- Defined in:
- lib/dataflow/schema_mixin.rb
Constant Summary
- SEPARATOR =
# If this value changes, update the regular expressions in this mixin that use the character directly.
'|'
- SAMPLE_DATA_OUTPUT =
%w(raw tabular).freeze
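The comment on SEPARATOR suggests that nested field names are handled as key paths joined with '|'. The separator is passed to the schema inferrer by #schema_inferrer. The sketch below assumes string-keyed nested hashes. flatten_keys, value_at and SEPARATOR_REGEX are hypothetical names and are not part of Dataflow. It shows one way to avoid the manual update the comment warns about: build the regex from the constant with Regexp.escape, which is required because '|' is the alternation operator inside a regular expression.

```ruby
# Hypothetical illustration only: flatten_keys, value_at and SEPARATOR_REGEX
# are invented names, not part of Dataflow::SchemaMixin.
SEPARATOR = '|'

# Build the regex from the constant so it follows any change to SEPARATOR.
# Regexp.escape is needed because '|' means alternation inside a regex.
SEPARATOR_REGEX = /#{Regexp.escape(SEPARATOR)}/

# Turn a nested, string-keyed hash into { "parent|child" => value } pairs.
def flatten_keys(record, prefix = nil)
  record.each_with_object({}) do |(key, value), out|
    path = prefix ? "#{prefix}#{SEPARATOR}#{key}" : key.to_s
    if value.is_a?(Hash)
      out.merge!(flatten_keys(value, path))
    else
      out[path] = value
    end
  end
end

# Follow a SEPARATOR-joined path back into the nested hash (string keys assumed).
def value_at(record, path)
  record.dig(*path.split(SEPARATOR_REGEX))
end
```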
Instance Method Summary
- #infer_partial_schema(where:, extended: false) ⇒ Object
- #infer_schema(samples_count: 0, extended: false) ⇒ Hash
  Generate a schema based on this collection’s records.
- #sample_data(count: 5, mode: 'tabular') ⇒ Object
  Outputs sample data.
- #schema_inferrer ⇒ Object
Instance Method Details
#infer_partial_schema(where:, extended: false) ⇒ Object
# File 'lib/dataflow/schema_mixin.rb', line 53

def infer_partial_schema(where:, extended: false)
  if db_backend == :postgresql
    # Experimental
    sch = db_adapter.client.schema(read_dataset_name).to_h
    sch = sch.reject{ |k, v| k == :_id }.map { |k,v| [k, {type: v[:type].to_s}] }.to_h
    self.inferred_schema = sch
    save
    return sch
  end

  data_count = count(where: where)
  return {} if data_count == 0

  max_per_process = 250
  max_per_process = limit_per_process if respond_to? :limit_per_process

  equal_split_per_process = (data_count / Parallel.processor_count.to_f).ceil
  count_per_process = [max_per_process, equal_split_per_process].min

  queries = ordered_system_id_queries(batch_size: count_per_process, where: where)

  sch = schema_inferrer.infer_schema(batch_count: queries.count, extended: extended) do |idx|
    all(where: queries[idx].merge(where))
  end
end
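On PostgreSQL, both inference methods skip sampling and read column types from the table definition. In infer_partial_schema, that path ignores the where: filter and the extended: flag. The sketch assumes db_adapter.client is a Sequel::Database. Sequel's Database#schema returns an array of [column, info] pairs, and each info hash includes :type. Under that assumption, the transformation reduces to the code below. The schema rows are invented for illustration, and to_inferred_schema is a hypothetical name.

```ruby
# Invented rows shaped like the output of Sequel's Database#schema:
# an array of [column_symbol, info_hash] pairs.
SEQUEL_SCHEMA = [
  [:_id,        { type: :integer,  db_type: 'integer',   primary_key: true }],
  [:name,       { type: :string,   db_type: 'text',      primary_key: false }],
  [:created_at, { type: :datetime, db_type: 'timestamp', primary_key: false }]
].freeze

# Same steps as the PostgreSQL branch: drop the :_id system column and keep
# only each column's type, converted to a string.
def to_inferred_schema(schema_pairs)
  columns = schema_pairs.to_h.reject { |column, _info| column == :_id }
  columns.map { |column, info| [column, { type: info[:type].to_s }] }.to_h
end
```

Note one difference from infer_schema: outside PostgreSQL, infer_partial_schema returns the inferred schema without assigning inferred_schema or calling save.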
#infer_schema(samples_count: 0, extended: false) ⇒ Hash
Generate a schema based on this collection’s records. We evaluate the schema of each record and then merge all the information together.
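The "evaluate each record, then merge" approach can be illustrated with a deliberately simplified, hypothetical sketch that maps each key to the set of Ruby class names seen for it. It does not reproduce Schema::Inference::SchemaInferrer, which tracks more than this. record_schema and merge_schemas are invented names. Merging by set union is order-independent, which is what allows per-batch results to be combined in any order.

```ruby
require 'set'

# Simplified, hypothetical illustration: only class names are tracked.

# Schema of one record: key => class name of its value.
def record_schema(record)
  record.transform_values { |value| value.class.name }
end

# Merge per-record schemas: key => set of every class name seen for it.
def merge_schemas(schemas)
  schemas.each_with_object(Hash.new { |hash, key| hash[key] = Set.new }) do |schema, merged|
    schema.each { |key, type| merged[key] << type }
  end
end

records = [
  { 'id' => 1, 'price' => 9.5 },
  { 'id' => 2, 'price' => 10 },
  { 'id' => 3, 'note' => 'x' }
]
merged = merge_schemas(records.map { |record| record_schema(record) })
```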
# File 'lib/dataflow/schema_mixin.rb', line 13

def infer_schema(samples_count: 0, extended: false)
  if db_backend == :postgresql
    # Experimental
    sch = db_adapter.client.schema(read_dataset_name).to_h
    sch = sch.reject{ |k, v| k == :_id }.map { |k,v| [k, {type: v[:type].to_s}] }.to_h
    self.inferred_schema = sch
    save
    return sch
  end

  data_count = samples_count == 0 ? count : samples_count # invoked in the base class
  return {} if data_count == 0

  # find out how many batches are needed
  max_per_process = 1000
  max_per_process = limit_per_process if respond_to?(:limit_per_process) && limit_per_process > 0

  equal_split_per_process = (data_count / Parallel.processor_count.to_f).ceil
  count_per_process = [max_per_process, equal_split_per_process].min

  queries = ordered_system_id_queries(batch_size: count_per_process)[0...data_count]

  self.inferred_schema_at = Time.now
  self.inferred_schema_from = samples_count
  on_schema_inference_started

  sch = schema_inferrer.infer_schema(batch_count: queries.count, extended: extended) do |idx|
    progress = (idx / queries.count.to_f * 100).ceil
    on_schema_inference_progressed(pct_complete: progress)
    all(where: queries[idx])
  end

  self.inferred_schema = sch
  save
  on_schema_inference_finished

  sch
end
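The batching and progress arithmetic in infer_schema can be checked in isolation. In this hypothetical sketch, batch_size and progress_points are invented names, and a processors parameter stands in for Parallel.processor_count. infer_partial_schema uses the same batch-size formula, with two differences: its default cap is 250, and it applies limit_per_process whenever the method exists, without the `> 0` check.

```ruby
# Hypothetical helpers reproducing the arithmetic in infer_schema; the
# processor count is passed in instead of calling Parallel.processor_count.

# Records per batch: spread the data evenly over the processors, capped at
# max_per_process (1000 by default, or a positive limit_per_process).
def batch_size(data_count, processors, max_per_process: 1000, limit_per_process: nil)
  max_per_process = limit_per_process if limit_per_process && limit_per_process > 0
  equal_split = (data_count / processors.to_f).ceil
  [max_per_process, equal_split].min
end

# pct_complete value reported for each batch index, computed before the
# batch itself is loaded.
def progress_points(batch_count)
  (0...batch_count).map { |idx| (idx / batch_count.to_f * 100).ceil }
end
```

Because pct_complete is computed from idx before the batch is read, the largest value reported stays below 100 (75 with four batches). Completion is signalled separately by on_schema_inference_finished.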
#sample_data(count: 5, mode: 'tabular') ⇒ Object
Outputs sample data. Supports either raw output (records as-is) or tabular output (each record flattened to a hash of schema key to value, with nil values omitted).
# File 'lib/dataflow/schema_mixin.rb', line 88

def sample_data(count: 5, mode: 'tabular')
  mode = mode.to_s.downcase
  unless SAMPLE_DATA_OUTPUT.include?(mode)
    raise Errors::InvalidConfigurationError, "Mode must be one of '#{SAMPLE_DATA_OUTPUT.join(', ')}'. Given: #{mode}"
  end

  samples = all { |x| x.limit(count) }.to_a
  return samples if mode == 'raw'
  return {} if samples.count == 0

  # tabular output
  schm = schema_inferrer.infer_schema(dataset: samples, extended: true)
  keys = schm.keys
  res = samples.map do |sample|
    keys.map do |key|
      value = record_value(record: sample, key: key)
      next if value.nil?
      [key, value]
    end.compact.to_h
  end
  res
end
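This minimal, self-contained sketch shows the two output modes. It assumes string-keyed records and schema keys that are '|'-separated paths. record_value is a hypothetical stand-in for the mixin's helper of the same name, whose implementation is not shown here. tabular_sample is an invented name. The keys are passed in rather than inferred, and ArgumentError replaces Errors::InvalidConfigurationError so the example runs on its own.

```ruby
# Hypothetical sketch: tabular_sample is an invented name, record_value is a
# stand-in for the mixin's helper, and ArgumentError replaces
# Errors::InvalidConfigurationError so the example is self-contained.
SAMPLE_DATA_OUTPUT = %w(raw tabular).freeze
SEPARATOR = '|'

# Follow a SEPARATOR-joined key path into a string-keyed nested hash.
def record_value(record:, key:)
  record.dig(*key.split(SEPARATOR))
end

def tabular_sample(samples, keys, mode: 'tabular')
  mode = mode.to_s.downcase
  unless SAMPLE_DATA_OUTPUT.include?(mode)
    raise ArgumentError, "Mode must be one of '#{SAMPLE_DATA_OUTPUT.join(', ')}'. Given: #{mode}"
  end
  return samples if mode == 'raw'

  samples.map do |sample|
    keys.map do |key|
      value = record_value(record: sample, key: key)
      next if value.nil? # missing values are dropped, not emitted as nil
      [key, value]
    end.compact.to_h
  end
end
```

As in the listing, keys with nil values are dropped rather than emitted as empty cells, so rows can have different key sets. Also note that the original method returns an empty Hash ({}) in tabular mode when there are no records, but an empty Array in raw mode.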
#schema_inferrer ⇒ Object
# File 'lib/dataflow/schema_mixin.rb', line 79

def schema_inferrer
  Schema::Inference::SchemaInferrer.new(
    separator: SEPARATOR,
    convert_types_to_string: true
  )
end