Class: Chicago::ETL::Transformation Abstract

Inherits:
Object
  • Object
Defined in:
lib/chicago/etl/transformation.rb

Overview

This class is abstract. Subclass it and add a process_row method.

A base class for row transformations.

Transformations process hash-like rows by filtering or altering their contents.
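
A minimal sketch of the subclassing pattern described above. OverviewSketch is a hypothetical stand-in for the base class so the example runs without the library, and UpcaseField and its :field option are invented for illustration.

```ruby
# Hypothetical stand-in for Chicago::ETL::Transformation, reduced to
# option storage. The real class lives in lib/chicago/etl/transformation.rb.
class OverviewSketch
  def initialize(options = {})
    @options = options
  end
end

# Hypothetical subclass: upcases one field of a hash-like row.
class UpcaseField < OverviewSketch
  def process_row(row)
    field = @options[:field]
    # merge returns a new hash, leaving the incoming row untouched
    row.merge(field => row[field].to_s.upcase)
  end
end

transformation = UpcaseField.new({ field: :name })
transformation.process_row({ id: 1, name: "ada" }) # :name becomes "ADA"
```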

Constructor Details

#initialize(stream, options) ⇒ Transformation
#initialize(options) ⇒ Transformation

Creates the transformation.

Subclasses should not override this. Transformations that need their own arguments should accept them as named options.

Overloads:

  • #initialize(stream, options) ⇒ Transformation

    Applies this transformation to a specific stream. The options are specific to the subclass.

  • #initialize(options) ⇒ Transformation

    As above, but the stream is assumed to be :default.



# File 'lib/chicago/etl/transformation.rb', line 27

def initialize(*args)
  stream, options = *args
  if stream.kind_of?(Hash)
    @stream = :default
    @options = stream
  else
    @stream = stream || :default
    @options = options || {}
  end

  ensure_options_present
end
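
The argument handling above can be tried in isolation. The sketch below copies the same branching into a free-standing method; resolve_args is a hypothetical name, and the call to ensure_options_present is left out because its body is not shown on this page.

```ruby
# Returns [stream, options] using the same rules as the constructor above.
def resolve_args(*args)
  stream, options = *args
  if stream.kind_of?(Hash)
    # Only a hash was given: it is the options, and the stream is :default.
    [:default, stream]
  else
    # A stream was given (or nothing at all): fall back to the defaults.
    [stream || :default, options || {}]
  end
end

resolve_args(:errors, { table: :users }) # stream :errors with the given options
resolve_args({ table: :users })          # stream :default with the given options
resolve_args                             # stream :default with empty options
```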

Class Method Details

.added_fields ⇒ Object

Returns the fields added by this transformation.



# File 'lib/chicago/etl/transformation.rb', line 46

def self.added_fields
  @added_fields ||= []
end

.adds_fields(*fields) ⇒ Object

Specify which fields are added to the row by this transformation.



# File 'lib/chicago/etl/transformation.rb', line 63

def self.adds_fields(*fields)
  added_fields.concat fields.flatten
end

.removed_fields ⇒ Object

Returns the fields removed by this transformation.



# File 'lib/chicago/etl/transformation.rb', line 51

def self.removed_fields
  @removed_fields ||= []
end

.removes_fields(*fields) ⇒ Object

Specify which fields are removed from the row by this transformation.



# File 'lib/chicago/etl/transformation.rb', line 69

def self.removes_fields(*fields)
  removed_fields.concat fields.flatten
end

.required_options ⇒ Object

Returns the required initialization options for this transformation.



# File 'lib/chicago/etl/transformation.rb', line 41

def self.required_options
  @required_options ||= []
end

.requires_options(*options) ⇒ Object

Specify which options are required in the constructor of this transformation.



# File 'lib/chicago/etl/transformation.rb', line 57

def self.requires_options(*options)
  required_options.concat options.flatten
end
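
The class methods above read naturally as declarations in a subclass body. The following is a sketch only: DeclaringSketch is a hypothetical stand-in that copies the six methods so the example runs on its own, and SplitName and its fields are invented.

```ruby
# Stand-in base class that copies the six class methods listed above.
class DeclaringSketch
  def self.required_options
    @required_options ||= []
  end

  def self.added_fields
    @added_fields ||= []
  end

  def self.removed_fields
    @removed_fields ||= []
  end

  def self.requires_options(*options)
    required_options.concat options.flatten
  end

  def self.adds_fields(*fields)
    added_fields.concat fields.flatten
  end

  def self.removes_fields(*fields)
    removed_fields.concat fields.flatten
  end
end

# Hypothetical subclass using the declarations in its class body.
class SplitName < DeclaringSketch
  requires_options :separator
  adds_fields :first_name, :last_name
  removes_fields [:full_name] # arrays are flattened into the list
end

# Each list lives in a class-level instance variable, so a further
# subclass starts with empty lists rather than inheriting SplitName's.
class SplitNameVariant < SplitName
end

SplitName.added_fields            # the two declared fields
SplitNameVariant.required_options # empty
```

Because the lists are stored per class, a subclass of a concrete transformation would need to repeat any declarations it relies on.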

Instance Method Details

#added_fields ⇒ Object

Returns the fields added by this transformation.



# File 'lib/chicago/etl/transformation.rb', line 79

def added_fields
  self.class.added_fields
end

#applies_to_stream?(target_stream) ⇒ Boolean

Returns true if this transformation should be applied to a row on the target stream.

Returns:

  • (Boolean)


# File 'lib/chicago/etl/transformation.rb', line 131

def applies_to_stream?(target_stream)
  @stream == :all ||
    (target_stream.nil? && @stream == :default) ||
    target_stream == @stream
end
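
The three clauses of the predicate are easier to check case by case. The sketch below restates it as a free-standing method taking the transformation's stream as its first argument; the name applies? and the :errors stream are illustrative.

```ruby
# Same logic as applies_to_stream?, with @stream passed in explicitly.
def applies?(stream, target_stream)
  stream == :all ||
    (target_stream.nil? && stream == :default) ||
    target_stream == stream
end

applies?(:all, :errors)     # true: :all matches every stream
applies?(:default, nil)     # true: a row with no stream counts as :default
applies?(:errors, :errors)  # true: exact match
applies?(:errors, nil)      # false: an unmarked row is not on :errors
applies?(:default, :errors) # false: different streams
```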

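#downstream_fields(fields) ⇒ Object

Returns the fields available after this transformation has run: the given fields minus the removed fields, plus the added fields, without duplicates.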



# File 'lib/chicago/etl/transformation.rb', line 92

def downstream_fields(fields)
  ((fields - removed_fields) + added_fields).uniq
end
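
A worked instance of the expression above, as a sketch. The added and removed lists are passed as keyword arguments here instead of being read from the class, and the field names are invented.

```ruby
# Same array arithmetic as downstream_fields, with explicit inputs.
def downstream_of(fields, added:, removed:)
  ((fields - removed) + added).uniq
end

downstream_of(
  [:id, :full_name],
  added: [:first_name, :last_name],
  removed: [:full_name]
) # :full_name is dropped, then the two name fields are appended
```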

#flush ⇒ Object

Returns all remaining rows yet to make their way through the pipeline.

Subclasses should override this if the transformation holds back rows while processing (for example, to find the maximum value in a set of rows), so that all rows are eventually written through the pipeline.



# File 'lib/chicago/etl/transformation.rb', line 116

def flush
  []
end
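
A sketch of the maximum-value case mentioned above. KeepMaximum is hypothetical and does not inherit from the library class so that it runs on its own. Returning nil from process_row to mean "nothing to emit yet" is an assumption; this page does not say how a pipeline treats that value.

```ruby
# Hypothetical transformation that holds back every row and releases
# only the one with the largest value in the given field.
class KeepMaximum
  def initialize(field)
    @field = field
    @max_row = nil
  end

  def process_row(row)
    @max_row = row if @max_row.nil? || row[@field] > @max_row[@field]
    nil # assumption: signals that no row is emitted at this point
  end

  # Releases the held row, if any, and resets the internal state.
  def flush
    rows = @max_row ? [@max_row] : []
    @max_row = nil
    rows
  end
end

keeper = KeepMaximum.new(:score)
[{ score: 3 }, { score: 9 }, { score: 5 }].each { |row| keeper.process_row(row) }
keeper.flush # an array holding the row whose :score is 9
```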

#output_streams ⇒ Object

Returns the streams to which this transformation may write rows.

By default, transformations are assumed to write only to the :default stream. Override this in subclasses as necessary.



# File 'lib/chicago/etl/transformation.rb', line 125

def output_streams
  [:default]
end
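
A sketch of an override, under two assumptions that this page does not confirm: that a row is routed by the value stored under the STREAM key, and that STREAM is :_stream. RejectBlankEmail and the :error stream are invented for illustration.

```ruby
# Hypothetical transformation that sends rows without an email address
# to a separate :error stream and declares that stream as an output.
class RejectBlankEmail
  STREAM = :_stream # assumed key; the real constant is not shown here

  def output_streams
    [:default, :error]
  end

  def process_row(row)
    return row unless row[:email].to_s.strip.empty?
    row.merge(STREAM => :error)
  end
end

router = RejectBlankEmail.new
router.process_row({ id: 1, email: "a@example.com" }) # returned unchanged
router.process_row({ id: 2, email: " " })             # marked for :error
```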

#process(row) ⇒ Object

Processes a row if the row is on this transformation’s stream.

Subclasses should not override this; override process_row instead.

Returns:

  • Hash if a single row is returned



# File 'lib/chicago/etl/transformation.rb', line 103

def process(row)
  applies_to_stream?(row[STREAM]) ? process_row(row) : row
end
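
The dispatch above can be exercised with a small stand-in. StreamAwareSketch copies process and applies_to_stream? from this page; the value of STREAM is an assumption, and MarkSeen is a hypothetical subclass.

```ruby
class StreamAwareSketch
  STREAM = :_stream # assumed row key; not defined on this page

  def initialize(stream = :default)
    @stream = stream
  end

  def applies_to_stream?(target_stream)
    @stream == :all ||
      (target_stream.nil? && @stream == :default) ||
      target_stream == @stream
  end

  # Rows on other streams pass through without reaching process_row.
  def process(row)
    applies_to_stream?(row[STREAM]) ? process_row(row) : row
  end
end

# Hypothetical subclass that tags every row it handles.
class MarkSeen < StreamAwareSketch
  def process_row(row)
    row.merge(seen: true)
  end
end

on_errors = MarkSeen.new(:errors)
on_errors.process({ id: 1, MarkSeen::STREAM => :errors }) # tagged
on_errors.process({ id: 2 })                              # passed through as is
```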

#removed_fields ⇒ Object

Returns the fields removed by this transformation.



# File 'lib/chicago/etl/transformation.rb', line 84

def removed_fields
  self.class.removed_fields
end

#required_options ⇒ Object

Returns the required initialization options for this transformation.



# File 'lib/chicago/etl/transformation.rb', line 74

def required_options
  self.class.required_options
end

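#upstream_fields(fields) ⇒ Object

Returns the fields expected upstream of this transformation: the given fields plus the removed fields, minus the added fields, without duplicates.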



# File 'lib/chicago/etl/transformation.rb', line 88

def upstream_fields(fields)
  ((fields + removed_fields) - added_fields).uniq
end
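
A worked instance of the expression above, as a sketch. The added and removed lists are passed as keyword arguments here instead of being read from the class, and the field names are invented.

```ruby
# Same array arithmetic as upstream_fields, with explicit inputs.
def upstream_of(fields, added:, removed:)
  ((fields + removed) - added).uniq
end

upstream_of(
  [:id, :first_name, :last_name],
  added: [:first_name, :last_name],
  removed: [:full_name]
) # the removed field is restored and the added fields are dropped
```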