Class: Wukong::Source::SourceDriver
- Inherits:
- Local::StdioDriver
  - Object
  - EM::P::LineAndTextProtocol
  - Local::StdioDriver
  - Wukong::Source::SourceDriver
- Includes:
- Logging
- Defined in:
- lib/wukong/source/source_driver.rb
Overview
A driver which works just like the Wukong::Local::StdioDriver, except that it ignores input from STDIN and instead generates its own input records on a periodic schedule. Each consecutive record produced is an incrementing positive integer (as a string), starting with '1'.
Instance Attribute Summary
- #batch_size ⇒ Object
  The number of records after which a Processor#finalize will be called.
- #index ⇒ Object
  The index of the record.
Attributes included from DriverMethods
Class Method Summary
- .start(label, settings = {}) ⇒ Object
  Starts periodically feeding the processor or dataflow given by label using the given settings.
Instance Method Summary
- #create_event ⇒ Object
  Creates a new event by feeding a record to the dataflow, incrementing the index, and conditionally finalizing the dataflow.
- #post_init ⇒ Object
  Sets the initial value of index to 1 and sets the batch size (only if it's positive).
- #process(record) ⇒ Object
  Outputs a record from the dataflow or processor to STDOUT.
Methods included from Logging
Methods inherited from Local::StdioDriver
add_signal_traps, #initialize, #receive_line, #setup, #unbind
Methods included from DriverMethods
#construct_dataflow, #finalize, #finalize_and_stop_dataflow, #finalize_dataflow, #send_through_dataflow, #setup, #setup_dataflow, #stop
Constructor Details
This class inherits a constructor from Wukong::Local::StdioDriver
Instance Attribute Details
#batch_size ⇒ Object
The number of records after which a Processor#finalize will be called.

    # File 'lib/wukong/source/source_driver.rb', line 18
    def batch_size
      @batch_size
    end
#index ⇒ Object
The index of the record.

    # File 'lib/wukong/source/source_driver.rb', line 14
    def index
      @index
    end
Class Method Details
.start(label, settings = {}) ⇒ Object
Starts periodically feeding the processor or dataflow given by label using the given settings.

    # File 'lib/wukong/source/source_driver.rb', line 33
    def self.start(label, settings={})
      driver = new(:foobar, label, settings) # i don't think the 1st argument matters here...
      driver.post_init
      period = case
               when settings[:period]  then settings[:period]
               when settings[:per_sec] then (1.0 / settings[:per_sec]) rescue 1.0
               else 1.0
               end
      driver.create_event
      EventMachine::PeriodicTimer.new(period) { driver.create_event }
    end
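The period selection in .start can be explored without EventMachine. The following is a minimal sketch, not part of Wukong: resolve_period is a hypothetical helper that mirrors the case expression, giving :period priority over :per_sec and falling back to 1.0 second.

```ruby
# Hypothetical helper (not in Wukong): mirrors the period selection in .start.
def resolve_period(settings)
  if settings[:period]
    settings[:period]              # an explicit period wins
  elsif settings[:per_sec]
    begin
      1.0 / settings[:per_sec]     # records per second converted to seconds per record
    rescue StandardError
      1.0                          # e.g. a non-numeric :per_sec raises TypeError
    end
  else
    1.0                            # default: one record per second
  end
end

puts resolve_period({ period: 5 })               # 5
puts resolve_period({ per_sec: 4 })              # 0.25
puts resolve_period({ period: 5, per_sec: 4 })   # 5
puts resolve_period({})                          # 1.0
```

Note that a :per_sec of 0 does not raise in Ruby, since Float division by zero yields Infinity, so the rescue fallback would not apply in that case.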
Instance Method Details
#create_event ⇒ Object
Creates a new event using the following steps:
- Feeds a record with the existing index to the dataflow.
- Increments the index.
- Finalizes the dataflow if the incremented index is a multiple of the batch_size.

    # File 'lib/wukong/source/source_driver.rb', line 54
    def create_event
      receive_line(index.to_s)
      self.index += 1
      finalize_dataflow if self.batch_size && (self.index % self.batch_size) == 0
    end
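To see when finalization fires, the counting logic can be reproduced in isolation. This is a sketch under stated assumptions: CountingSketch is a hypothetical stand-in, not a Wukong class, and it records values in arrays instead of calling receive_line and finalize_dataflow. Because the index is incremented before the modulo check, finalization lands on the record just before each multiple of the batch size.

```ruby
# Hypothetical stand-in for the driver: only the counting logic of
# #create_event is reproduced; no EventMachine or dataflow is involved.
class CountingSketch
  attr_reader :records, :finalized_after

  def initialize(batch_size = nil)
    @index = 1
    @batch_size = batch_size
    @records = []          # what would be passed to receive_line
    @finalized_after = []  # the record emitted just before each finalize
  end

  def create_event
    @records << @index.to_s
    @index += 1
    # Same condition as the driver: checked against the incremented index.
    @finalized_after << @records.last if @batch_size && (@index % @batch_size).zero?
  end
end

sketch = CountingSketch.new(3)
7.times { sketch.create_event }
p sketch.records          # ["1", "2", "3", "4", "5", "6", "7"]
p sketch.finalized_after  # ["2", "5"]
```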
#post_init ⇒ Object
Sets the initial value of index to 1 and sets the batch size (only if it's positive).

    # File 'lib/wukong/source/source_driver.rb', line 22
    def post_init
      super()
      self.index = 1
      self.batch_size = settings[:batch_size].to_i if settings[:batch_size] && settings[:batch_size].to_i > 0
    end
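The batch size guard relies on String#to_i and Integer#to_i, so non-numeric or non-positive values leave the batch size unset. A minimal sketch of that behaviour follows; parse_batch_size is a hypothetical helper, not a Wukong method, and it returns nil where the driver would simply skip the assignment.

```ruby
# Hypothetical helper (not in Wukong): mirrors the guard in #post_init.
def parse_batch_size(settings)
  raw = settings[:batch_size]
  return nil unless raw        # setting absent: batch size stays unset
  size = raw.to_i              # "abc".to_i is 0, "10".to_i is 10
  size > 0 ? size : nil        # only positive values are accepted
end

p parse_batch_size({ batch_size: "10" })   # 10
p parse_batch_size({ batch_size: 0 })      # nil
p parse_batch_size({ batch_size: "abc" })  # nil
p parse_batch_size({})                     # nil
```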
#process(record) ⇒ Object
Outputs a record from the dataflow or processor to STDOUT.
STDOUT is flushed after each record so that output appears immediately, which avoids the impression of "no output" when the looping period is long.

    # File 'lib/wukong/source/source_driver.rb', line 67
    def process record
      $stdout.puts record
      $stdout.flush
    end
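The write-then-flush pattern can be tried against any IO-like object. The sketch below is illustrative only: emit is a hypothetical helper with an injectable io argument (the driver itself always writes to $stdout), and a StringIO is used so the result can be inspected.

```ruby
require 'stringio'

# Hypothetical helper (not in Wukong): same write-then-flush pattern as
# #process, but with the target IO passed in for easy inspection.
def emit(record, io = $stdout)
  io.puts record   # appends a newline if the record lacks one
  io.flush         # push buffered output to the underlying stream now
end

buffer = StringIO.new
emit("1", buffer)
emit("2", buffer)
p buffer.string    # "1\n2\n"
```

An alternative with a similar effect would be setting `$stdout.sync = true` once at startup, which disables Ruby-level buffering for every subsequent write.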