Class: Wukong::Wiring

Inherits:
Object
Defined in:
lib/wukong/driver/wiring.rb

Overview

Provides a very Ruby-minded way of walking a dataflow connected to a driver.

Constructor Details

#initialize(driver, dataflow) ⇒ Wiring

Construct a new Wiring for the given driver and dataflow.

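Parameters:

  • driver: the driver instance, which provides a #process method
  • dataflow: the dataflow being wired
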
# File 'lib/wukong/driver/wiring.rb', line 18

def initialize(driver, dataflow)
  @driver    = driver
  @dataflow  = dataflow
end

Instance Attribute Details

#dataflow ⇒ Object

The dataflow being wired.



# File 'lib/wukong/driver/wiring.rb', line 12

def dataflow
  @dataflow
end

#driver ⇒ Object

The driver instance. It typically calls #start_with and provides the #process method that this wiring calls.



# File 'lib/wukong/driver/wiring.rb', line 9

def driver
  @driver
end

Instance Method Details

#advance(stage) ⇒ Proc

Return a proc (the output of #start_with) which will process records through the stages that are ahead of the given stage.

Parameters:

  • stage (Wukong::Stage)

Returns:

  • (Proc)

See Also:

  • #start_with

# File 'lib/wukong/driver/wiring.rb', line 40

def advance(stage)
  # This is where the tree of procs will terminate, but only after
  # having passed all output records through the driver -- the
  # last "stage".
  return start_with() if stage.nil? || stage == driver

  # Otherwise we're still in the middle of the tree...
  descendents = dataflow.descendents(stage)
  if descendents.empty?
    # No descendents means we've reached a leaf of the tree, so
    # we'll run records through the driver to generate output.
    start_with(driver)
  else
    # Otherwise continue down the tree of procs...
    start_with(*descendents)
  end
end
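
The branch logic in #advance can be hard to follow in proc form. The sketch below restates it as a plain function that returns the stages which would be handed to #start_with, rather than a proc. The names next_stages and graph are hypothetical: graph is an assumed Hash standing in for dataflow.descendents, and symbols stand in for real stages.

```ruby
# Hypothetical restatement of the branches in #advance. It returns the
# stages that would be passed to #start_with instead of building a proc.
# `graph` is an assumed Hash mapping each stage to its descendents.
def next_stages(stage, driver, graph)
  # At or past the driver (or no stage at all): nothing left to run.
  return [] if stage.nil? || stage == driver

  descendents = graph.fetch(stage, [])
  # A leaf hands its output to the driver; any other stage fans out.
  descendents.empty? ? [driver] : descendents
end

graph = { parse: [:filter, :count], filter: [], count: [] }

p next_stages(:parse,  :driver, graph)  # => [:filter, :count]
p next_stages(:filter, :driver, graph)  # => [:driver]
p next_stages(:driver, :driver, graph)  # => []
```

The empty list in the last case corresponds to the start_with() call with no stages, which is what ends the recursion.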

#start_with(*stages) ⇒ Proc

Return a proc which, if called with a record, will process that record through each of the given stages as well as through the rest of the dataflow ahead of them.

Parameters:

  • stages (Array<Wukong::Stage>)

Returns:

  • (Proc)


# File 'lib/wukong/driver/wiring.rb', line 29

def start_with(*stages)
  to_proc.curry.call(stages)
end
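
The one-line body relies on Proc#curry to fix the first argument (the stages) and leave the record for later. A minimal sketch of that mechanism in isolation follows. The proc body is a stand-in that formats strings instead of calling stage.process, and the symbols are placeholders for real stages.

```ruby
# A two-argument proc shaped like the one #to_proc builds. The body is a
# stand-in: it formats strings rather than calling stage.process.
wiring = Proc.new do |stages, record|
  stages.map { |stage| "#{stage} <- #{record}" }
end

# Supplying only the first argument yields a proc that awaits the record.
run = wiring.curry.call([:parse, :filter])

p run.call("line 1")  # => ["parse <- line 1", "filter <- line 1"]
p run.call("line 2")  # => ["parse <- line 2", "filter <- line 2"]
```

Because the stages are bound once, the returned proc can be reused for every record, or passed as a block with & to a method that yields records.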

#to_proc ⇒ Object

:nodoc:



# File 'lib/wukong/driver/wiring.rb', line 59

def to_proc
  return @wiring if @wiring
  @wiring = Proc.new do |stages, record|
    stages.each do |stage|
      stage.process(record, &advance(stage)) if stage
    end
  end
end
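
To see the methods working together, here is a self-contained sketch that runs without Wukong. The Wiring class is a trimmed copy of the methods listed on this page. Everything else (Upcase, Exclaim, CollectingDriver, LinearDataflow) is a hypothetical stand-in, not part of Wukong's API. The only assumptions are that a stage responds to process(record) and yields its output, and that a dataflow responds to descendents(stage).

```ruby
# Trimmed copy of the Wiring methods shown on this page.
class Wiring
  attr_reader :driver, :dataflow

  def initialize(driver, dataflow)
    @driver   = driver
    @dataflow = dataflow
  end

  def start_with(*stages)
    to_proc.curry.call(stages)
  end

  def advance(stage)
    return start_with if stage.nil? || stage == driver
    descendents = dataflow.descendents(stage)
    descendents.empty? ? start_with(driver) : start_with(*descendents)
  end

  def to_proc
    @wiring ||= Proc.new do |stages, record|
      stages.each { |stage| stage.process(record, &advance(stage)) if stage }
    end
  end
end

# Hypothetical stages: each transforms a record and yields it downstream.
class Upcase
  def process(record)
    yield record.upcase
  end
end

class Exclaim
  def process(record)
    yield "#{record}!"
  end
end

# Hypothetical driver: collects whatever reaches the end of the flow.
class CollectingDriver
  attr_reader :output

  def initialize
    @output = []
  end

  def process(record)
    @output << record
  end
end

# Hypothetical dataflow: a straight line where each stage feeds the next.
class LinearDataflow
  def initialize(*stages)
    @stages = stages
  end

  def descendents(stage)
    following = @stages[@stages.index(stage) + 1]
    following ? [following] : []
  end
end

upcase  = Upcase.new
exclaim = Exclaim.new
driver  = CollectingDriver.new
wiring  = Wiring.new(driver, LinearDataflow.new(upcase, exclaim))

run = wiring.start_with(upcase)
run.call("hello")
run.call("world")
p driver.output  # => ["HELLO!", "WORLD!"]
```

Each record passes through Upcase, then Exclaim, then reaches the driver, where advance(driver) returns a proc over no stages and the chain ends.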