Class: Wukong::Source::SourceDriver

Inherits:
Local::StdioDriver
Includes:
Logging
Defined in:
lib/wukong/source/source_driver.rb

Overview

A driver which works just like the Wukong::Local::StdioDriver except that it ignores input from STDIN and instead generates its own input records on a periodic schedule (once per second by default). The records are consecutive positive integers, emitted as strings, starting with '1'.

Instance Attribute Summary

Attributes included from DriverMethods

#dataflow, #label, #settings

Class Method Summary

Instance Method Summary

Methods included from Logging

included

Methods inherited from Local::StdioDriver

add_signal_traps, #initialize, #receive_line, #setup, #unbind

Methods included from DriverMethods

#construct_dataflow, #finalize, #finalize_and_stop_dataflow, #finalize_dataflow, #send_through_dataflow, #setup, #setup_dataflow, #stop

Constructor Details

This class inherits a constructor from Wukong::Local::StdioDriver

Instance Attribute Details

#batch_size ⇒ Object

The number of records after which Processor#finalize is called. It stays unset (nil) unless the settings supply a positive batch size.



# File 'lib/wukong/source/source_driver.rb', line 18

def batch_size
  @batch_size
end

#index ⇒ Object

The index of the next record to be produced.



# File 'lib/wukong/source/source_driver.rb', line 14

def index
  @index
end

Class Method Details

.start(label, settings = {}) ⇒ Object

Starts periodically feeding the processor or dataflow given by label using the given settings.

Parameters:

  • label (String, Symbol)
  • settings (Configliere::Param, Hash) (defaults to: {})


# File 'lib/wukong/source/source_driver.rb', line 33

def self.start(label, settings={})
  driver = new(:foobar, label, settings) # i don't think the 1st argument matters here...
  driver.post_init

  period = case
  when settings[:period]  then settings[:period]
  when settings[:per_sec] then (1.0 / settings[:per_sec]) rescue 1.0
  else 1.0
  end
  driver.create_event
  EventMachine::PeriodicTimer.new(period) { driver.create_event }
end
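
The period selection above can be read as a small pure function. The following is a minimal sketch, not Wukong code: `period_for` and `DEFAULT_PERIOD` are hypothetical names, and `settings` is assumed to behave like a Hash with symbol keys. Unlike the original, the sketch makes the fallback explicit, treating a non-numeric or non-positive `:per_sec` as "use the default"; that choice is an assumption of the sketch.

```ruby
# Hypothetical helper mirroring the period selection in .start.
# Not part of Wukong; `settings` is assumed to be Hash-like with symbol keys.
DEFAULT_PERIOD = 1.0 # seconds between events

def period_for(settings)
  # An explicit :period takes precedence over :per_sec.
  return settings[:period] if settings[:period]

  rate = settings[:per_sec]
  return DEFAULT_PERIOD unless rate

  begin
    rate = Float(rate)
  rescue ArgumentError, TypeError
    # Assumption: an unparseable rate falls back to the default period.
    return DEFAULT_PERIOD
  end

  # Assumption: a zero or negative rate also falls back to the default.
  rate > 0 ? 1.0 / rate : DEFAULT_PERIOD
end
```

With this helper, `period_for(per_sec: 4)` gives a quarter-second period, and an empty settings hash gives the one-second default.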

Instance Method Details

#create_event ⇒ Object

Creates a new event using the following steps:

  1. Feeds the current index, converted to a string, to the dataflow.
  2. Increments the index.
  3. Finalizes the dataflow if a batch_size is set and the incremented index is a multiple of it.



# File 'lib/wukong/source/source_driver.rb', line 54

def create_event
  receive_line(index.to_s)
  self.index += 1
  finalize_dataflow if self.batch_size && (self.index % self.batch_size) == 0
end
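
To see when finalization fires, the counter logic of #create_event can be replayed outside of Wukong. This is a sketch under stated assumptions: `CounterSketch` is a hypothetical stand-in class, it records emitted values in an array instead of feeding a dataflow, and it notes the last emitted record in place of calling finalize_dataflow.

```ruby
# Stand-in for the driver's counter logic; not the Wukong class itself.
class CounterSketch
  attr_reader :emitted, :finalized_after

  def initialize(batch_size = nil)
    @index = 1                # same starting value that #post_init assigns
    @batch_size = batch_size
    @emitted = []             # records that would be fed to the dataflow
    @finalized_after = []     # last record emitted before each finalize
  end

  def create_event
    @emitted << @index.to_s
    @index += 1
    # Same check as the driver: it runs against the already incremented index.
    @finalized_after << @emitted.last if @batch_size && (@index % @batch_size).zero?
  end
end

sketch = CounterSketch.new(3)
7.times { sketch.create_event }
p sketch.emitted          # => ["1", "2", "3", "4", "5", "6", "7"]
p sketch.finalized_after  # => ["2", "5"]
```

Because the check runs after the increment, the first batch in this replay closes after batch_size - 1 records and later batches close every batch_size records.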

#post_init ⇒ Object

Sets the initial value of index to 1 and sets batch_size from settings[:batch_size], but only if that value converts to a positive integer.



# File 'lib/wukong/source/source_driver.rb', line 22

def post_init
  super()
  self.index = 1
  self.batch_size = settings[:batch_size].to_i if settings[:batch_size] && settings[:batch_size].to_i > 0
end
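
The guard on the batch size can be isolated to show which settings values are accepted. A minimal sketch, assuming a Hash-like `settings` whose `:batch_size` value responds to `to_i`; `batch_size_from` is a hypothetical helper and not part of Wukong.

```ruby
# Hypothetical helper mirroring the batch_size guard in #post_init.
# Returns a positive Integer, or nil when no usable batch size is given.
def batch_size_from(settings)
  raw = settings[:batch_size]
  return nil unless raw

  size = raw.to_i   # String#to_i returns 0 for non-numeric strings
  size > 0 ? size : nil
end

p batch_size_from(batch_size: "10")   # => 10
p batch_size_from(batch_size: "abc")  # => nil
p batch_size_from(batch_size: 0)      # => nil
```

A nil result corresponds to the driver leaving batch_size unset, in which case #create_event never finalizes on a batch boundary.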

#process(record) ⇒ Object

Outputs a record from the dataflow or processor to STDOUT.

STDOUT is flushed after every record so that output appears immediately, avoiding the impression of "no output" when the looping period is long.

Parameters:

  • record (Object)

    the record yielded by the processor or the terminal node(s) of the dataflow



# File 'lib/wukong/source/source_driver.rb', line 67

def process record
  $stdout.puts record
  $stdout.flush
end
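
The write-then-flush pattern can be tried against any IO-like object. In the sketch below, `emit` is a hypothetical stand-in for #process that takes the output stream as an argument (an assumption made so the behavior can be checked with a StringIO); it is not the driver's method.

```ruby
require 'stringio'

# Hypothetical stand-in for #process with an injectable output stream.
def emit(record, io = $stdout)
  io.puts record  # write the record followed by a newline
  io.flush        # push buffered output to the stream right away
end

buffer = StringIO.new
emit('1', buffer)
emit('2', buffer)
p buffer.string   # => "1\n2\n"
```

Setting `$stdout.sync = true` once at startup is an alternative way to get unbuffered output in Ruby; the per-record flush keeps the behavior local to this method.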