Module: Kiba::StreamingRunner

Extended by:
StreamingRunner
Included in:
StreamingRunner
Defined in:
lib/kiba/streaming_runner.rb

Defined Under Namespace

Classes: AliasingProc

Instance Method Summary collapse

Instance Method Details

#close_destinations(destinations) ⇒ Object



29
30
31
32
33
# File 'lib/kiba/streaming_runner.rb', line 29

def close_destinations(destinations)
  destinations
    .find_all { |d| d.respond_to?(:close) }
    .each(&:close)
end

#process_rows(sources, transforms, destinations) ⇒ Object



59
60
61
62
63
64
65
# File 'lib/kiba/streaming_runner.rb', line 59

def process_rows(sources, transforms, destinations)
  stream = source_stream(sources)
  recurser = lambda { |s, t| transform_stream(s, t) }
  transforms.inject(stream, &recurser).each do |r|
    destinations.each { |d| d.write(r) }
  end
end

#run(control) ⇒ Object



10
11
12
13
14
15
16
17
18
19
# File 'lib/kiba/streaming_runner.rb', line 10

def run(control)
  run_pre_processes(control)
  process_rows(
    to_instances(control.sources),
    to_instances(control.transforms, true),
    destinations = to_instances(control.destinations)
  )
  close_destinations(destinations)
  run_post_processes(control)
end

#run_post_processes(control) ⇒ Object



25
26
27
# File 'lib/kiba/streaming_runner.rb', line 25

def run_post_processes(control)
  to_instances(control.post_processes, true, false).each(&:call)
end

#run_pre_processes(control) ⇒ Object



21
22
23
# File 'lib/kiba/streaming_runner.rb', line 21

def run_pre_processes(control)
  to_instances(control.pre_processes, true, false).each(&:call)
end

#source_stream(sources) ⇒ Object



51
52
53
54
55
56
57
# File 'lib/kiba/streaming_runner.rb', line 51

def source_stream(sources)
  Enumerator.new do |y|
    sources.each do |source|
      source.each { |r| y << r }
    end
  end
end

#to_instance(klass, args, block, allow_block, allow_class) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/kiba/streaming_runner.rb', line 76

def to_instance(klass, args, block, allow_block, allow_class)
  if klass && block
    fail "Class and block form cannot be used together at the moment"
  elsif klass
    fail "Class form is not allowed here" unless allow_class
    klass.new(*args)
  elsif block
    fail "Block form is not allowed here" unless allow_block
    AliasingProc.new(&block)
  else
    fail "Nil parameters not allowed here"
  end
end

#to_instances(definitions, allow_block = false, allow_class = true) ⇒ Object



67
68
69
70
71
72
73
74
# File 'lib/kiba/streaming_runner.rb', line 67

def to_instances(definitions, allow_block = false, allow_class = true)
  definitions.map do |definition|
    to_instance(
      *definition.values_at(:klass, :args, :block),
      allow_block, allow_class
    )
  end
end

#transform_stream(stream, t) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/kiba/streaming_runner.rb', line 35

def transform_stream(stream, t)
  Enumerator.new do |y|
    stream.each do |input_row|
      returned_row = t.process(input_row) do |yielded_row|
        y << yielded_row
      end
      y << returned_row if returned_row
    end
    if t.respond_to?(:close)
      t.close do |close_row|
        y << close_row
      end
    end
  end
end