Class: ETL::Engine

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/etl/engine.rb

Overview

The main ETL engine class

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util

#approximate_distance_of_time_in_words, #distance_of_time_in_words

Class Attribute Details

.average_rows_per_secondObject

Accessor for the average rows per second processed



136
137
138
# File 'lib/etl/engine.rb', line 136

# Class-level reader for the average number of rows processed per second.
#
# Returns whatever is currently stored in @average_rows_per_second
# (nil if it has not been assigned).
def average_rows_per_second
  @average_rows_per_second
end

.batchObject

Access the current ETL::Execution::Batch instance



117
118
119
# File 'lib/etl/engine.rb', line 117

# Class-level reader for the current ETL::Execution::Batch instance.
#
# Returns whatever is currently stored in @batch (nil if unassigned).
def batch
  @batch
end

.current_destinationObject

The current destination



95
96
97
# File 'lib/etl/engine.rb', line 95

# Class-level reader for the current destination.
#
# Returns whatever is currently stored in @current_destination
# (nil if unassigned).
def current_destination
  @current_destination
end

.current_sourceObject

The current source



89
90
91
# File 'lib/etl/engine.rb', line 89

# Class-level reader for the current source.
#
# Returns whatever is currently stored in @current_source (nil if unassigned).
def current_source
  @current_source
end

.current_source_rowObject

The current source row



92
93
94
# File 'lib/etl/engine.rb', line 92

# Class-level reader for the current source row.
#
# Returns whatever is currently stored in @current_source_row
# (nil if unassigned).
def current_source_row
  @current_source_row
end

.jobObject

Access the current ETL::Execution::Job instance



114
115
116
# File 'lib/etl/engine.rb', line 114

# Class-level reader for the current ETL::Execution::Job instance.
#
# Returns whatever is currently stored in @job (nil if unassigned).
def job
  @job
end

.limitObject

The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no limit



122
123
124
# File 'lib/etl/engine.rb', line 122

# Class-level reader for the limit on rows to load from the source.
#
# Returns whatever is currently stored in @limit; nil indicates that no
# limit applies.
def limit
  @limit
end

.log_write_modeObject

Accessor for the log write mode. Default is ‘a’ for append.



62
63
64
# File 'lib/etl/engine.rb', line 62

# Class-level reader for the log write mode.
#
# Returns whatever is currently stored in @log_write_mode. This reader does
# not itself apply a default; it returns nil when nothing has been assigned.
def log_write_mode
  @log_write_mode
end

.loggerObject

:nodoc:



68
69
70
# File 'lib/etl/engine.rb', line 68

# Class-level reader for the engine's logger.
#
# Returns whatever is currently stored in @logger (nil if unassigned).
def logger
  @logger
end

.offsetObject

The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no offset



127
128
129
# File 'lib/etl/engine.rb', line 127

# Class-level reader for the offset at which the source should begin.
#
# Returns whatever is currently stored in @offset; nil indicates that no
# offset applies.
def offset
  @offset
end

.read_locallyObject

Set to true to read locally from the last source cache files



133
134
135
# File 'lib/etl/engine.rb', line 133

# Class-level reader for the "read from the last source cache files" flag.
#
# Returns whatever is currently stored in @read_locally (nil if unassigned).
def read_locally
  @read_locally
end

.realtime_activityObject

Set to true to activate realtime activity. This will cause certain information messages to be printed to STDOUT



99
100
101
# File 'lib/etl/engine.rb', line 99

# Class-level reader for the realtime-activity flag, which controls whether
# informational messages are printed to STDOUT.
#
# Returns whatever is currently stored in @realtime_activity
# (nil if unassigned).
def realtime_activity
  @realtime_activity
end

.rows_readObject

Accessor for the total number of rows read from sources



102
103
104
# File 'lib/etl/engine.rb', line 102

# Class-level reader for the total number of rows read from sources.
#
# Returns whatever is currently stored in @rows_read (nil if unassigned).
def rows_read
  @rows_read
end

.rows_writtenObject

Accessor for the total number of rows processed



108
109
110
# File 'lib/etl/engine.rb', line 108

# Class-level reader for the total number of rows processed.
#
# Returns whatever is currently stored in @rows_written (nil if unassigned).
def rows_written
  @rows_written
end

.skip_bulk_importObject

Set to true to skip all bulk importing



130
131
132
# File 'lib/etl/engine.rb', line 130

# Class-level reader for the "skip all bulk importing" flag.
#
# Returns whatever is currently stored in @skip_bulk_import
# (nil if unassigned).
def skip_bulk_import
  @skip_bulk_import
end

.timestamped_logObject

Returns the value of attribute timestamped_log.



59
60
61
# File 'lib/etl/engine.rb', line 59

# Class-level reader for the timestamped_log attribute.
#
# Returns whatever is currently stored in @timestamped_log
# (nil if unassigned).
def timestamped_log
  @timestamped_log
end

.use_temp_tablesObject

Set to true to use temp tables



148
149
150
# File 'lib/etl/engine.rb', line 148

# Class-level reader for the "use temp tables" flag.
#
# Returns the raw value stored in @use_temp_tables (nil if unassigned); it is
# not coerced to a boolean here.
def use_temp_tables
  @use_temp_tables
end

Class Method Details

.connection(name) ⇒ Object

Get a named connection



139
140
141
142
143
144
145
# File 'lib/etl/engine.rb', line 139

# Fetch the connection registered under +name+.
#
# The lookup is logged at debug level. A connection that is not yet present
# in the +connections+ registry is created with +establish_connection+ and
# stored there. Before it is handed back, a connection that reports itself
# as inactive is asked to reconnect.
#
# name - the key identifying the connection in the registry.
#
# Returns the (possibly freshly established or reconnected) connection.
def connection(name)
  logger.debug("Retrieving connection #{name}")
  registry = connections
  handle = (registry[name] ||= establish_connection(name))
  handle.reconnect! unless handle.active?
  handle
end

.finishObject

Called when a batch job finishes, allowing for cleanup to occur



156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/etl/engine.rb', line 156

# Called when a batch job finishes: swaps every registered temp table into
# place of the table it shadows.
#
# For each entry in the +temp_tables+ registry, inside a transaction on the
# entry's connection:
#   1. the actual table is renamed to "<actual>_old",
#   2. the temp table is renamed to the actual table name,
#   3. the "<actual>_old" table is dropped.
#
# Once an entry's swap has completed it is removed from the registry. A
# registry entry is what makes +table+ skip creating the temp table, so a
# stale entry would leave later callers pointing at a temp table that has
# already been renamed away, and a repeated +finish+ would retry the renames.
# If a swap raises, its entry (and any not yet processed) stays registered.
#
# Returns the temp table registry (empty when every swap succeeded).
def finish
  registry = temp_tables
  # Iterate over a snapshot of the keys so entries can be deleted as we go.
  registry.keys.each do |temp_table|
    mapping = registry[temp_table]
    actual_table = mapping[:table]
    conn = mapping[:connection]
    retired_table = "#{actual_table}_old"
    conn.transaction do
      conn.rename_table(actual_table, retired_table)
      conn.rename_table(temp_table, actual_table)
      conn.drop_table(retired_table)
    end
    registry.delete(temp_table)
  end
  registry
end

.init(options = {}) ⇒ Object

Initialization that is run when a job is executed.

Options:

  • :limit: Limit the number of records returned from sources

  • :offset: Specify the offset at which to begin reading records from sources

  • :newlog: If true then the log write mode is set to 'w' so the log is overwritten, otherwise the log is appended to

  • :skip_bulk_import: Set to true to skip bulk import

  • :read_locally: Set to true to read from the local cache

  • :rails_root: Set to the rails root to boot rails



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/etl/engine.rb', line 21

# One-time engine initialization, run when a job is executed. Calls made
# after the first successful initialization return immediately.
#
# Options read from +options+:
#   :limit            - stored in @limit
#   :offset           - stored in @offset
#   :newlog           - when truthy, @log_write_mode is set to 'w'
#   :skip_bulk_import - stored in @skip_bulk_import
#   :read_locally     - stored in @read_locally
#   :rails_root       - when given, <rails_root>/config/environment is required
#   :config           - path of the database configuration file; defaults to
#                       'database.yml', falling back to 'config/database.yml'
#                       when the chosen path does not exist
#
# Note that the resolved configuration path is written back into
# +options[:config]+, i.e. the hash passed by the caller is modified.
#
# The configuration file is run through ERB and parsed as YAML; the result
# is merged into ActiveRecord::Base.configurations and assigned to
# ETL::Base.configurations. Finally the execution models are loaded,
# connected via the :etl_execution configuration and migrated.
#
# Returns nil when already initialized, true otherwise.
def init(options={})
  return if @initialized

  puts "initializing ETL engine\n\n"
  @limit = options[:limit]
  @offset = options[:offset]
  @log_write_mode = 'w' if options[:newlog]
  @skip_bulk_import = options[:skip_bulk_import]
  @read_locally = options[:read_locally]
  @rails_root = options[:rails_root]

  # Boot the Rails environment first, when one was requested.
  require File.join(@rails_root, 'config/environment') if @rails_root

  # Resolve the database configuration file, then render and parse it.
  options[:config] ||= 'database.yml'
  options[:config] = 'config/database.yml' unless File.exist?(options[:config])
  rendered_yaml = ERB.new(IO.read(options[:config])).result + "\n"
  db_settings = YAML::load(rendered_yaml)
  ActiveRecord::Base.configurations.merge!(db_settings)
  ETL::Base.configurations = HashWithIndifferentAccess.new(db_settings)

  # Execution bookkeeping models are loaded only once configuration exists.
  require 'etl/execution'
  ETL::Execution::Base.establish_connection :etl_execution
  ETL::Execution::Execution.migrate

  @initialized = true
end

.process(file) ⇒ Object

Process the specified file. Acceptable values for file are:

  • Path to a file

  • File object

  • ETL::Control::Control instance

  • ETL::Batch::Batch instance

The process command will accept either a .ctl Control file or a .ebf ETL Batch File.



55
56
57
# File 'lib/etl/engine.rb', line 55

# Class-level convenience entry point: builds a fresh engine instance and
# delegates processing of +file+ to it.
#
# file - anything the instance-level +process+ accepts.
#
# Returns whatever the instance-level +process+ returns.
def process(file)
  engine = new
  engine.process(file)
end

.table(table_name, connection) ⇒ Object

Modify the table name if necessary



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/etl/engine.rb', line 175

# Resolve the table name that should actually be written to.
#
# When temp tables are disabled (+use_temp_tables?+ is false) the given name
# is returned unchanged and the connection is not touched.
#
# When temp tables are enabled the name "tmp_<table_name>" is returned. The
# first time a given temp table is requested (no entry in the +temp_tables+
# registry) it is created by copying +table_name+ via the connection, and
# registered together with the table it shadows and its connection.
#
# Implemented with plain control flow rather than the ActiveSupport
# +returning+ helper, so the method does not depend on that extension being
# available.
#
# table_name - name of the real table.
# connection - object responding to +drop_table+ and +copy_table+.
#
# Returns the table name to use.
def table(table_name, connection)
  return table_name unless use_temp_tables?

  temp_table_name = "tmp_#{table_name}"
  if temp_tables[temp_table_name].nil?
    # Best-effort removal of a leftover temp table; a failure here (for
    # example because the table does not exist) is deliberately ignored.
    begin
      connection.drop_table(temp_table_name)
    rescue
    end
    # Create the temp table and add it to the mapping
    connection.copy_table(table_name, temp_table_name)
    temp_tables[temp_table_name] = {
      :table => table_name,
      :connection => connection
    }
  end
  temp_table_name
end

.temp_tablesObject

Get a registry of temp tables



151
152
153
# File 'lib/etl/engine.rb', line 151

# Registry of temp tables, created empty on first access.
#
# Returns the Hash held in @temp_tables; the same object is returned on
# every call so callers can add entries to it.
def temp_tables
  @temp_tables = {} unless @temp_tables
  @temp_tables
end

.timestampObject

Get a timestamp value as a string



84
85
86
# File 'lib/etl/engine.rb', line 84

# Current local time rendered as a 14-digit string: four-digit year, then
# two digits each for month, day, hour, minute and second.
#
# Returns the formatted String.
def timestamp
  now = Time.now
  now.strftime("%Y%m%d%H%M%S")
end

.use_temp_tables?Boolean

Return true if using temp tables

Returns:

  • (Boolean)


170
171
172
# File 'lib/etl/engine.rb', line 170

# Whether temp tables are in use.
#
# Returns true when +use_temp_tables+ holds any truthy value, false when it
# is nil or false.
def use_temp_tables?
  !!use_temp_tables
end

Instance Method Details

#benchmarksObject

Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline. Keys include:

  • :transforms

  • :after_reads

  • :before_writes

  • :writes



240
241
242
243
244
245
246
247
# File 'lib/etl/engine.rb', line 240

# Benchmark totals for the stages of the ETL pipeline, created on first
# access with every stage set to 0.
#
# Keys: :transforms, :after_reads, :before_writes, :writes.
#
# Returns the Hash held in @benchmarks; the same object is returned on every
# call so callers can accumulate into it.
def benchmarks
  return @benchmarks if @benchmarks

  @benchmarks = {
    :transforms => 0,
    :after_reads => 0,
    :before_writes => 0,
    :writes => 0
  }
end

#errorsObject

Array of errors encountered during execution of the ETL process



229
230
231
# File 'lib/etl/engine.rb', line 229

# Errors encountered during execution of the ETL process, created empty on
# first access.
#
# Returns the Array held in @errors; the same object is returned on every
# call so callers can append to it.
def errors
  @errors = [] unless @errors
  @errors
end

#process(file) ⇒ Object

Process a file, control object or batch object. Acceptable values for file are:

  • Path to a file

  • File object

  • ETL::Control::Control instance

  • ETL::Batch::Batch instance



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/etl/engine.rb', line 255

# Process a file, control object or batch object.
#
# file - one of:
#   * String - treated as a path; opened and processed as a File
#   * File   - dispatched on its path: a path ending in the extension ".ctl"
#              goes to +process_control+, one ending in ".ebf" goes to
#              +process_batch+; any other path is ignored
#   * ETL::Control::Control instance - passed to +process_control+
#   * ETL::Batch::Batch instance     - passed to +process_batch+
#
# The extension checks use an escaped dot and the end-of-string anchor, so
# only a literal ".ctl" / ".ebf" suffix matches (a name such as "job_ctl"
# does not).
#
# Raises RuntimeError when +file+ is none of the accepted types.
def process(file)
  case file
  when String
    # NOTE(review): the File opened here is not closed by this method;
    # confirm whether process_control/process_batch take ownership of it.
    process(File.new(file))
  when File
    process_control(file) if file.path =~ /\.ctl\z/
    process_batch(file) if file.path =~ /\.ebf\z/
  when ETL::Control::Control
    process_control(file)
  when ETL::Batch::Batch
    process_batch(file)
  else
    raise RuntimeError, "Process object must be a String, File, Control " \
      "instance or Batch instance"
  end
end

#say(message) ⇒ Object

Say the specified message, with a newline



211
212
213
# File 'lib/etl/engine.rb', line 211

# Say the specified message followed by a newline.
#
# message - the String to emit; a newline is appended before it is handed to
#           +say_without_newline+.
#
# Returns whatever +say_without_newline+ returns.
def say(message)
  line = message + "\n"
  say_without_newline(line)
end

#say_on_own_line(message) ⇒ Object

Say the message on its own line



224
225
226
# File 'lib/etl/engine.rb', line 224

# Say the message on its own line.
#
# message - the String to emit; a newline is placed in front of it and the
#           result is passed to +say+.
#
# Returns whatever +say+ returns.
def say_on_own_line(message)
  prefixed = "\n" + message
  say(prefixed)
end

#say_without_newline(message) ⇒ Object

Say the specified message without a newline



216
217
218
219
220
221
# File 'lib/etl/engine.rb', line 216

# Say the specified message without a trailing newline.
#
# Nothing is written unless ETL::Engine.realtime_activity is truthy. When it
# is, the message is printed to $stdout, which is then flushed so the output
# appears immediately.
#
# message - the text to print.
#
# Returns nil when realtime activity is off, otherwise the result of
# flushing $stdout.
def say_without_newline(message)
  return unless ETL::Engine.realtime_activity

  out = $stdout
  out.print message
  out.flush
end