Module: Wukong::DriverMethods

Included in:
EventMachineDriver, Local::StdioDriver, SpecHelpers::UnitTestDriver
Defined in:
lib/wukong/driver.rb

Overview

A Driver is a class including the DriverMethods module which connects a Dataflow or Processor to the external world of inputs and outputs.

Given the MinimalDriver class in the example below, MinimalDriver#send_through_dataflow can be called on an instance with any input record.

This record will be passed through the dataflow, starting from its root, and each record yielded at the leaves of the dataflow will be passed to the driver's #process method.

The #process method of an implementing driver should not yield, unlike the process method of a Processor class. Instead, it should treat its argument as an output of the dataflow and do something appropriate to the driver (write to file, database, terminal, &c.).
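
The contrast between the two kinds of #process method can be sketched in plain Ruby. Both classes below are hypothetical stand-ins written for illustration and do not use the real Wukong API: UpcaseStage plays the role of a processor that yields, and ArraySinkDriver plays the role of a driver that consumes.

```ruby
# Hypothetical stand-ins; neither class uses the real Wukong API.
class UpcaseStage
  # Processor-style: hands each result onward by yielding it.
  def process(record)
    yield record.to_s.upcase
  end
end

class ArraySinkDriver
  attr_reader :outputs

  def initialize
    @outputs = []
  end

  # Driver-style: treats the argument as final output and does not yield.
  def process(output_record)
    @outputs << output_record
  end
end

stage  = UpcaseStage.new
driver = ArraySinkDriver.new

# Whatever the stage yields ends up in the driver's #process.
%w[a b].each do |record|
  stage.process(record) { |out| driver.process(out) }
end
```

Here the array stands in for whatever sink suits a real driver (file, database, terminal).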

Drivers are also responsible for implementing the lifecycle of the processors and dataflows they drive. A more complete version of the MinimalDriver example would:

  • call the #setup_dataflow method when ready to trigger the Processor#setup method on each processor in the dataflow

  • call the #finalize_dataflow method when indicating that the dataflow should consider a batch of records complete

  • call the #finalize_and_stop_dataflow method to indicate the last batch of records and to trigger the Processor#stop method on each processor in the dataflow
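
The order of these lifecycle calls can be sketched as follows. So that the sketch runs without the gem, StubDriverMethods is a hypothetical stand-in for Wukong::DriverMethods that only records which methods were called; BatchingDriver and its run method are likewise assumptions made for illustration.

```ruby
# Stand-in for Wukong::DriverMethods so the sketch runs without the gem.
# It records each lifecycle call and passes records straight to #process.
module StubDriverMethods
  def calls
    @calls ||= []
  end

  def setup_dataflow
    calls << :setup_dataflow
  end

  def send_through_dataflow(record)
    calls << [:send_through_dataflow, record]
    process(record)
  end

  def finalize_dataflow
    calls << :finalize_dataflow
  end

  def finalize_and_stop_dataflow
    calls << :finalize_and_stop_dataflow
  end
end

class BatchingDriver
  include StubDriverMethods

  attr_reader :outputs

  def initialize
    @outputs = []
  end

  def process(output_record)
    @outputs << output_record
  end

  # Hypothetical entry point: drives the whole lifecycle over some batches.
  def run(batches)
    *full_batches, last_batch = batches
    setup_dataflow
    full_batches.each do |batch|
      batch.each { |record| send_through_dataflow(record) }
      finalize_dataflow            # this batch is complete, more will follow
    end
    Array(last_batch).each { |record| send_through_dataflow(record) }
    finalize_and_stop_dataflow     # last batch: finalize, then stop
  end
end

driver = BatchingDriver.new
driver.run([[1, 2], [3]])
```

Setup happens once, every batch except the last ends with #finalize_dataflow, and the last batch ends with #finalize_and_stop_dataflow.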

Driver instances are started by Runners, which should delegate to the start method of the driver class itself.

Examples:

Minimal Driver class


class MinimalDriver
  include Wukong::DriverMethods
  def initialize(label, settings)
    construct_dataflow(label, settings)
  end
  def process record
    puts record
  end
end

Instance Attribute Details

#dataflow ⇒ Object

Returns the value of attribute dataflow.

# File 'lib/wukong/driver.rb', line 57

def dataflow
  @dataflow
end

#label ⇒ Object

Returns the value of attribute label.

# File 'lib/wukong/driver.rb', line 55

def label
  @label
end

#settings ⇒ Object

Returns the value of attribute settings.

# File 'lib/wukong/driver.rb', line 56

def settings
  @settings
end

Instance Method Details

#construct_dataflow(label, settings = {}) ⇒ Object

Construct a dataflow from the given label and settings.

This method does not cause Processor#setup to be called on any of the processors in the dataflow. Call the #setup_dataflow method to explicitly have setup occur. This distinction is useful for drivers which themselves need to do complex initialization before letting processors in the dataflow initialize.

Parameters:

  • label (Symbol)

    the name of the dataflow (or processor) to build

  • settings (Hash) (defaults to: {})

Options Hash (settings):

  • :to (String)

    Serialize all output via the named serializer (json, tsv)

  • :from (String)

    Deserialize all input via the named deserializer (json, tsv)

  • :as (String)

    Recordize each input as instances of the given class

# File 'lib/wukong/driver.rb', line 84

def construct_dataflow(label, settings={})
  self.label    = label
  self.settings = settings
  prepend(:recordize)                       if settings[:as]
  prepend("from_#{settings[:from]}".to_sym) if settings[:from]
  append("to_#{settings[:to]}".to_sym)      if settings[:to]
  build_dataflow
end
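
As a rough illustration of how the three settings shape the resulting dataflow, the helper below lists the stage names in the order they would run. stage_order is a hypothetical function, not part of Wukong, and it assumes that prepend places a stage at the front of the dataflow and append places one at the end.

```ruby
# Hypothetical helper, not part of Wukong. Assumes prepend adds a stage at
# the front of the dataflow and append adds one at the end.
def stage_order(label, settings = {})
  stages = [label]
  stages.unshift(:recordize)                       if settings[:as]
  stages.unshift("from_#{settings[:from]}".to_sym) if settings[:from]
  stages.push("to_#{settings[:to]}".to_sym)        if settings[:to]
  stages
end

order = stage_order(:my_flow, from: 'json', to: 'tsv', as: Hash)
p order  # => [:from_json, :recordize, :my_flow, :to_tsv]
```

Under that assumption, input is deserialized first, then recordized, then handed to the labelled dataflow, and finally serialized.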

#finalize ⇒ Object

Perform finalization code for this driver. Runs after #setup and before #stop.

# File 'lib/wukong/driver.rb', line 116

def finalize
end

#finalize_and_stop_dataflow ⇒ Object

Works like #finalize_dataflow, but also calls Processor#stop on each processor after calling Processor#finalize on it. Once every stage has been stopped, calls this driver's #stop.

# File 'lib/wukong/driver.rb', line 138

def finalize_and_stop_dataflow
  finalize
  dataflow.each_stage do |stage|
    stage.finalize(&wiring.advance(stage))
    stage.stop
  end
  stop
end

#finalize_dataflow ⇒ Object

Indicate that a full batch of records has been sent through and that any batch-oriented or accumulative operations (e.g. counting) should trigger.

Calls this driver's #finalize, then walks the dataflow, calling Processor#finalize on each processor.

On the last batch, the #finalize_and_stop_dataflow method should be called instead.

# File 'lib/wukong/driver.rb', line 129

def finalize_dataflow
  finalize
  dataflow.each_stage do |stage|
    stage.finalize(&wiring.advance(stage))
  end
end
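
To make the "accumulative operation" case concrete, here is a minimal sketch of a counting stage whose result only appears when the batch is finalized. CountingStage is a hypothetical class rather than a real Wukong processor, and resetting the count after each batch is an assumption made for illustration.

```ruby
# Hypothetical stage, not a real Wukong processor.
class CountingStage
  def initialize
    @count = 0
  end

  # Accumulates silently; nothing is yielded per record.
  def process(_record)
    @count += 1
  end

  # Emits the batch total, then resets for the next batch (an assumption).
  def finalize
    yield @count
    @count = 0
  end
end

stage  = CountingStage.new
totals = []

[%w[a b c], %w[d e]].each do |batch|
  batch.each { |record| stage.process(record) }
  stage.finalize { |total| totals << total }
end

p totals  # => [3, 2]
```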

#process(output_record) ⇒ Object

Classes including DriverMethods should override this method with some way of handling the output_record that is appropriate for the driver.

Parameters:

  • output_record (Object)

Raises:

  • (NotImplementedError)


# File 'lib/wukong/driver.rb', line 64

def process output_record
  raise NotImplementedError.new("Define the #{self.class}#process method to handle output records from the dataflow")
end

#send_through_dataflow(record) ⇒ Object

Send the given record through the dataflow.

Parameters:

  • record (Object)


# File 'lib/wukong/driver.rb', line 110

def send_through_dataflow(record)
  wiring.start_with(dataflow.root).call(record)
end
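
The idea of starting at the root and delivering whatever leaves the dataflow to the driver can be sketched with plain lambdas. This is not Wukong's wiring implementation: wire, SplitWords, and DropShort are hypothetical names, and the sketch handles only a linear chain of stages, whereas a real dataflow may branch.

```ruby
# Hypothetical stages; each yields zero or more records per input.
class SplitWords
  def process(line)
    line.split.each { |word| yield word }
  end
end

class DropShort
  def process(word)
    yield word if word.length > 2
  end
end

# Hypothetical helper: returns a callable that feeds a record into the
# first stage and hands whatever leaves the last stage to the sink.
def wire(stages, sink)
  stages.reverse.inject(sink) do |downstream, stage|
    ->(record) { stage.process(record, &downstream) }
  end
end

outputs = []
sink    = ->(record) { outputs << record }  # plays the driver's #process
entry   = wire([SplitWords.new, DropShort.new], sink)

entry.call("a quick brown fox")
p outputs  # => ["quick", "brown", "fox"]
```

One input record can produce several output records or none, which is why the driver's #process receives outputs one at a time instead of returning a value.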

#setup ⇒ Object

Set up this driver. Called before setting up any of the dataflow stages.

# File 'lib/wukong/driver.rb', line 95

def setup
end

#setup_dataflow ⇒ Object

Calls this driver's #setup, then walks the dataflow and calls Processor#setup on each of the processors.

# File 'lib/wukong/driver.rb', line 100

def setup_dataflow
  setup
  dataflow.each_stage do |stage|
    stage.setup
  end
end

#stop ⇒ Object

Perform shutdown code for this driver. Called after #finalize and after all stages have been finalized and stopped.

# File 'lib/wukong/driver.rb', line 149

def stop
end