Class: Wukong::Local::TCPDriver

Inherits:
EM::P::LineAndTextProtocol
  • Object
show all
Includes:
EventMachineDriver, Wukong::Logging, Processor::BufferedProcessor
Defined in:
lib/wukong/local/tcp_driver.rb

Overview

A class for driving processors over a TCP protocol.

Instance Attribute Summary

Attributes included from DriverMethods

#dataflow, #settings

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Wukong::Logging

included

Methods included from Processor::BufferedProcessor

#process, #setup, #stop

Methods included from EventMachineDriver

included, #initialize

Methods included from DriverMethods

#add_serialization, #build_serializer, #construct_dataflow, #driver, #finalize_and_stop_dataflow, #finalize_dataflow, #lookup, #lookup_and_build, #setup_dataflow

Class Method Details

.start(label, settings = {}) ⇒ Object



11
12
13
14
15
16
17
# File 'lib/wukong/local/tcp_driver.rb', line 11

# Starts a TCP server that uses this class as its connection handler.
#
# The host comes from +settings[:host]+, falling back to
# +Socket.gethostname+; if resolving it raises a StandardError the
# host is 'localhost'. The port comes from +settings[:port]+ (default
# 9000) converted with +to_i+; if that raises a StandardError the port
# is 9000.
#
# @param label [Object] forwarded unchanged to +EM.start_server+
# @param settings [Hash] options; +:host+ and +:port+ are read here and
#   the whole hash is forwarded to +EM.start_server+
# @return [Object] whatever +add_signal_traps+ returns
def self.start(label, settings = {})
  host =
    begin
      settings[:host] || Socket.gethostname
    rescue StandardError
      'localhost'
    end

  port =
    begin
      (settings[:port] || 9000).to_i
    rescue StandardError
      9000
    end

  EM.start_server(host, port, self, label, settings)
  log.info("Server started on #{host} on port #{port}")
  add_signal_traps
end

Instance Method Details

#flush_buffer(records) ⇒ Object



35
36
37
38
# File 'lib/wukong/local/tcp_driver.rb', line 35

# Sends the buffered records as newline-terminated text and then
# empties the buffer in place.
#
# The records are joined with "\n" and a trailing "\n" is appended, so
# an empty buffer still results in a single "\n" being sent.
#
# @param records [Array] buffered output records; cleared by this call
# @return [Array] the same +records+ array, now empty
def flush_buffer(records)
  payload = records.join("\n") + "\n"
  send_data(payload)
  records.clear
end

#post_init ⇒ Object



19
20
21
22
23
# File 'lib/wukong/local/tcp_driver.rb', line 19

# Logs the peer's address and port, then sets up the dataflow.
#
# The peer address is decoded from +get_peername+ with
# +Socket.unpack_sockaddr_in+, which yields the port first and the IP
# address second.
#
# @return [Object] whatever +setup_dataflow+ returns
def post_init
  peer_port, peer_ip = Socket.unpack_sockaddr_in(get_peername)
  log.info("Connected to #{peer_ip} on #{peer_port}")
  setup_dataflow
end

#receive_line(line) ⇒ Object



25
26
27
28
29
30
31
32
33
# File 'lib/wukong/local/tcp_driver.rb', line 25

# Handles one received line: resets the output buffer, then hands two
# procs to +EM.defer+ -- one that sends the line through the dataflow
# and one that flushes the buffer.
#
# Both are plain procs (not lambdas), so any argument +EM.defer+ passes
# to the callback is ignored.
#
# NOTE(review): the rescue below only covers errors raised while this
# method itself runs; whether errors raised inside the deferred
# operation can reach it depends on how EM.defer executes the proc --
# confirm.
#
# @param line [String] the line received from the peer
# @return [Object] whatever +EM.defer+ returns
# @raise [Wukong::Error] wrapping any StandardError raised here, after
#   +EM.stop+ has been called
def receive_line(line)
  @buffer = []
  EM.defer(
    proc { driver.send_through_dataflow(line) },
    proc { flush_buffer(@buffer) }
  )
rescue StandardError => error
  EM.stop
  raise Wukong::Error.new(error)
end

#unbind ⇒ Object



40
41
42
43
# File 'lib/wukong/local/tcp_driver.rb', line 40

# Finalizes and stops the dataflow, then stops EventMachine via
# +EM.stop+.
#
# NOTE(review): presumably invoked by EventMachine when the connection
# closes; since +EM.stop+ is called unconditionally, this looks like it
# stops the whole reactor once any single connection ends -- confirm
# that is intended.
#
# @return [Object] whatever +EM.stop+ returns
def unbind
  finalize_and_stop_dataflow
  EM.stop
end