Class: Rodimus::Step

Inherits:
Object
  • Object
show all
Includes:
Observable, Observing, RuntimeLogging
Defined in:
lib/rodimus/step.rb

Instance Attribute Summary collapse

Attributes included from RuntimeLogging

#start_time

Instance Method Summary collapse

Methods included from RuntimeLogging

#after_run_record_time, #before_run_record_time

Methods included from Observing

#on_notify

Methods included from Observable

#notify, #observers

Constructor Details

#initialize ⇒ Step

Returns a new instance of Step.



18
19
20
21
# File 'lib/rodimus/step.rb', line 18

# Builds a step and registers its default observers.
#
# The step is always appended to its own observer list. A Benchmark observer
# is appended as well, but only when Rodimus.configuration.benchmarking is
# truthy.
def initialize
  observers << self
  return unless Rodimus.configuration.benchmarking

  observers << Benchmark.new
end

Instance Attribute Details

#incoming ⇒ Object

The incoming data stream. Can be anything that quacks like an IO.



9
10
11
# File 'lib/rodimus/step.rb', line 9

# Reader for the step's incoming data stream.
#
# @return [Object] whatever is stored in @incoming; documented as anything
#   that quacks like an IO. Evaluates to nil when the variable was never set.
def incoming
  @incoming
end

#outgoing ⇒ Object

The outgoing data stream. Can be anything that quacks like an IO.



12
13
14
# File 'lib/rodimus/step.rb', line 12

# Reader for the step's outgoing data stream.
#
# @return [Object] whatever is stored in @outgoing; documented as anything
#   that quacks like an IO. Evaluates to nil when the variable was never set.
def outgoing
  @outgoing
end

#shared_data ⇒ Object

Shared user-data accessible across all running transformation steps. This is initialized by the Transformation when the step begins to run.



16
17
18
# File 'lib/rodimus/step.rb', line 16

# Reader for the user data shared across all running transformation steps.
#
# Per the attribute description, the Transformation initializes this value
# when the step begins to run; this method only reads it.
#
# @return [Object] whatever is stored in @shared_data (nil if never set)
def shared_data
  @shared_data
end

Instance Method Details

#close_descriptors ⇒ Object



23
24
25
26
27
# File 'lib/rodimus/step.rb', line 23

# Closes the incoming and outgoing streams.
#
# A stream that is nil is skipped, and so is any stream that does not respond
# to #close.
#
# @return [Array] the non-nil streams that were considered
def close_descriptors
  streams = [incoming, outgoing].compact
  streams.each { |stream| stream.close if stream.respond_to?(:close) }
end

#handle_output(transformed_row) ⇒ Object

Override this for custom output handling functionality per-row.



30
31
32
# File 'lib/rodimus/step.rb', line 30

# Emits one transformed row by writing it to the outgoing stream with #puts.
# Override this for custom per-row output handling.
#
# @param transformed_row [Object] the value to write, as passed in by the caller
# @return [Object] whatever the outgoing stream's #puts returns
def handle_output(transformed_row)
  sink = outgoing
  sink.puts(transformed_row)
end

#process_row(row) ⇒ Object

Override this for custom transformation functionality



35
36
37
# File 'lib/rodimus/step.rb', line 35

# Transforms a single row. The default implementation just returns the row's
# #to_s; override this for custom transformation functionality.
#
# @param row [Object] the row to transform
# @return [Object] the result of row.to_s
def process_row(row)
  row.to_s
end

#run ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/rodimus/step.rb', line 39

# Drives the step: iterates over every row of the incoming stream, transforms
# it with #process_row and emits it with #handle_output.
#
# Observers are notified with :before_run / :after_run around the whole loop
# and with :before_row / :after_row around each row. @row_count starts at 1
# and is incremented after each row is emitted; a progress line is logged
# whenever it is a multiple of 50,000. The descriptors are always closed on
# the way out, even when a row raises.
#
# @return [Object] the result of the final :after_run notification
def run
  notify(self, :before_run)
  @row_count = 1
  incoming.each do |row|
    notify(self, :before_row)
    handle_output(process_row(row))
    # Progress is reported before the counter advances, so the logged number
    # equals the number of rows emitted so far.
    if (@row_count % 50_000).zero?
      Rodimus.logger.info(self) { "#{@row_count} rows processed" }
    end
    @row_count += 1
    notify(self, :after_row)
  end
  notify(self, :after_run)
ensure
  close_descriptors
end

#to_s ⇒ Object



55
56
57
# File 'lib/rodimus/step.rb', line 55

# Human-readable description of the step: its class name plus the incoming
# and outgoing streams. A missing (nil or false) stream is rendered as the
# literal text "nil".
#
# @return [String]
def to_s
  input_label = incoming || 'nil'
  output_label = outgoing || 'nil'
  "#{self.class} connected to input: #{input_label} and output: #{output_label}"
end