Class: Rodimus::Step
- Inherits: Object
  - Object
  - Rodimus::Step
- Includes: Observable, Observing, RuntimeLogging
- Defined in: lib/rodimus/step.rb
Instance Attribute Summary
- #incoming ⇒ Object
  The incoming data stream.
- #outgoing ⇒ Object
  The outgoing data stream.
- #shared_data ⇒ Object
  Shared user-data accessible across all running transformation steps.
Attributes included from RuntimeLogging
Instance Method Summary
- #close_descriptors ⇒ Object
- #handle_output(transformed_row) ⇒ Object
  Override this for custom output handling functionality per-row.
- #initialize ⇒ Step (constructor)
  A new instance of Step.
- #process_row(row) ⇒ Object
  Override this for custom transformation functionality.
- #run ⇒ Object
- #to_s ⇒ Object
Methods included from RuntimeLogging
#after_run_record_time, #before_run_record_time
Methods included from Observing
Methods included from Observable
Constructor Details
#initialize ⇒ Step
Returns a new instance of Step.
    # File 'lib/rodimus/step.rb', line 18
    def initialize
      observers << self
      observers << Benchmark.new if Rodimus.configuration.benchmarking
    end
Instance Attribute Details
#incoming ⇒ Object
The incoming data stream. Can be anything that quacks like an IO: #run only calls #each on it, and #close_descriptors calls #close only if the object responds to it.
    # File 'lib/rodimus/step.rb', line 9
    def incoming
      @incoming
    end
#outgoing ⇒ Object
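The outgoing data stream. Can be anything that quacks like an IO: the default #handle_output only calls #puts on it, and #close_descriptors calls #close only if the object responds to it.
    # File 'lib/rodimus/step.rb', line 12
    def outgoing
      @outgoing
    end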
#shared_data ⇒ Object
Shared user-data accessible across all running transformation steps. This is initialized by the Transformation when the step begins to run.
    # File 'lib/rodimus/step.rb', line 16
    def shared_data
      @shared_data
    end
Instance Method Details
#close_descriptors ⇒ Object
    # File 'lib/rodimus/step.rb', line 23
    def close_descriptors
      [incoming, outgoing].reject(&:nil?).each do |descriptor|
        descriptor.close if descriptor.respond_to?(:close)
      end
    end
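The duck-typed check in #close_descriptors means a stream without #close (an Array of rows, for instance) is left alone, and nil streams are skipped. A minimal sketch of that behavior, using a free-standing helper named close_descriptors_sketch that is hypothetical and only copies the method body shown above:

```ruby
require "stringio"

# Hypothetical helper mirroring the body of Step#close_descriptors;
# it is not part of Rodimus.
def close_descriptors_sketch(incoming, outgoing)
  [incoming, outgoing].reject(&:nil?).each do |descriptor|
    descriptor.close if descriptor.respond_to?(:close)
  end
end

rows = ["a", "b"]      # an Array responds to #each but not to #close
sink = StringIO.new    # a StringIO responds to #close

close_descriptors_sketch(rows, sink)  # closes only the StringIO
close_descriptors_sketch(nil, nil)    # nil streams are rejected, nothing happens

sink_closed = sink.closed?
```

This is why a step can be fed an in-memory collection during testing without wrapping it in an IO object.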
#handle_output(transformed_row) ⇒ Object
Override this to customize how each transformed row is written. The default implementation writes the row to the outgoing stream with #puts.
    # File 'lib/rodimus/step.rb', line 30
    def handle_output(transformed_row)
      outgoing.puts(transformed_row)
    end
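One possible use of this hook is dropping rows that should not reach the next step. The sketch below assumes nothing about Rodimus beyond the method body shown above: OutputSketch is a hypothetical stand-in so the example runs without the gem, and SkipBlankOutput is an illustrative subclass. With the gem loaded, the same override would be written on a subclass of Rodimus::Step.

```ruby
require "stringio"

# Hypothetical stand-in that copies only the default #handle_output.
class OutputSketch
  attr_accessor :outgoing

  def handle_output(transformed_row)
    outgoing.puts(transformed_row)
  end
end

# Illustrative override: skip rows that are empty or whitespace-only,
# and fall back to the default behavior for everything else.
class SkipBlankOutput < OutputSketch
  def handle_output(transformed_row)
    return if transformed_row.to_s.strip.empty?
    super
  end
end

sink = SkipBlankOutput.new
sink.outgoing = StringIO.new
["a", "", "  ", "b"].each { |row| sink.handle_output(row) }

written = sink.outgoing.string
```

Calling super keeps the default write path in one place, so the subclass only states the filtering rule.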
#process_row(row) ⇒ Object
Override this for custom transformation functionality. The default implementation returns the row converted to a String.
    # File 'lib/rodimus/step.rb', line 35
    def process_row(row)
      row.to_s
    end
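A typical step overrides only #process_row and leaves output handling alone. The following is a sketch under stated assumptions: MiniStep is a hypothetical stand-in that mirrors the per-row hooks listed on this page (without observers, logging, or descriptor handling) so the example runs without the gem, and UpcaseStep is an illustrative subclass.

```ruby
require "stringio"

# Hypothetical stand-in for Rodimus::Step; only the per-row hooks are copied.
class MiniStep
  attr_accessor :incoming, :outgoing

  def process_row(row)
    row.to_s
  end

  def handle_output(transformed_row)
    outgoing.puts(transformed_row)
  end

  # Simplified loop: no notifications, logging, or ensure block.
  def run
    incoming.each do |row|
      handle_output(process_row(row))
    end
  end
end

# Illustrative subclass: strip the trailing newline and upcase each row.
class UpcaseStep < MiniStep
  def process_row(row)
    row.to_s.chomp.upcase
  end
end

step = UpcaseStep.new
step.incoming = StringIO.new("alpha\nbeta\n")
step.outgoing = StringIO.new
step.run

result = step.outgoing.string
```

Because the default #handle_output uses #puts, each transformed row ends up on its own line in the outgoing stream.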
#run ⇒ Object
    # File 'lib/rodimus/step.rb', line 39
    def run
      notify(self, :before_run)
      @row_count = 1
      incoming.each do |row|
        notify(self, :before_row)
        transformed_row = process_row(row)
        handle_output(transformed_row)
        Rodimus.logger.info(self) { "#{@row_count} rows processed" } if @row_count % 50000 == 0
        @row_count += 1
        notify(self, :after_row)
      end
      notify(self, :after_run)
    ensure
      close_descriptors
    end
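The order of events in #run can be traced with a small sketch. It assumes nothing about how the Observable mixin delivers notifications: RunSketch is a hypothetical class whose notify simply records the event name, and the progress logging is omitted. The ensure block mirrors the listing above, so both streams are closed even if a row raises.

```ruby
require "stringio"

# Hypothetical class that reproduces the control flow of Step#run.
class RunSketch
  attr_reader :events

  def initialize(incoming, outgoing)
    @incoming = incoming
    @outgoing = outgoing
    @events = []
  end

  # Stand-in for the real notification mechanism: record the event only.
  def notify(_subject, event)
    @events << event
  end

  def run
    notify(self, :before_run)
    @incoming.each do |row|
      notify(self, :before_row)
      @outgoing.puts(row.to_s)
      notify(self, :after_row)
    end
    notify(self, :after_run)
  ensure
    # Same duck-typed cleanup as #close_descriptors.
    [@incoming, @outgoing].each { |io| io.close if io.respond_to?(:close) }
  end
end

source = StringIO.new("one\ntwo\n")
sink = StringIO.new
sketch = RunSketch.new(source, sink)
sketch.run

events = sketch.events
streams_closed = source.closed? && sink.closed?
```

For two input rows the recorded sequence is :before_run, then a :before_row/:after_row pair per row, then :after_run.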
#to_s ⇒ Object
    # File 'lib/rodimus/step.rb', line 55
    def to_s
      "#{self.class} connected to input: #{incoming || 'nil'} and output: #{outgoing || 'nil'}"
    end