Module: Kiba::StreamingRunner
Defined Under Namespace
Classes: AliasingProc
Instance Method Summary collapse
- #close_destinations(destinations) ⇒ Object
- #process_rows(sources, transforms, destinations) ⇒ Object
- #run(control) ⇒ Object
- #run_post_processes(control) ⇒ Object
- #run_pre_processes(control) ⇒ Object
- #source_stream(sources) ⇒ Object
- #to_instance(klass, args, block, allow_block, allow_class) ⇒ Object
- #to_instances(definitions, allow_block = false, allow_class = true) ⇒ Object
- #transform_stream(stream, t) ⇒ Object
Instance Method Details
#close_destinations(destinations) ⇒ Object
29 30 31 32 33 |
# File 'lib/kiba/streaming_runner.rb', line 29 def close_destinations(destinations) destinations .find_all { |d| d.respond_to?(:close) } .each(&:close) end |
#process_rows(sources, transforms, destinations) ⇒ Object
59 60 61 62 63 64 65 |
# File 'lib/kiba/streaming_runner.rb', line 59 def process_rows(sources, transforms, destinations) stream = source_stream(sources) recurser = lambda { |s, t| transform_stream(s, t) } transforms.inject(stream, &recurser).each do |r| destinations.each { |d| d.write(r) } end end |
#run(control) ⇒ Object
10 11 12 13 14 15 16 17 18 19 |
# File 'lib/kiba/streaming_runner.rb', line 10 def run(control) run_pre_processes(control) process_rows( to_instances(control.sources), to_instances(control.transforms, true), destinations = to_instances(control.destinations) ) close_destinations(destinations) run_post_processes(control) end |
#run_post_processes(control) ⇒ Object
25 26 27 |
# File 'lib/kiba/streaming_runner.rb', line 25 def run_post_processes(control) to_instances(control.post_processes, true, false).each(&:call) end |
#run_pre_processes(control) ⇒ Object
21 22 23 |
# File 'lib/kiba/streaming_runner.rb', line 21 def run_pre_processes(control) to_instances(control.pre_processes, true, false).each(&:call) end |
#source_stream(sources) ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/kiba/streaming_runner.rb', line 51 def source_stream(sources) Enumerator.new do |y| sources.each do |source| source.each { |r| y << r } end end end |
#to_instance(klass, args, block, allow_block, allow_class) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/kiba/streaming_runner.rb', line 76 def to_instance(klass, args, block, allow_block, allow_class) if klass && block fail "Class and block form cannot be used together at the moment" elsif klass fail "Class form is not allowed here" unless allow_class klass.new(*args) elsif block fail "Block form is not allowed here" unless allow_block AliasingProc.new(&block) else fail "Nil parameters not allowed here" end end |
#to_instances(definitions, allow_block = false, allow_class = true) ⇒ Object
67 68 69 70 71 72 73 74 |
# File 'lib/kiba/streaming_runner.rb', line 67 def to_instances(definitions, allow_block = false, allow_class = true) definitions.map do |definition| to_instance( *definition.values_at(:klass, :args, :block), allow_block, allow_class ) end end |
#transform_stream(stream, t) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/kiba/streaming_runner.rb', line 35 def transform_stream(stream, t) Enumerator.new do |y| stream.each do |input_row| returned_row = t.process(input_row) do |yielded_row| y << yielded_row end y << returned_row if returned_row end if t.respond_to?(:close) t.close do |close_row| y << close_row end end end end |