Module: Dataflow::SchemaMixin

Included in:
Adapters::CsvAdapter, Nodes::ComputeNode, Nodes::DataNode
Defined in:
lib/dataflow/schema_mixin.rb

Constant Summary collapse

SEPARATOR =

If this changes, update the regexes in this mixin that use the character directly.

'|'
SAMPLE_DATA_OUTPUT =
%w(raw tabular).freeze

Instance Method Summary collapse

Instance Method Details

#infer_partial_schema(where:, extended: false) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/dataflow/schema_mixin.rb', line 53

# Infers the schema of the records matching `where`, reading them in
# batches ordered by system id and letting the schema inferrer merge the
# per-batch results.
#
# On a PostgreSQL backend (experimental) the column types are read from the
# database instead; that path ignores `where` and also stores the result in
# `inferred_schema` and saves.
#
# @param where [Hash] filter selecting the records to inspect.
# @param extended [Boolean] forwarded to the schema inferrer.
# @return [Hash] the inferred schema, or {} when no record matches.
def infer_partial_schema(where:, extended: false)
  if db_backend == :postgresql
    # Experimental
    sch = db_adapter.client.schema(read_dataset_name).to_h
    sch = sch.reject { |k, _v| k == :_id }.map { |k, v| [k, { type: v[:type].to_s }] }.to_h
    self.inferred_schema = sch
    save
    return sch
  end

  data_count = count(where: where)
  return {} if data_count == 0

  # Batch size: at most 250 records (or the node's limit_per_process), but
  # small enough to spread the work across all processors.
  max_per_process = 250
  # Ignore a non-positive limit (as #infer_schema does); otherwise the batch
  # size would become 0.
  max_per_process = limit_per_process if respond_to?(:limit_per_process) && limit_per_process > 0

  equal_split_per_process = (data_count / Parallel.processor_count.to_f).ceil
  count_per_process = [max_per_process, equal_split_per_process].min

  queries = ordered_system_id_queries(batch_size: count_per_process, where: where)

  schema_inferrer.infer_schema(batch_count: queries.count, extended: extended) do |idx|
    # Each batch's system-id range is combined with the caller's filter.
    all(where: queries[idx].merge(where))
  end
end

#infer_schema(samples_count: 0, extended: false) ⇒ Hash

Generate a schema based on this collection’s records. We evaluate the schema of each record and then merge all the information together.

Parameters:

  • extended (Boolean) (defaults to: false)

    Set to true to keep each field as a basic type. Set to false to reduce the terminal arrays to a single key (under the type array).

Returns:

  • (Hash)

    with one entry per ‘column’/‘field’. The values contain information about each field’s type and usage.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/dataflow/schema_mixin.rb', line 13

# Generates a schema based on this collection's records, stores it in
# `inferred_schema` and saves.
#
# Records are read in batches ordered by system id; the schema inferrer
# merges the per-batch results. The on_schema_inference_started,
# on_schema_inference_progressed and on_schema_inference_finished hooks are
# fired around the inference.
#
# On a PostgreSQL backend (experimental) the column types are read from the
# database instead; samples_count and extended are not used on that path and
# no hooks are fired.
#
# @param samples_count [Integer] number of records to sample; 0 (or any
#   non-positive value) means every record. Whole batches are read, so the
#   last batch may add up to one batch worth of extra records.
# @param extended [Boolean] true keeps each field as a basic type; false
#   reduces terminal arrays to a single key (under the type array).
# @return [Hash] one entry per column/field; {} when there is nothing to read.
def infer_schema(samples_count: 0, extended: false)
  if db_backend == :postgresql
    # Experimental
    sch = db_adapter.client.schema(read_dataset_name).to_h
    sch = sch.reject { |k, _v| k == :_id }.map { |k, v| [k, { type: v[:type].to_s }] }.to_h
    self.inferred_schema = sch
    save
    return sch
  end

  sampling = samples_count > 0
  data_count = sampling ? samples_count : count # invoked in the base class
  return {} if data_count == 0

  # Batch size: at most 1000 records (or the node's positive limit_per_process),
  # but small enough to spread the work across all processors.
  max_per_process = 1000
  max_per_process = limit_per_process if respond_to?(:limit_per_process) && limit_per_process > 0

  equal_split_per_process = (data_count / Parallel.processor_count.to_f).ceil
  count_per_process = [max_per_process, equal_split_per_process].min

  queries = ordered_system_id_queries(batch_size: count_per_process)
  if sampling
    # Keep only enough batches to cover samples_count records. Slicing the
    # batch list by the record count would read samples_count *batches*.
    queries = queries.first((samples_count / count_per_process.to_f).ceil)
  end

  self.inferred_schema_at = Time.now
  self.inferred_schema_from = samples_count
  on_schema_inference_started

  sch = schema_inferrer.infer_schema(batch_count: queries.count, extended: extended) do |idx|
    # Percentage of batches already processed when this batch starts.
    progress = (idx / queries.count.to_f * 100).ceil
    on_schema_inference_progressed(pct_complete: progress)

    all(where: queries[idx])
  end

  self.inferred_schema = sch
  save
  on_schema_inference_finished

  sch
end

#sample_data(count: 5, mode: 'tabular') ⇒ Object

Outputs sample data, either raw (records as-is) or tabular.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/dataflow/schema_mixin.rb', line 88

# Outputs up to `count` records from this dataset, either raw or tabular.
#
# @param count [Integer] maximum number of records to return.
# @param mode [String, Symbol] 'raw' returns the records as-is; 'tabular'
#   maps each record to a Hash keyed by the keys of the extended schema
#   inferred from the samples, omitting nil values.
# @return [Array] the sampled records (an empty Array when there are none).
# @raise [Errors::InvalidConfigurationError] when mode is not one of
#   SAMPLE_DATA_OUTPUT.
def sample_data(count: 5, mode: 'tabular')
  mode = mode.to_s.downcase
  unless SAMPLE_DATA_OUTPUT.include?(mode)
    raise Errors::InvalidConfigurationError, "Mode must be one of '#{SAMPLE_DATA_OUTPUT.join(', ')}'. Given: #{mode}"
  end

  # `count` here is the keyword argument, not the #count query method.
  samples = all { |query| query.limit(count) }.to_a
  return samples if mode == 'raw'
  # Return an empty Array (not {}) so both modes always return an Array.
  return [] if samples.empty?

  # tabular output: one column per key of the extended schema.
  keys = schema_inferrer.infer_schema(dataset: samples, extended: true).keys
  samples.map do |sample|
    keys.each_with_object({}) do |key, row|
      value = record_value(record: sample, key: key)
      row[key] = value unless value.nil?
    end
  end
end

#schema_inferrer ⇒ Object



79
80
81
82
83
84
# File 'lib/dataflow/schema_mixin.rb', line 79

# Returns a new Schema::Inference::SchemaInferrer configured with this
# mixin's SEPARATOR and with convert_types_to_string enabled.
def schema_inferrer
  inferrer_options = {
    separator: SEPARATOR,
    convert_types_to_string: true
  }
  Schema::Inference::SchemaInferrer.new(**inferrer_options)
end