Class: Wukong::Storm::BoltDriver

Inherits:
Local::StdioDriver
  • Object
show all
Includes:
Logging
Defined in:
lib/wukong-storm/bolt_driver.rb

Overview

Modifies the behavior of Wukong::Local::StdioDriver by appending a batch delimiter after each set of output records, including when there are 0 output records or if an error occurs.

Instance Method Summary collapse

Constructor Details

#initialize(label, settings) ⇒ BoltDriver

Override the behavior of StdioDriver by initializing an empty array of output records.



17
18
19
20
# File 'lib/wukong-storm/bolt_driver.rb', line 17

def initialize(label, settings)
  super(label, settings)
  @output = []
end

Instance Method Details

#process(record) ⇒ Object

Don't write the record to $stdout, but store it in an array of output records instead.

Parameters:

  • record (Object)

See Also:



52
53
54
# File 'lib/wukong-storm/bolt_driver.rb', line 52

def process(record)
  @output << record
end

#receive_line(line) ⇒ Object

Called by EventMachine framework after successfully reading a line from $stdin.

Relies on StdioDriver, but calls #write_output afterwards to ensure that a delimiter is also sent.

Parameters:

  • line (String)


37
38
39
40
# File 'lib/wukong-storm/bolt_driver.rb', line 37

def receive_line line
  super(line)
  write_output
end

#setupObject

Do not sync $stdout as in the StdioDriver.



23
24
# File 'lib/wukong-storm/bolt_driver.rb', line 23

def setup()
end

#write_outputObject

Writes all output records out in a single batch write with a batch delimiter appended to the end.

All output records are newline delimited within the batch.

The batch itself includes a newline character after the final batch delimiter.

$stdout is flushed after the write and accumulated outputs are cleared.

See Also:



68
69
70
71
72
73
74
75
76
77
# File 'lib/wukong-storm/bolt_driver.rb', line 68

def write_output
  @output.each do |record|
    $stdout.write(record)
    $stdout.write("\n")
  end
  $stdout.write(settings.delimiter)
  $stdout.write("\n")
  $stdout.flush
  @output.clear
end