Class: Distributor::Server
- Inherits: Object
- Inheritance chain: Object → Distributor::Server
- Defined in:
- lib/distributor/server.rb
Instance Method Summary
- #command(command, data = {}) ⇒ Object
- #initialize(input, output = input) ⇒ Server (constructor): a new instance of Server.
- #on_command(&blk) ⇒ Object
- #run(command) ⇒ Object
- #start ⇒ Object
- #tunnel(port) ⇒ Object
Constructor Details
#initialize(input, output = input) ⇒ Server
Returns a new instance of Server.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/distributor/server.rb', line 10
#
# Wires a connector and a multiplexer together around +input+ / +output+.
#
# Channel 0 of the multiplexer is reserved as the command channel. Each
# message yielded by dequeue_json is dispatched on its "command" field:
# "tunnel" and "run" open a channel and answer with an "ack" message,
# "close" closes the channel named in "ch", and any other command is passed
# to the block registered through #on_command.
#
# @param input [IO] stream whose data is fed into the multiplexer; an
#   on_close callback registered for it exits the process with status 0
# @param output [IO] stream handed to the multiplexer (defaults to +input+)
def initialize(input, output=input)
  @connector   = Distributor::Connector.new
  @multiplexer = Distributor::Multiplexer.new(output)
  @on_command  = Proc.new {}

  # channel 0 is the command channel
  @multiplexer.reserve(0)

  # route everything arriving on the input stream through the multiplexer
  @connector.handle(input) { |io| @multiplexer.input io }
  @connector.on_close(input) { |_io| exit 0 }

  # replies with an "ack" carrying the request id plus the given extra fields
  acknowledge = lambda do |request, extra|
    reply = { "id" => request["id"], "command" => "ack" }.merge(extra)
    @multiplexer.output 0, Distributor::OkJson.encode(reply)
  end

  # decode and dispatch whatever shows up on the command channel
  @connector.handle(@multiplexer.reader(0)) do |io|
    append_json(io.readpartial(4096))

    dequeue_json do |data|
      command = data["command"]

      case command
      when "tunnel"
        port = (data["port"] || ENV["PORT"] || 5000).to_i
        ch = tunnel(port)
        acknowledge.call(data, { "ch" => ch, "port" => port })
      when "close"
        @multiplexer.close data["ch"]
      when "run"
        ch = run(data["args"])
        acknowledge.call(data, { "ch" => ch })
      else
        @on_command.call command, data
      end
    end
  end
end
Instance Method Details
#command(command, data = {}) ⇒ Object
97 98 99 100 101 102 |
# File 'lib/distributor/server.rb', line 97
#
# Sends a command message on channel 0 of the multiplexer.
#
# The message is built from a copy of +data+ so the caller's hash is not
# modified. Previously the "id" was written back into the caller's hash,
# which meant reusing one hash for two commands silently reused the id.
#
# @param command [Object] value stored under the "command" key
# @param data [Hash] additional fields; an existing "id" entry is kept,
#   otherwise one is requested from the multiplexer
# @return [Object] the id that was sent with the message
def command(command, data={})
  message = data.dup
  message["id"] ||= @multiplexer.generate_id
  message["command"] = command
  @multiplexer.output 0, Distributor::OkJson.encode(message)
  message["id"]
end
#on_command(&blk) ⇒ Object
104 105 106 |
# File 'lib/distributor/server.rb', line 104
#
# Registers the block invoked for commands the server does not handle
# itself. The block is called with the command value and the message hash.
#
# Calling this without a block installs a no-op handler rather than nil,
# so a later dispatch never fails with NoMethodError on nil.
#
# @yield [command, data] called for each otherwise unhandled command
# @return [Proc] the handler now in effect
def on_command(&blk)
  @on_command = blk || Proc.new {}
end
#run(command) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/distributor/server.rb', line 49
#
# Spawns +command+ on a pseudo-terminal and bridges it to a newly reserved
# multiplexer channel: process output is written to the channel, and data
# read from the channel's reader is written to the process.
#
# @param command [String] command line handed to PTY.spawn
# @return [Object] the channel reserved for this process
def run(command)
  # spawn first: if this raises, no channel has been reserved yet
  rd, wr, pid = PTY.spawn(command)
  ch = @multiplexer.reserve

  # the non-block form of PTY.spawn does not reap the child; detach so it
  # does not linger as a zombie after exiting
  Process.detach(pid)

  # handle data incoming from process
  @connector.handle(rd) do |io|
    begin
      @multiplexer.output(ch, io.readpartial(4096))
    rescue EOFError, Errno::EIO
      # on Linux a PTY master raises Errno::EIO (not EOFError) once the
      # child side has been closed; both mean the process is done
      @multiplexer.close(ch)
      @connector.close(io)
    end
  end

  # handle data incoming on the multiplexer
  @connector.handle(@multiplexer.reader(ch)) do |input_io|
    wr.write input_io.readpartial(4096)
  end

  ch
end
#start ⇒ Object
108 109 110 111 |
# File 'lib/distributor/server.rb', line 108
#
# Announces the server with a "hello" command on channel 0, then keeps
# calling the connector's listen method in an endless loop.
def start
  greeting = Distributor::OkJson.encode({ "command" => "hello" })
  @multiplexer.output(0, greeting)

  loop do
    @connector.listen
  end
end
#tunnel(port) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/distributor/server.rb', line 73
#
# Opens a TCP connection to +port+ on localhost and bridges it to a newly
# reserved multiplexer channel: data read from the socket is written to the
# channel, and data read from the channel's reader is written to the socket.
#
# @param port [Integer] local port to connect to
# @return [Object] the channel reserved for this tunnel
# @raise [SystemCallError] if the connection cannot be established; no
#   channel is reserved in that case
def tunnel(port)
  # connect first: if this raises, no channel has been reserved yet
  tcp = TCPSocket.new("localhost", port)
  ch = @multiplexer.reserve

  # handle data incoming from the socket
  @connector.handle(tcp) do |io|
    begin
      @multiplexer.output(ch, io.readpartial(4096))
    rescue EOFError, Errno::ECONNRESET
      # a reset by the peer ends the tunnel just like a clean EOF does
      @multiplexer.close(ch)
      @connector.close(io)
    end
  end

  # handle data incoming on the multiplexer
  @connector.handle(@multiplexer.reader(ch)) do |input_io|
    tcp.write input_io.readpartial(4096)
  end

  ch
end