Class: PipeRpc::Hub::Socket
Instance Method Summary
- #close(reason) ⇒ Object
- #initialize(hub, args = {}) ⇒ Socket (constructor): A new instance of Socket.
- #off_received(callback) ⇒ Object
- #off_sent(callback) ⇒ Object
- #on_received(&on_received) ⇒ Object
- #on_sent(&on_sent) ⇒ Object
- #read ⇒ Object
- #unpack(packed) ⇒ Object
- #write(obj) ⇒ Object
Constructor Details
#initialize(hub, args = {}) ⇒ Socket
3 4 5 6 7 8 9 10 |
# File 'lib/pipe_rpc/hub_socket.rb', line 3
#
# Sets up a socket for +hub+ with an input stream, an output stream and
# empty callback registries.
#
# @param hub [Object] stored in @hub
# @param args [Hash] must contain the keys :input and :output
# @raise [KeyError] if :input or :output is missing (raised by Hash#fetch)
def initialize(hub, args = {})
  @hub = hub
  @input = args.fetch(:input)
  @output = args.fetch(:output)
  @on_sent = []
  @on_received = []
  # Default reason reported by #read / #write once a stream is found closed
  # without #close having been called with an explicit reason.
  @closed_reason = "lost connection to other side"
end
Instance Method Details
#close(reason) ⇒ Object
54 55 56 57 58 |
# File 'lib/pipe_rpc/hub_socket.rb', line 54
#
# Records +reason+ and closes both streams.
#
# Streams that already report closed? are skipped, so calling #close twice
# does not depend on the stream's own #close being idempotent. The output
# stream is closed even when closing the input stream raises; the exception
# from the input stream still propagates to the caller.
#
# @param reason [Object] stored in @closed_reason; passed to ClosedError by
#   #read and #write once the streams are closed
def close(reason)
  @closed_reason = reason
  begin
    @input.close unless @input.closed?
  ensure
    @output.close unless @output.closed?
  end
end
#off_received(callback) ⇒ Object
50 51 52 |
# File 'lib/pipe_rpc/hub_socket.rb', line 50
#
# Unregisters a callback previously added with #on_received.
#
# @param callback [Object] the object to remove from @on_received
# @return [Object, nil] the result of Array#delete: the removed item, or nil
#   when nothing matched
def off_received(callback)
  @on_received.delete(callback)
end
#off_sent(callback) ⇒ Object
41 42 43 |
# File 'lib/pipe_rpc/hub_socket.rb', line 41
#
# Unregisters a callback previously added with #on_sent.
#
# @param callback [Object] the object to remove from @on_sent
# @return [Object, nil] the result of Array#delete: the removed item, or nil
#   when nothing matched
def off_sent(callback)
  @on_sent.delete(callback)
end
#on_received(&on_received) ⇒ Object
45 46 47 48 |
# File 'lib/pipe_rpc/hub_socket.rb', line 45
#
# Registers a block that #read calls with every unpacked message.
#
# @yieldparam unpacked [Object] the value returned by #unpack
# @return [Proc] the registered block, usable as the handle for #off_received
# @raise [ArgumentError] when called without a block
def on_received(&on_received)
  # Without this guard a nil would be stored and only fail later, inside
  # #read, when the callbacks are invoked.
  raise ArgumentError, "no block given" unless on_received

  @on_received << on_received
  on_received
end
#on_sent(&on_sent) ⇒ Object
36 37 38 39 |
# File 'lib/pipe_rpc/hub_socket.rb', line 36
#
# Registers a block that #write calls with every outgoing payload.
#
# @yieldparam payload [Object] the result of +obj.to_h+ in #write
# @return [Proc] the registered block, usable as the handle for #off_sent
# @raise [ArgumentError] when called without a block
def on_sent(&on_sent)
  # Without this guard a nil would be stored and only fail later, inside
  # #write, when the callbacks are invoked.
  raise ArgumentError, "no block given" unless on_sent

  @on_sent << on_sent
  on_sent
end
#read ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/pipe_rpc/hub_socket.rb', line 12
#
# Reads everything currently available on the input stream, unpacks it and
# passes the unpacked value to every #on_received callback.
#
# @return [Object] the unpacked message; when unpacking raises
#   MessagePack::Error, the return value of the #write call that sends the
#   error response instead
# @raise [ClosedError] when the input stream is already closed
def read
  raise ClosedError.new(@closed_reason) if @input.closed?

  # infinite #readpartial that also works with mruby-restricted_io
  packed = ''
  loop do
    # NOTE(review): 2*16 is 32 bytes per call; 2**16 may have been intended — confirm.
    packed << @input.sysread(2*16)
    # Zero timeout: keep reading only while more data is immediately available.
    break unless @input.class.select([@input], nil, nil, 0)
  end

  unpack(packed).tap do |unpacked|
    # Dispatch over a snapshot so a callback that unregisters itself (or
    # another callback) does not cause later callbacks to be skipped.
    @on_received.dup.each { |cb| cb.call(unpacked) }
  end
rescue MessagePack::Error => e
  # -32700 is sent as the error code for data that could not be unpacked.
  write ErrorResponse.new(id: nil, error: { code: -32700, data: { message: "MessagePack: #{e.message}" } })
end
#unpack(packed) ⇒ Object
60 61 62 63 64 65 66 |
# File 'lib/pipe_rpc/hub_socket.rb', line 60
#
# Decodes a packed string with MessagePack.
#
# @param packed [String] the raw data to decode
# @return [Object] whatever MessagePack.unpack returns
def unpack(packed)
  if ::Object.const_defined?(:MRUBY_VERSION)
    MessagePack.unpack(packed, true) # to create unknown symbols in mruby
  else
    MessagePack.unpack(packed)
  end
end
#write(obj) ⇒ Object
29 30 31 32 33 34 |
# File 'lib/pipe_rpc/hub_socket.rb', line 29
#
# Converts +obj+ to a hash, passes it to every #on_sent callback and writes
# its MessagePack encoding to the output stream.
#
# @param obj [#to_h] the message to send
# @return [Object] the return value of +@output.syswrite+
# @raise [ClosedError] when the output stream is already closed
def write(obj)
  raise ClosedError.new(@closed_reason) if @output.closed?

  payload = obj.to_h
  # Dispatch over a snapshot so a callback that unregisters itself (or
  # another callback) does not cause later callbacks to be skipped.
  @on_sent.dup.each { |callback| callback.call(payload) }
  @output.syswrite payload.to_msgpack
end