Module: Kiba::StreamingRunner

Extended by:
StreamingRunner
Includes:
Runner
Included in:
StreamingRunner
Defined in:
lib/kiba/streaming_runner.rb

Instance Method Summary

Methods included from Runner

#close_destinations, #run, #run_post_processes, #run_pre_processes, #to_instance, #to_instances

Instance Method Details

#process_rows(sources, transforms, destinations) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/kiba/streaming_runner.rb', line 25

# Pulls every row from the sources through each transform, in order, and
# writes each row that reaches the end of the chain to every destination.
#
# @param sources [#each] collection of sources, handed to #source_stream
# @param transforms [#inject] transforms, each wrapped in turn by #transform_stream
# @param destinations [#each] objects responding to #write(row)
# @return [Object] whatever the final stream's #each returns
def process_rows(sources, transforms, destinations)
  # Wrap the source stream in one layer per transform; with no transforms
  # the source stream itself is what gets drained.
  pipeline = transforms.inject(source_stream(sources)) do |upstream, transform|
    transform_stream(upstream, transform)
  end

  pipeline.each do |row|
    destinations.each { |destination| destination.write(row) }
  end
end

#source_stream(sources) ⇒ Object



17
18
19
20
21
22
23
# File 'lib/kiba/streaming_runner.rb', line 17

# Builds a single stream that walks the given sources one after another and
# emits every row each source yields. Nothing is read until the returned
# enumerator is iterated.
#
# @param sources [#each] collection of objects that each respond to #each
# @return [Enumerator] rows of all sources, in source order
def source_stream(sources)
  Enumerator.new do |yielder|
    # A plain proc (not a lambda) so argument handling matches a literal block.
    emit = proc { |row| yielder << row }
    sources.each { |source| source.each(&emit) }
  end
end

#transform_stream(stream, t) ⇒ Object



6
7
8
9
10
11
12
13
14
15
# File 'lib/kiba/streaming_runner.rb', line 6

# Wraps a stream so that every row is passed through one transform.
#
# For each input row, +t.process+ is called with the row and a block. Rows
# the transform yields to that block are emitted first; a truthy return
# value is then emitted as well, while a nil/false return emits nothing more.
#
# @param stream [#each] upstream rows
# @param t [#process] transform; may yield rows and/or return one
# @return [Enumerator] the transformed rows, produced only when iterated
def transform_stream(stream, t)
  Enumerator.new do |yielder|
    # A plain proc (not a lambda) so it behaves like a literal block.
    emit = proc { |extra_row| yielder << extra_row }

    stream.each do |row|
      result = t.process(row, &emit)
      next unless result

      yielder << result
    end
  end
end