Module: Kiba::StreamingRunner
Instance Method Summary
collapse
Methods included from Runner
#close_destinations, #run, #run_post_processes, #run_pre_processes, #to_instance, #to_instances
Instance Method Details
#process_rows(sources, transforms, destinations) ⇒ Object
25
26
27
28
29
30
31
|
# File 'lib/kiba/streaming_runner.rb', line 25
# Pushes every row from the sources through the transforms, in order, and
# hands each surviving row to every destination.
#
# The pipeline is assembled by wrapping the source stream in one
# enumerator per transform (see #transform_stream); rows are only pulled
# once the final enumerator is iterated here.
#
# @param sources [#each] collection of objects that each respond to +each+
# @param transforms [#inject] ordered collection of objects responding to +process+
# @param destinations [#each] collection of objects responding to +write+
# @return [Object] whatever iterating the final stream returns
def process_rows(sources, transforms, destinations)
  pipeline = transforms.reduce(source_stream(sources)) do |upstream, transform|
    transform_stream(upstream, transform)
  end

  pipeline.each do |row|
    destinations.each { |destination| destination.write(row) }
  end
end
|
#source_stream(sources) ⇒ Object
17
18
19
20
21
22
23
|
# File 'lib/kiba/streaming_runner.rb', line 17
# Builds a single enumerator that walks each source in turn and emits
# every row it produces, so downstream code sees one continuous stream.
#
# Nothing is read from the sources until the returned enumerator is
# iterated.
#
# @param sources [#each] collection of objects that each respond to +each+
# @return [Enumerator] rows of all sources, in source order
def source_stream(sources)
  Enumerator.new do |yielder|
    sources.each do |current_source|
      current_source.each do |row|
        yielder << row
      end
    end
  end
end
|
#transform_stream(stream, t) ⇒ Object
6
7
8
9
10
11
12
13
14
15
|
# File 'lib/kiba/streaming_runner.rb', line 6
# Wraps +stream+ in a new enumerator that runs every row through the
# transform +t+.
#
# For each input row, any rows the transform yields to its block are
# emitted first; the value returned by +process+ is emitted afterwards,
# unless it is +nil+ or +false+, in which case nothing more is emitted
# for that input row.
#
# @param stream [#each] upstream rows
# @param t [#process] transform; receives one row and an optional block
# @return [Enumerator] the transformed rows
def transform_stream(stream, t)
  Enumerator.new do |yielder|
    stream.each do |row|
      result = t.process(row) { |extra_row| yielder << extra_row }
      # A falsy result means the transform dropped (or already yielded) the row.
      next unless result

      yielder << result
    end
  end
end
|