Class: Wukong::Local::TCPDriver
Overview
A class for driving processors over a TCP protocol.
Instance Attribute Summary
#dataflow, #settings
Class Method Summary
collapse
Instance Method Summary
collapse
included
#process, #setup, #stop
included, #initialize
#add_serialization, #build_serializer, #construct_dataflow, #driver, #finalize_and_stop_dataflow, #finalize_dataflow, #lookup, #lookup_and_build, #setup_dataflow
Class Method Details
.start(label, settings = {}) ⇒ Object
11
12
13
14
15
16
17
|
# File 'lib/wukong/local/tcp_driver.rb', line 11
def self.start(label, settings = {})
host = (settings[:host] || Socket.gethostname) rescue 'localhost'
port = (settings[:port] || 9000).to_i rescue 9000
EM.start_server(host, port, self, label, settings)
log.info "Server started on #{host} on port #{port}"
add_signal_traps
end
|
Instance Method Details
#flush_buffer(records) ⇒ Object
35
36
37
38
|
# File 'lib/wukong/local/tcp_driver.rb', line 35
def flush_buffer records
send_data(records.join("\n") + "\n")
records.clear
end
|
#post_init ⇒ Object
19
20
21
22
23
|
# File 'lib/wukong/local/tcp_driver.rb', line 19
def post_init
port, ip = Socket.unpack_sockaddr_in(get_peername)
log.info "Connected to #{ip} on #{port}"
setup_dataflow
end
|
#receive_line(line) ⇒ Object
25
26
27
28
29
30
31
32
33
|
# File 'lib/wukong/local/tcp_driver.rb', line 25
def receive_line line
@buffer = []
operation = proc { driver.send_through_dataflow(line) }
callback = proc { flush_buffer @buffer }
EM.defer(operation, callback)
rescue => e
EM.stop
raise Wukong::Error.new(e)
end
|
#unbind ⇒ Object
40
41
42
43
|
# File 'lib/wukong/local/tcp_driver.rb', line 40
def unbind
finalize_and_stop_dataflow
EM.stop
end
|