Module: Kiba::Runner

Extended by:
Runner
Included in:
Runner, StreamingRunner
Defined in:
lib/kiba/runner.rb

Defined Under Namespace

Classes: AliasingProc

Instance Method Summary

Instance Method Details

#close_destinations(destinations) ⇒ Object



34
35
36
37
38
# File 'lib/kiba/runner.rb', line 34

# Calls `close` on each destination that responds to it; destinations
# without a `close` method are skipped.
#
# @param destinations [Array] instantiated destinations
# @return [Array] the destinations on which `close` was called
def close_destinations(destinations)
  closable = destinations.select { |destination| destination.respond_to?(:close) }
  closable.each(&:close)
end

#process_rows(sources, transforms, destinations) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/kiba/runner.rb', line 40

# Reads every row of every source, passes it through the transforms in
# order, and writes the surviving rows to every destination.
#
# A transform that returns nil or false drops the row: the remaining
# transforms are not invoked and nothing is written for that row.
#
# @param sources [Array] objects whose `each` yields rows
# @param transforms [Array] objects responding to `process(row)`
# @param destinations [Array] objects responding to `write(row)`
# @return [Object] the value of `sources.each` (the sources themselves for an Array)
def process_rows(sources, transforms, destinations)
  sources.each do |source|
    source.each do |row|
      transformed = transforms.inject(row) do |current, transform|
        result = transform.process(current)
        # A falsy result dismisses the row; `break` makes inject return nil.
        break unless result
        result
      end
      next unless transformed

      destinations.each { |destination| destination.write(transformed) }
    end
  end
end

#run(control) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/kiba/runner.rb', line 10

# Runs a whole job: pre-processes, row processing, destination closing,
# then post-processes, in that order.
#
# Sources, transforms and destinations are instantiated (in that order)
# after the pre-processes have run. Only transforms are instantiated with
# the block form allowed.
#
# @param control [Object] job definition exposing `sources`, `transforms`
#   and `destinations` (plus whatever the pre/post process helpers read)
# @return [Object] whatever `run_post_processes` returns
def run(control)
  # TODO: add a dry-run (not instantiating mode) to_instances call
  # that will validate the job definition from a syntax pov before
  # going any further. This could be shared with the parser.
  run_pre_processes(control)

  sources = to_instances(control.sources)
  transforms = to_instances(control.transforms, true)
  destinations = to_instances(control.destinations)

  process_rows(sources, transforms, destinations)
  close_destinations(destinations)

  # TODO: when I add post processes as class, I'll have to add a test to
  # make sure instantiation occurs after the main processing is done (#16)
  run_post_processes(control)
end

#run_post_processes(control) ⇒ Object



30
31
32
# File 'lib/kiba/runner.rb', line 30

# Instantiates the job's post-processes and invokes `call` on each, in order.
# They are instantiated with the block form allowed and the class form
# disallowed.
#
# @param control [Object] job definition exposing `post_processes`
# @return [Object] the value of `each` on the instantiated post-processes
def run_post_processes(control)
  post_processes = to_instances(control.post_processes, true, false)
  post_processes.each(&:call)
end

#run_pre_processes(control) ⇒ Object



26
27
28
# File 'lib/kiba/runner.rb', line 26

# Instantiates the job's pre-processes and invokes `call` on each, in order.
# They are instantiated with the block form allowed and the class form
# disallowed.
#
# @param control [Object] job definition exposing `pre_processes`
# @return [Object] the value of `each` on the instantiated pre-processes
def run_pre_processes(control)
  pre_processes = to_instances(control.pre_processes, true, false)
  pre_processes.each(&:call)
end

#to_instance(klass, args, block, allow_block, allow_class) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/kiba/runner.rb', line 65

# Builds the runtime object for a single job definition entry.
#
# A definition is given either in class form (`klass` plus `args`) or in
# block form (`block`); exactly one of the two must be supplied.
#
# @param klass [Class, nil] class to instantiate, or nil for the block form
# @param args [Array, nil] arguments splatted into `klass.new`
# @param block [Proc, nil] block to wrap, or nil for the class form
# @param allow_block [Boolean] whether the block form is accepted here
# @param allow_class [Boolean] whether the class form is accepted here
# @return [Object] `klass.new(*args)` for the class form, or an
#   `AliasingProc` wrapping `block` for the block form
# @raise [RuntimeError] if the supplied form is not allowed, if both a
#   class and a block are supplied, or if neither is supplied
def to_instance(klass, args, block, allow_block, allow_class)
  if klass
    fail 'Class form is not allowed here' unless allow_class
    # Reject the combination explicitly rather than silently dropping the block.
    # TODO: support block passing to a class form definition?
    fail 'Class and block form cannot be used together at the moment' if block
    klass.new(*args)
  elsif block
    fail 'Block form is not allowed here' unless allow_block
    AliasingProc.new(&block)
  else
    # Neither form was supplied: the definition is empty.
    fail 'Either a class or a block must be provided'
  end
end

#to_instances(definitions, allow_block = false, allow_class = true) ⇒ Object

Uses positional arguments rather than keyword arguments because JRuby currently defaults to Ruby 1.9 syntax.



56
57
58
59
60
61
62
63
# File 'lib/kiba/runner.rb', line 56

# Turns a list of job definition entries into runtime objects by handing
# each entry's :klass, :args and :block values to `to_instance`.
#
# Positional flags are used instead of keyword arguments because JRuby
# currently defaults to Ruby 1.9 syntax.
#
# @param definitions [Array<Hash>] entries keyed by :klass, :args and :block
# @param allow_block [Boolean] whether block-form entries are accepted
# @param allow_class [Boolean] whether class-form entries are accepted
# @return [Array] one object per definition, in the same order
def to_instances(definitions, allow_block = false, allow_class = true)
  definitions.map do |definition|
    klass, args, block = definition.values_at(:klass, :args, :block)
    to_instance(klass, args, block, allow_block, allow_class)
  end
end