Class: ETL::Engine
Overview
The main ETL engine class.
Class Attribute Summary
-
.average_rows_per_second ⇒ Object
Accessor for the average rows per second processed.
-
.batch ⇒ Object
Access the current ETL::Execution::Batch instance.
-
.current_destination ⇒ Object
The current destination.
-
.current_source ⇒ Object
The current source.
-
.current_source_row ⇒ Object
The current source row.
-
.job ⇒ Object
Access the current ETL::Execution::Job instance.
-
.limit ⇒ Object
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch.
-
.log_write_mode ⇒ Object
Accessor for the log write mode.
-
.logger ⇒ Object
Accessor for the engine logger (marked :nodoc: in the source).
-
.offset ⇒ Object
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch.
-
.read_locally ⇒ Object
Set to true to read locally from the last source cache files.
-
.realtime_activity ⇒ Object
Set to true to activate realtime activity.
-
.rows_read ⇒ Object
Accessor for the total number of rows read from sources.
-
.rows_written ⇒ Object
Accessor for the total number of rows processed.
-
.skip_bulk_import ⇒ Object
Set to true to skip all bulk importing.
-
.timestamped_log ⇒ Object
Returns the value of attribute timestamped_log.
-
.use_temp_tables ⇒ Object
Set to true to use temp tables.
Class Method Summary
-
.connection(name) ⇒ Object
Get a named connection.
-
.finish ⇒ Object
Called when a batch job finishes, allowing for cleanup to occur.
-
.init(options = {}) ⇒ Object
Initialization that is run when a job is executed.
-
.process(file) ⇒ Object
Process the specified file.
-
.table(table_name, connection) ⇒ Object
Return the table name to load into, substituting a temp table name when temp tables are enabled.
-
.temp_tables ⇒ Object
Get a registry of temp tables.
-
.timestamp ⇒ Object
Get a timestamp value as a string.
-
.use_temp_tables? ⇒ Boolean
Return true if using temp tables.
Instance Method Summary
-
#benchmarks ⇒ Object
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline.
-
#errors ⇒ Object
Array of errors encountered during execution of the ETL process.
-
#process(file) ⇒ Object
Process a file, control object or batch object.
-
#say(message) ⇒ Object
Say the specified message, with a newline.
-
#say_on_own_line(message) ⇒ Object
Say the message on its own line.
-
#say_without_newline(message) ⇒ Object
Say the specified message without a newline.
Methods included from Util
#approximate_distance_of_time_in_words, #distance_of_time_in_words
Class Attribute Details
.average_rows_per_second ⇒ Object
Accessor for the average rows per second processed.
```ruby
# File 'lib/etl/engine.rb', line 136
def average_rows_per_second
  @average_rows_per_second
end
```
.batch ⇒ Object
Access the current ETL::Execution::Batch instance.
```ruby
# File 'lib/etl/engine.rb', line 117
def batch
  @batch
end
```
.current_destination ⇒ Object
The current destination.
```ruby
# File 'lib/etl/engine.rb', line 95
def current_destination
  @current_destination
end
```
.current_source ⇒ Object
The current source.
```ruby
# File 'lib/etl/engine.rb', line 89
def current_source
  @current_source
end
```
.current_source_row ⇒ Object
The current source row.
```ruby
# File 'lib/etl/engine.rb', line 92
def current_source_row
  @current_source_row
end
```
.job ⇒ Object
Access the current ETL::Execution::Job instance.
```ruby
# File 'lib/etl/engine.rb', line 114
def job
  @job
end
```
.limit ⇒ Object
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch. The default value is nil, which indicates that there is no limit.
```ruby
# File 'lib/etl/engine.rb', line 122
def limit
  @limit
end
```
.log_write_mode ⇒ Object
Accessor for the log write mode. Default is ‘a’ for append.
```ruby
# File 'lib/etl/engine.rb', line 62
def log_write_mode
  @log_write_mode
end
```
.logger ⇒ Object
Accessor for the engine logger (marked :nodoc: in the source).
```ruby
# File 'lib/etl/engine.rb', line 68
def logger
  @logger
end
```
.offset ⇒ Object
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch. The default value is nil, which indicates that there is no offset.
```ruby
# File 'lib/etl/engine.rb', line 127
def offset
  @offset
end
```
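This page does not show how a source consumes limit and offset. As a minimal sketch, assuming offset means "skip this many rows first" and limit means "then read at most this many", the hypothetical helper apply_offset_and_limit (not part of ActiveWarehouse ETL) treats nil for either setting as "no offset" and "no limit", matching the defaults described above.

```ruby
# Hypothetical helper: one plausible way a row source could honour the
# engine's offset and limit settings. Not taken from ActiveWarehouse ETL.
def apply_offset_and_limit(rows, offset: nil, limit: nil)
  result = rows.drop(offset || 0)            # nil offset: start at the first row
  limit.nil? ? result : result.first(limit)  # nil limit: no cap on rows read
end

rows = (1..10).to_a
p apply_offset_and_limit(rows, offset: 2, limit: 3)  # => [3, 4, 5]
p apply_offset_and_limit(rows).size                  # => 10
```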
.read_locally ⇒ Object
Set to true to read locally from the last source cache files.
```ruby
# File 'lib/etl/engine.rb', line 133
def read_locally
  @read_locally
end
```
.realtime_activity ⇒ Object
Set to true to activate realtime activity, which causes informational messages to be printed to STDOUT.
```ruby
# File 'lib/etl/engine.rb', line 99
def realtime_activity
  @realtime_activity
end
```
.rows_read ⇒ Object
Accessor for the total number of rows read from sources.
```ruby
# File 'lib/etl/engine.rb', line 102
def rows_read
  @rows_read
end
```
.rows_written ⇒ Object
Accessor for the total number of rows processed.
```ruby
# File 'lib/etl/engine.rb', line 108
def rows_written
  @rows_written
end
```
.skip_bulk_import ⇒ Object
Set to true to skip all bulk importing.
```ruby
# File 'lib/etl/engine.rb', line 130
def skip_bulk_import
  @skip_bulk_import
end
```
.timestamped_log ⇒ Object
Returns the value of attribute timestamped_log.
```ruby
# File 'lib/etl/engine.rb', line 59
def timestamped_log
  @timestamped_log
end
```
.use_temp_tables ⇒ Object
Set to true to use temp tables.
```ruby
# File 'lib/etl/engine.rb', line 148
def use_temp_tables
  @use_temp_tables
end
```
Class Method Details
.connection(name) ⇒ Object
Get a named connection.
```ruby
# File 'lib/etl/engine.rb', line 139
def connection(name)
  logger.debug "Retrieving connection #{name}"
  conn = connections[name] ||= establish_connection(name)
  #conn.verify!(ActiveRecord::Base.verification_timeout)
  conn.reconnect! unless conn.active?
  conn
end
```
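The connection method memoizes one connection per name and reconnects it if it has gone inactive. The sketch below reproduces that caching pattern so it can run without a database; FakeConnection and ConnectionRegistry are hypothetical stand-ins, not ActiveWarehouse ETL or ActiveRecord classes, and only the active? and reconnect! calls used by the original are modelled.

```ruby
# Hypothetical connection: tracks whether it is active and how often it
# has been reconnected.
class FakeConnection
  attr_reader :name, :reconnects

  def initialize(name)
    @name = name
    @active = true
    @reconnects = 0
  end

  def active?
    @active
  end

  def disconnect!
    @active = false
  end

  def reconnect!
    @reconnects += 1
    @active = true
  end
end

# Hypothetical registry using the same shape as ETL::Engine.connection.
class ConnectionRegistry
  attr_reader :established

  def initialize
    @connections = {}
    @established = 0
  end

  def connection(name)
    conn = @connections[name] ||= establish_connection(name) # create once per name
    conn.reconnect! unless conn.active?                       # revive a dropped connection
    conn
  end

  private

  def establish_connection(name)
    @established += 1
    FakeConnection.new(name)
  end
end

registry = ConnectionRegistry.new
first = registry.connection(:etl_execution)
first.disconnect!
second = registry.connection(:etl_execution)
puts first.equal?(second)  # true: the cached object is reused
puts second.reconnects     # 1
```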
.finish ⇒ Object
Called when a batch job finishes, allowing for cleanup to occur.
```ruby
# File 'lib/etl/engine.rb', line 156
def finish
  temp_tables.each do |temp_table, mapping|
    actual_table = mapping[:table]
    #puts "move #{temp_table} to #{actual_table}"
    conn = mapping[:connection]
    conn.transaction do
      conn.rename_table(actual_table, "#{actual_table}_old")
      conn.rename_table(temp_table, actual_table)
      conn.drop_table("#{actual_table}_old")
    end
  end
end
```
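The finish loop swaps each temp table into place by renaming the real table aside, renaming the temp table to the real name, and dropping the old copy. To show the resulting order of operations without a database, this sketch uses a hypothetical RecordingConnection that logs calls instead of executing them, and a hypothetical swap_temp_tables function that mirrors the loop body.

```ruby
# Hypothetical connection that records schema operations and runs
# transaction blocks directly.
class RecordingConnection
  attr_reader :log

  def initialize
    @log = []
  end

  def transaction
    @log << [:begin]
    yield
    @log << [:commit]
  end

  def rename_table(from, to)
    @log << [:rename, from, to]
  end

  def drop_table(name)
    @log << [:drop, name]
  end
end

# Mirrors the body of ETL::Engine.finish for a given temp-table registry.
def swap_temp_tables(temp_tables)
  temp_tables.each do |temp_table, mapping|
    actual_table = mapping[:table]
    conn = mapping[:connection]
    conn.transaction do
      conn.rename_table(actual_table, "#{actual_table}_old") # move the live table aside
      conn.rename_table(temp_table, actual_table)            # promote the freshly loaded copy
      conn.drop_table("#{actual_table}_old")                 # discard the previous data
    end
  end
end

conn = RecordingConnection.new
swap_temp_tables({ "tmp_people" => { table: "people", connection: conn } })
conn.log.each { |entry| p entry }
```

Whether the three statements are actually atomic depends on the database: some databases, such as MySQL, implicitly commit DDL statements, so the transaction block does not protect the swap there.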
.init(options = {}) ⇒ Object
Initialization that is run when a job is executed.
Options:
-
:limit
: Limit the number of records returned from sources.
-
:offset
: The offset at which sources begin reading.
-
:newlog
: If true, the log is opened in write mode ('w') and overwritten; otherwise it is appended to.
-
:skip_bulk_import
: Set to true to skip bulk import.
-
:read_locally
: Set to true to read from the local cache.
-
:rails_root
: Set to the Rails root to boot Rails.
-
:config
: Path to the database configuration file (default database.yml). If that file does not exist, config/database.yml is used instead.
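The option handling at the top of init copies values into class-level settings and is guarded by @initialized, so only the first call has any effect. The database-free sketch below illustrates that behaviour with a hypothetical EngineSettingsSketch class; the explicit 'a' fallback for log_write_mode is an assumption based on the documented default, whereas the real init only assigns 'w' when :newlog is set.

```ruby
# Hypothetical, database-free sketch of the option handling in ETL::Engine.init.
class EngineSettingsSketch
  class << self
    attr_reader :limit, :offset, :log_write_mode, :skip_bulk_import, :read_locally

    def init(options = {})
      return if @initialized # later calls are ignored, as in the real init

      @limit = options[:limit]
      @offset = options[:offset]
      @log_write_mode = options[:newlog] ? 'w' : 'a' # assumption: 'a' is the default
      @skip_bulk_import = options[:skip_bulk_import]
      @read_locally = options[:read_locally]
      @initialized = true
    end
  end
end

EngineSettingsSketch.init(limit: 100, newlog: true)
EngineSettingsSketch.init(limit: 5) # ignored: already initialized
p EngineSettingsSketch.limit          # => 100
p EngineSettingsSketch.log_write_mode # => "w"
```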
```ruby
# File 'lib/etl/engine.rb', line 21
def init(options={})
  unless @initialized
    puts "initializing ETL engine\n\n"
    @limit = options[:limit]
    @offset = options[:offset]
    @log_write_mode = 'w' if options[:newlog]
    @skip_bulk_import = options[:skip_bulk_import]
    @read_locally = options[:read_locally]
    @rails_root = options[:rails_root]

    require File.join(@rails_root, 'config/environment') if @rails_root
    options[:config] ||= 'database.yml'
    options[:config] = 'config/database.yml' unless File.exist?(options[:config])
    database_configuration = YAML::load(ERB.new(IO.read(options[:config])).result + "\n")
    ActiveRecord::Base.configurations.merge!(database_configuration)
    ETL::Base.configurations = HashWithIndifferentAccess.new(database_configuration)
    #puts "configurations in init: #{ActiveRecord::Base.configurations.inspect}"

    require 'etl/execution'
    ETL::Execution::Base.establish_connection :etl_execution
    ETL::Execution::Execution.migrate

    @initialized = true
  end
end
```
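Because init renders the configuration file with ERB before parsing it as YAML, database.yml can embed Ruby expressions. This standalone sketch shows that two-step load using only the standard library; the ETL_DB_HOST environment variable and the configuration values are hypothetical.

```ruby
require 'erb'
require 'yaml'

# Hypothetical database.yml contents with an embedded ERB expression.
template = <<~YML
  etl_execution:
    database: etl_execution
    host: <%= ENV.fetch('ETL_DB_HOST', 'localhost') %>
YML

# Same two steps as init: render ERB first, then parse the YAML result.
database_configuration = YAML.load(ERB.new(template).result + "\n")
p database_configuration['etl_execution']['host']
```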
.process(file) ⇒ Object
Process the specified file. Acceptable values for file are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
-
ETL::Batch::Batch instance
The process command will accept either a .ctl Control file or a .ebf ETL Batch File.
```ruby
# File 'lib/etl/engine.rb', line 55
def process(file)
  new().process(file)
end
```
.table(table_name, connection) ⇒ Object
Return the table name to load into. When temp tables are enabled, the name returned is tmp_<table_name>; on first use the temp table is created as a copy of the original table and registered in .temp_tables so that .finish can swap it into place. Otherwise table_name is returned unchanged.
```ruby
# File 'lib/etl/engine.rb', line 175
def table(table_name, connection)
  if use_temp_tables?
    returning "tmp_#{table_name}" do |temp_table_name|
      if temp_tables[temp_table_name].nil?
        # Create the temp table and add it to the mapping
        begin connection.drop_table(temp_table_name); rescue; end
        connection.copy_table(table_name, temp_table_name)
        temp_tables[temp_table_name] = {
          :table => table_name,
          :connection => connection
        }
      end
    end
  else
    table_name
  end
end
```
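The following sketch restates the table decision as standalone code so the registry behaviour can be observed: the temp table is copied only the first time a given name is requested. TempTableSketch and CopyRecordingConnection are hypothetical, and Object#tap (core Ruby) stands in for ActiveSupport's older returning helper used in the original.

```ruby
# Hypothetical connection that records copies; drop_table always fails,
# simulating a first run where no temp table exists yet.
class CopyRecordingConnection
  attr_reader :copies

  def initialize
    @copies = []
  end

  def drop_table(_name)
    raise "table does not exist"
  end

  def copy_table(from, to)
    @copies << [from, to]
  end
end

class TempTableSketch
  attr_accessor :use_temp_tables

  def temp_tables
    @temp_tables ||= {}
  end

  def table(table_name, connection)
    return table_name unless use_temp_tables

    "tmp_#{table_name}".tap do |temp_table_name|
      if temp_tables[temp_table_name].nil?
        begin
          connection.drop_table(temp_table_name)
        rescue StandardError
          # ignore: the temp table may not exist yet
        end
        connection.copy_table(table_name, temp_table_name)
        temp_tables[temp_table_name] = { table: table_name, connection: connection }
      end
    end
  end
end

engine = TempTableSketch.new
conn = CopyRecordingConnection.new
p engine.table('people', conn) # => "people" (temp tables off)
engine.use_temp_tables = true
p engine.table('people', conn) # => "tmp_people"
p engine.table('people', conn) # => "tmp_people" again, without a second copy
p conn.copies                  # => [["people", "tmp_people"]]
```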
.temp_tables ⇒ Object
Get a registry of temp tables.
```ruby
# File 'lib/etl/engine.rb', line 151
def temp_tables
  @temp_tables ||= {}
end
```
.timestamp ⇒ Object
Get the current time as a string in YYYYMMDDHHMMSS format.
```ruby
# File 'lib/etl/engine.rb', line 84
def timestamp
  Time.now.strftime("%Y%m%d%H%M%S")
end
```
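To make the timestamp format concrete, this sketch applies the same strftime pattern to a fixed time via a hypothetical timestamp_for helper. Because every field is zero-padded and ordered from year down to second, comparing two such strings lexicographically gives the same result as comparing the times (for four-digit years).

```ruby
# Same format string as ETL::Engine.timestamp, applied to a given time.
def timestamp_for(time)
  time.strftime("%Y%m%d%H%M%S")
end

p timestamp_for(Time.new(2024, 3, 7, 9, 5, 2)) # => "20240307090502"
```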
.use_temp_tables? ⇒ Boolean
Return true if using temp tables.
```ruby
# File 'lib/etl/engine.rb', line 170
def use_temp_tables?
  use_temp_tables ? true : false
end
```
Instance Method Details
#benchmarks ⇒ Object
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline. Keys include:
-
:transforms
-
:after_reads
-
:before_writes
-
:writes
```ruby
# File 'lib/etl/engine.rb', line 240
def benchmarks
  @benchmarks ||= {
    :transforms => 0,
    :after_reads => 0,
    :before_writes => 0,
    :writes => 0,
  }
end
```
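Each benchmarks value is a running total of seconds spent in one pipeline stage. This page does not show where the totals are updated, so the sketch below is only an illustration of the accumulation pattern: it times placeholder work with the standard library's Benchmark.realtime and adds the elapsed seconds to a hash with the same keys. The row data and stage bodies are hypothetical.

```ruby
require 'benchmark'

# Hash with the same keys as ETL::Engine#benchmarks.
benchmarks = { transforms: 0, after_reads: 0, before_writes: 0, writes: 0 }

rows = (1..1_000).map { |i| { id: i, name: "row #{i}" } }
rows.each do |row|
  # Placeholder transform and write; only the timing pattern matters here.
  benchmarks[:transforms] += Benchmark.realtime { row[:name] = row[:name].upcase }
  benchmarks[:writes] += Benchmark.realtime { row.to_a }
end

benchmarks.each { |stage, seconds| printf("%-14s %.6f s\n", stage, seconds) }
```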
#errors ⇒ Object
Array of errors encountered during execution of the ETL process.
```ruby
# File 'lib/etl/engine.rb', line 229
def errors
  @errors ||= []
end
```
#process(file) ⇒ Object
Process a file, control object or batch object. Acceptable values for file are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
-
ETL::Batch::Batch instance
```ruby
# File 'lib/etl/engine.rb', line 255
def process(file)
  case file
  when String
    process(File.new(file))
  when File
    process_control(file) if file.path =~ /.ctl$/
    process_batch(file) if file.path =~ /.ebf$/
  when ETL::Control::Control
    process_control(file)
  when ETL::Batch::Batch
    process_batch(file)
  else
    raise RuntimeError, "Process object must be a String, File, Control instance or Batch instance"
  end
end
```
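For File arguments, #process chooses a handler from the file extension. In the original patterns /.ctl$/ and /.ebf$/ the dot is unescaped, so a name such as fooxctl also matches, and a file with neither extension is silently ignored. The hypothetical dispatch_kind helper below sketches a stricter variant that escapes the dot and raises on unknown extensions; both behaviours are choices of this sketch, not of ActiveWarehouse ETL.

```ruby
# Hypothetical stricter extension dispatch for control and batch files.
def dispatch_kind(path)
  case path
  when /\.ctl\z/ then :control # ETL control file
  when /\.ebf\z/ then :batch   # ETL batch file
  else
    raise ArgumentError, "expected a .ctl or .ebf file, got #{path}"
  end
end

p dispatch_kind("people.ctl")  # => :control
p dispatch_kind("nightly.ebf") # => :batch
```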
#say(message) ⇒ Object
Say the specified message, followed by a newline. Output appears only when ETL::Engine.realtime_activity is true.
```ruby
# File 'lib/etl/engine.rb', line 211
def say(message)
  say_without_newline(message + "\n")
end
```
#say_on_own_line(message) ⇒ Object
Say the message on its own line by prefixing it with a newline.
```ruby
# File 'lib/etl/engine.rb', line 224
def say_on_own_line(message)
  say("\n" + message)
end
```
#say_without_newline(message) ⇒ Object
Say the specified message without a newline. The message is printed to $stdout and flushed only when ETL::Engine.realtime_activity is true.
```ruby
# File 'lib/etl/engine.rb', line 216
def say_without_newline(message)
  if ETL::Engine.realtime_activity
    $stdout.print message
    $stdout.flush
  end
end
```
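All three say helpers funnel into say_without_newline, which prints only when realtime activity is on. The sketch below re-creates that chain in a hypothetical SaySketch class that writes to an injectable IO (a StringIO here) so the gating can be observed; in ETL::Engine the flag is the class attribute ETL::Engine.realtime_activity and the output always goes to $stdout.

```ruby
require 'stringio'

# Hypothetical re-creation of the say helpers with an injectable output.
class SaySketch
  def initialize(realtime_activity:, out: $stdout)
    @realtime_activity = realtime_activity
    @out = out
  end

  def say(message)
    say_without_newline(message + "\n")
  end

  def say_on_own_line(message)
    say("\n" + message)
  end

  def say_without_newline(message)
    return unless @realtime_activity # silent unless realtime activity is enabled

    @out.print message
    @out.flush
  end
end

quiet = StringIO.new
SaySketch.new(realtime_activity: false, out: quiet).say("hidden")
loud = StringIO.new
SaySketch.new(realtime_activity: true, out: loud).say_on_own_line("done")
p quiet.string # => ""
p loud.string  # => "\ndone\n"
```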