Class: Wukong::Load::SourceDriver

Inherits:
Wukong::Local::StdioDriver
  • Object
show all
Includes:
Logging
Defined in:
lib/wukong-load/source_driver.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batch_sizeObject

Returns the value of attribute batch_size.



6
7
8
# File 'lib/wukong-load/source_driver.rb', line 6

def batch_size
  @batch_size
end

#indexObject

Returns the value of attribute index.



6
7
8
# File 'lib/wukong-load/source_driver.rb', line 6

def index
  @index
end

Class Method Details

.start(label, settings = {}) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/wukong-load/source_driver.rb', line 14

def self.start(label, settings={})
  driver = new(:foobar, label, settings)
  driver.post_init

  period = case
  when settings[:period]  then settings[:period]
  when settings[:per_sec] then (1.0 / settings[:per_sec]) rescue 1.0
  else 1.0
  end
  driver.create_event
  EventMachine::PeriodicTimer.new(period) { driver.create_event }
end

Instance Method Details

#create_eventObject



27
28
29
30
31
# File 'lib/wukong-load/source_driver.rb', line 27

def create_event
  receive_line(index.to_s)
  self.index += 1
  finalize_dataflow if self.batch_size && (self.index % self.batch_size) == 0
end

#post_initObject



8
9
10
11
12
# File 'lib/wukong-load/source_driver.rb', line 8

def post_init
  super()
  self.index = 1
  self.batch_size = settings[:batch_size].to_i if settings[:batch_size] && settings[:batch_size].to_i > 0
end

#process(record) ⇒ Object

:nodoc:

Not sure why I have to add the call to $stdout.flush at the end of this method. Supposedly $stdout.sync is called during the #setup method in StdoutProcessor in wukong/widget/processors. Doesn't that do this?



39
40
41
42
# File 'lib/wukong-load/source_driver.rb', line 39

def process record
  $stdout.puts record
  $stdout.flush
end