Class: Chicago::ETL::Batch

Inherits:
Sequel::Model
  • Object
show all
Defined in:
lib/chicago/etl/batch.rb

Overview

A particular “run” of the ETL process.

All ETL tasks should be executed in the context of a Batch.

A batch creates a temporary directory under tmp/batches/:id where it stores various logs and extract files.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.instance ⇒ Object

Returns the Batch that should be used for the ETL process.

A new batch is returned unless the previous batch was started today and did not finish successfully; in that case the previous batch is returned so that it can be resumed.

This should be used in preference to new or create.



24
25
26
27
28
29
30
# File 'lib/chicago/etl/batch.rb', line 24

# Returns the Batch that should be used for the ETL process.
#
# The most recent batch is reused only when it exists, has not finished
# and was started today. In every other case (no previous batch, previous
# batch finished, or previous batch started before today) a batch built
# with +new+ is returned.
#
# The previous batch is looked up once and held in a local, so all three
# checks and the returned value refer to the same record.
def instance
  previous = last_batch
  if previous.nil? || previous.finished? || previous.started_at.to_date < Date.today
    new
  else
    previous
  end
end

.last_batch ⇒ Object

Returns the last batch run, or nil if this is the first batch.



33
34
35
# File 'lib/chicago/etl/batch.rb', line 33

# Returns the last record when batches are ordered by +started_at+
# (nil if this is the first batch).
def last_batch
  by_start_time = order(:started_at)
  by_start_time.last
end

Instance Method Details

#after_create ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



136
137
138
# File 'lib/chicago/etl/batch.rb', line 136

# Model hook run after the record has been created: makes sure the batch
# working directory (see #dir) exists, creating any missing parent
# directories.
#
# +super+ is called first so that after_create behaviour defined further
# up the hook chain still runs.
def after_create
  super
  FileUtils.mkdir_p(dir, :mode => 0777)
end

#dir ⇒ Object

Returns the directory files & batch logs will be written to.



94
95
96
# File 'lib/chicago/etl/batch.rb', line 94

# Returns the path "<project root>/tmp/batches/<id>" for this batch.
#
# The path is built on first use and cached in @dir.
def dir
  return @dir if @dir

  @dir = File.join(Chicago.project_root, "tmp", "batches", id.to_s)
end

#error ⇒ Object

Sets this batch to the Error state.



116
117
118
# File 'lib/chicago/etl/batch.rb', line 116

# Puts this batch into the Error state by updating the +state+ column
# to "Error".
def error
  error_attributes = { :state => "Error" }
  update(error_attributes)
end

#extract(task_name, &block) ⇒ Object

Deprecated.

Use perform_task instead

Deprecated.



82
83
84
# File 'lib/chicago/etl/batch.rb', line 82

# Runs the named task in the :extract stage by delegating to
# #perform_task.
#
# @deprecated Use perform_task instead.
def extract(task_name, &block)
  stage = :extract
  perform_task(stage, task_name, &block)
end

#extract_from ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/chicago/etl/batch.rb', line 64

# Returns the point from which this batch should extract.
#
# Returns nil when the batch is re-extracting. Otherwise it reads the
# maximum +extracted_to+ among batches in the "Finished" state whose id is
# lower than this batch's id. If that value falls on today's date, the
# start of the previous day is returned instead; any other value (or nil,
# when there is no such batch) is returned unchanged.
def extract_from
  return if reextracting?

  max_extracted_to = :max.sql_function(:extracted_to).as(:extracted_to)
  earlier_finished = self.class.dataset.
    where(:state => "Finished").
    where {|r| r.id < id }
  value = earlier_finished.select(max_extracted_to).single_value

  last_day = value && value.to_date
  return value unless last_day == Date.today

  # The last finished batch extracted up to today: step back one day.
  (last_day - 1).to_time
end

#finish ⇒ Object

Finishes this batch, and sets the finished_at timestamp.



111
112
113
# File 'lib/chicago/etl/batch.rb', line 111

# Marks this batch as finished: updates +state+ to "Finished" and
# +finished_at+ to the current time.
def finish
  completed_at = Time.now
  update(:state => "Finished", :finished_at => completed_at)
end

#finished? ⇒ Boolean

Returns true if this batch is finished.

Returns:

  • (Boolean)


121
122
123
# File 'lib/chicago/etl/batch.rb', line 121

# Returns true when this batch's +state+ equals "Finished".
#
# @return [Boolean]
def finished?
  current_state = state
  current_state == "Finished"
end

#in_error? ⇒ Boolean

Returns true if in the error state

Returns:

  • (Boolean)


126
127
128
# File 'lib/chicago/etl/batch.rb', line 126

# Returns true when this batch's +state+ equals "Error".
#
# @return [Boolean]
def in_error?
  current_state = state
  current_state == "Error"
end

#load(task_name, &block) ⇒ Object

Deprecated.

Use perform_task instead

Deprecated.



41
42
43
# File 'lib/chicago/etl/batch.rb', line 41

# Runs the named task in the :load stage by delegating to #perform_task.
#
# @deprecated Use perform_task instead.
def load(task_name, &block)
  stage = :load
  perform_task(stage, task_name, &block)
end

#log ⇒ Object

Returns the logger for this batch



131
132
133
# File 'lib/chicago/etl/batch.rb', line 131

# Returns the logger for this batch, which writes to the file named "log"
# inside #dir.
#
# The Logger is created on first use and cached in @log.
def log
  return @log if @log

  log_path = File.join(dir, "log")
  @log = Logger.new(log_path)
end

#perform_task(stage, task_name, &block) ⇒ Object

Performs a named task if it hasn’t already run successfully in this batch.



88
89
90
91
# File 'lib/chicago/etl/batch.rb', line 88

# Performs a named task unless its invocation for this stage is already
# finished.
#
# The invocation is obtained from find_or_create_task_invocation. When it
# reports finished?, nothing is run and nil is returned; otherwise the
# block is handed to the invocation's #perform and its result is returned.
def perform_task(stage, task_name, &block)
  invocation = find_or_create_task_invocation(stage, task_name)
  return if invocation.finished?

  invocation.perform(&block)
end

#reextract ⇒ Object

Marks this batch for re-extraction.



53
54
55
56
# File 'lib/chicago/etl/batch.rb', line 53

# Flags this batch for re-extraction by setting @reextract to true.
#
# Returns self, so the call can be chained.
def reextract
  tap { @reextract = true }
end

#reextracting? ⇒ Boolean

Returns true when this batch should re-extract - i.e. load records without regard to creation/update times.

Returns:

  • (Boolean)


60
61
62
# File 'lib/chicago/etl/batch.rb', line 60

# Returns true when the @reextract flag is set to a truthy value (see
# #reextract), and false otherwise.
#
# @return [Boolean]
def reextracting?
  @reextract ? true : false
end

#start(extract_to = nil) ⇒ Object

Starts this batch.



99
100
101
102
103
104
105
106
107
108
# File 'lib/chicago/etl/batch.rb', line 99

# Starts this batch.
#
# Sets +extracted_to+ to the given value (today's date when none is
# given), saves the record, and writes an info line to the batch log:
# "Started ..." when the state is "Started", "Resumed ..." otherwise.
#
# Returns self.
def start(extract_to=nil)
  self.extracted_to = extract_to || Date.today
  save

  message = state == "Started" ? "Started ETL batch #{id}." : "Resumed ETL batch #{id}."
  log.info message

  self
end

#transform(task_name, &block) ⇒ Object

Deprecated.

Use perform_task instead

Deprecated.



48
49
50
# File 'lib/chicago/etl/batch.rb', line 48

# Runs the named task in the :transform stage by delegating to
# #perform_task.
#
# The stage passed on is :transform, mirroring #load (:load) and
# #extract (:extract); it was previously :extract, so transform tasks were
# looked up under the extract stage.
#
# @deprecated Use perform_task instead.
def transform(task_name, &block)
  perform_task(:transform, task_name, &block)
end