Class: Chicago::ETL::Batch
- Inherits: Sequel::Model
  - Object
  - Sequel::Model
  - Chicago::ETL::Batch
- Defined in: lib/chicago/etl/batch.rb
Overview
A particular “run” of the ETL process.
All ETL tasks should be executed in the context of a Batch.
A batch creates a temporary directory under tmp/batches/:id where it stores various logs and extract files.
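The directory layout described above can be sketched with the standard library alone. This is a minimal sketch, not the library's code: `batch_dir` and `open_batch_log` are hypothetical helpers, and `project_root` stands in for `Chicago.project_root`, which is not loaded here.

```ruby
require "fileutils"
require "logger"
require "tmpdir"

# Hypothetical helper mirroring #dir; `project_root` stands in for
# Chicago.project_root, which this sketch does not load.
def batch_dir(project_root, batch_id)
  File.join(project_root, "tmp", "batches", batch_id.to_s)
end

# Hypothetical helper combining what #after_create and #log do:
# create the directory, then open a logger on the "log" file inside it.
def open_batch_log(project_root, batch_id)
  dir = batch_dir(project_root, batch_id)
  FileUtils.mkdir_p(dir)
  Logger.new(File.join(dir, "log"))
end

# A throwaway root keeps the sketch from touching a real project tree.
root = Dir.mktmpdir("chicago-sketch")
logger = open_batch_log(root, 42)
logger.info "Started ETL batch 42."
logger.close

puts batch_dir(root, 42)
```

Extract files for a batch would presumably sit alongside the `log` file in the same directory.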
Class Method Summary
- .instance ⇒ Object
  Returns the Batch that should be used for the ETL process.
- .last_batch ⇒ Object
  Returns the last batch run, or nil if this is the first batch.
Instance Method Summary
- #after_create ⇒ Object (private)
- #dir ⇒ Object
  Returns the directory files & batch logs will be written to.
- #error ⇒ Object
  Sets this batch to the Error state.
- #extract(task_name, &block) ⇒ Object
  Deprecated. Use perform_task instead.
- #extract_from ⇒ Object
- #finish ⇒ Object
  Finishes this batch, and sets the finished_at timestamp.
- #finished? ⇒ Boolean
  Returns true if this batch is finished.
- #in_error? ⇒ Boolean
  Returns true if in the error state.
- #load(task_name, &block) ⇒ Object
  Deprecated. Use perform_task instead.
- #log ⇒ Object
  Returns the logger for this batch.
- #perform_task(stage, task_name, &block) ⇒ Object
  Performs a named task if it hasn’t already run successfully in this batch.
- #reextract ⇒ Object
  Marks this batch for re-extraction.
- #reextracting? ⇒ Boolean
  Returns true when this batch should re-extract, i.e. load records without regard to creation/update times.
- #start(extract_to = nil) ⇒ Object
  Starts this batch.
- #transform(task_name, &block) ⇒ Object
  Deprecated. Use perform_task instead.
Class Method Details
.instance ⇒ Object
Returns the Batch that should be used for the ETL process.
A new batch is returned unless the previous batch was started today and did not finish successfully, in which case that batch is returned so it can be resumed.
This should be used in preference to new or create.
    # File 'lib/chicago/etl/batch.rb', line 24
    def instance
      if last_batch.nil? || last_batch.finished? || last_batch.started_at.to_date < Date.today
        new
      else
        last_batch
      end
    end
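The selection rule in the listing above can be exercised in isolation. The following is a sketch under stated assumptions: `BatchRow` is a hypothetical stand-in for a persisted batch (not the Sequel model), and `new_batch_needed?` is a hypothetical name for the condition that `.instance` evaluates.

```ruby
require "date"

# Hypothetical stand-in for a persisted batch row; not the Sequel model.
BatchRow = Struct.new(:state, :started_at) do
  def finished?
    state == "Finished"
  end
end

# Mirrors the condition in .instance: true means "create a new batch",
# false means "reuse last_batch".
def new_batch_needed?(last_batch, today = Date.today)
  last_batch.nil? ||
    last_batch.finished? ||
    last_batch.started_at.to_date < today
end

today = Date.new(2024, 5, 2)

# No previous batch at all: a new one is needed.
puts new_batch_needed?(nil, today)
# Previous batch failed earlier today: it is reused.
puts new_batch_needed?(BatchRow.new("Error", Time.new(2024, 5, 2, 1)), today)
# Previous batch is unfinished but was started yesterday: a new one is needed.
puts new_batch_needed?(BatchRow.new("Started", Time.new(2024, 5, 1, 23)), today)
```

Under this reading, an unfinished batch is only resumed on the day it was started.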
.last_batch ⇒ Object
Returns the last batch run, or nil if this is the first batch.
    # File 'lib/chicago/etl/batch.rb', line 33
    def last_batch
      order(:started_at).last
    end
Instance Method Details
#after_create ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/chicago/etl/batch.rb', line 136
    def after_create
      FileUtils.mkdir_p(dir, :mode => 0777)
    end
#dir ⇒ Object
Returns the directory files & batch logs will be written to.
    # File 'lib/chicago/etl/batch.rb', line 94
    def dir
      @dir ||= File.join(Chicago.project_root, "tmp", "batches", id.to_s)
    end
#error ⇒ Object
Sets this batch to the Error state.
    # File 'lib/chicago/etl/batch.rb', line 116
    def error
      update(:state => "Error")
    end
#extract(task_name, &block) ⇒ Object
Deprecated. Use perform_task instead.
    # File 'lib/chicago/etl/batch.rb', line 82
    def extract(task_name, &block)
      perform_task(:extract, task_name, &block)
    end
#extract_from ⇒ Object
    # File 'lib/chicago/etl/batch.rb', line 64
    def extract_from
      return if reextracting?
      value = self.class.dataset.
        where(:state => "Finished").
        where {|r| r.id < id }.
        select(:max.sql_function(:extracted_to).as(:extracted_to)).
        single_value
      if value && value.to_date == Date.today
        (value.to_date - 1).to_time
      else
        value
      end
    end
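Reading the listing above, #extract_from appears to return nil when re-extracting. Otherwise it takes the latest extracted_to among earlier finished batches and, if that value falls on today, moves it back to the start of the previous day. The date adjustment can be sketched without a database. `extract_from_value` is a hypothetical helper, and `max_extracted_to` stands in for the result of the dataset query.

```ruby
require "date"

# Mirrors the tail of #extract_from. `max_extracted_to` stands in for the
# value the dataset query would return (a Date, a Time, or nil).
def extract_from_value(max_extracted_to, reextracting: false, today: Date.today)
  return nil if reextracting
  if max_extracted_to && max_extracted_to.to_date == today
    # Same-day value: fall back to midnight at the start of the previous day.
    (max_extracted_to.to_date - 1).to_time
  else
    max_extracted_to
  end
end

today = Date.new(2024, 5, 2)
puts extract_from_value(Time.new(2024, 5, 2, 10), today: today).inspect
puts extract_from_value(Time.new(2024, 4, 30, 8), today: today).inspect
puts extract_from_value(nil, today: today).inspect
```

A nil result covers both the re-extraction case and the case where no earlier finished batch exists.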
#finish ⇒ Object
Finishes this batch, and sets the finished_at timestamp.
    # File 'lib/chicago/etl/batch.rb', line 111
    def finish
      update(:state => "Finished", :finished_at => Time.now)
    end
#finished? ⇒ Boolean
Returns true if this batch is finished.
    # File 'lib/chicago/etl/batch.rb', line 121
    def finished?
      state == "Finished"
    end
#in_error? ⇒ Boolean
Returns true if this batch is in the error state.
    # File 'lib/chicago/etl/batch.rb', line 126
    def in_error?
      state == "Error"
    end
#load(task_name, &block) ⇒ Object
Deprecated. Use perform_task instead.
    # File 'lib/chicago/etl/batch.rb', line 41
    def load(task_name, &block)
      perform_task(:load, task_name, &block)
    end
#log ⇒ Object
Returns the logger for this batch.
    # File 'lib/chicago/etl/batch.rb', line 131
    def log
      @log ||= Logger.new(File.join(dir, "log"))
    end
#perform_task(stage, task_name, &block) ⇒ Object
Performs a named task if it hasn’t already run successfully in this batch.
    # File 'lib/chicago/etl/batch.rb', line 88
    def perform_task(stage, task_name, &block)
      task = find_or_create_task_invocation(stage, task_name)
      task.perform(&block) unless task.finished?
    end
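The "run once per batch" behaviour can be illustrated with an in-memory sketch. Neither `find_or_create_task_invocation` nor the task invocation class is shown on this page, so everything here is an assumption: `SketchInvocation` and `SketchBatch` are hypothetical stand-ins, and the state names and error handling are guesses rather than the library's behaviour.

```ruby
# Hypothetical in-memory stand-in for a task invocation record.
class SketchInvocation
  def initialize
    @state = "Created"
  end

  def finished?
    @state == "Finished"
  end

  # Runs the block; only a block that returns normally marks the
  # invocation as finished (assumed behaviour).
  def perform
    yield
    @state = "Finished"
  rescue StandardError
    @state = "Error"
    raise
  end
end

# Hypothetical stand-in for the batch; a Hash replaces the database lookup.
class SketchBatch
  def initialize
    @invocations = {}
  end

  def perform_task(stage, task_name, &block)
    task = (@invocations[[stage, task_name]] ||= SketchInvocation.new)
    task.perform(&block) unless task.finished?
  end
end

batch = SketchBatch.new
runs = 0
batch.perform_task(:extract, "orders") { runs += 1 }
batch.perform_task(:extract, "orders") { runs += 1 } # skipped: already finished
puts runs # → 1
```

Because finished tasks are skipped, a resumed batch can replay the same sequence of perform_task calls and only the tasks that did not complete will run again.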
#reextract ⇒ Object
Marks this batch for re-extraction.
    # File 'lib/chicago/etl/batch.rb', line 53
    def reextract
      @reextract = true
      self
    end
#reextracting? ⇒ Boolean
Returns true when this batch should re-extract, i.e. load records without regard to creation/update times.
    # File 'lib/chicago/etl/batch.rb', line 60
    def reextracting?
      !!@reextract
    end
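Because #reextract returns self, it can be chained with other calls on the batch. The flag lives in an instance variable, so it applies only to the object in memory. A minimal sketch, using a hypothetical `ReextractFlag` class that copies just these two methods:

```ruby
# Hypothetical stand-in carrying only the re-extraction flag.
class ReextractFlag
  def reextract
    @reextract = true
    self # returning self is what allows chaining
  end

  def reextracting?
    !!@reextract # nil (never set) is coerced to false
  end
end

fresh = ReextractFlag.new
puts fresh.reextracting?           # → false
puts fresh.reextract.reextracting? # → true
```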
#start(extract_to = nil) ⇒ Object
Starts this batch.
    # File 'lib/chicago/etl/batch.rb', line 99
    def start(extract_to=nil)
      self.extracted_to = extract_to || Date.today
      save
      if state == "Started"
        log.info "Started ETL batch #{id}."
      else
        log.info "Resumed ETL batch #{id}."
      end
      self
    end
#transform(task_name, &block) ⇒ Object
Deprecated. Use perform_task instead. Note that the listing below passes :extract, not :transform, as the stage.
    # File 'lib/chicago/etl/batch.rb', line 48
    def transform(task_name, &block)
      perform_task(:extract, task_name, &block)
    end