Class: PipeRpc::Hub::Socket

Inherits:
Object show all
Defined in:
lib/pipe_rpc/hub_socket.rb

Instance Method Summary collapse

Constructor Details

#initialize(hub, args = {}) ⇒ Socket



3
4
5
6
7
8
9
10
# File 'lib/pipe_rpc/hub_socket.rb', line 3

# Builds a socket for +hub+ on top of a pair of IO-like streams.
#
# @param hub [Object] the owning hub; stored as-is
# @param args [Hash] options; the :input and :output keys are both required
# @raise [KeyError] if :input or :output is missing from +args+
def initialize(hub, args = {})
  @hub = hub
  @input, @output = args.fetch(:input), args.fetch(:output)
  # Callback lists start out empty; #on_received / #on_sent append to them.
  @on_received = []
  @on_sent = []
  # Reason handed to ClosedError until #close stores a different one.
  @closed_reason = "lost connection to other side"
end

Instance Method Details

#close(reason) ⇒ Object



54
55
56
57
58
# File 'lib/pipe_rpc/hub_socket.rb', line 54

# Records why the socket is being closed and closes both streams.
#
# The output stream is closed even when closing the input stream raises, and
# a stream that already reports +closed?+ is skipped, so calling this twice
# (or using one object as both input and output) does not raise.
#
# @param reason [Object] stored and later passed to ClosedError by #read and #write
# @return [nil]
def close(reason)
  @closed_reason = reason
  begin
    @input.close unless @input.closed?
  ensure
    # Runs even if the input close above raised, so the output is never leaked.
    @output.close unless @output.closed?
  end
  nil
end

#off_received(callback) ⇒ Object



50
51
52
# File 'lib/pipe_rpc/hub_socket.rb', line 50

# Unregisters a callback previously added with #on_received.
#
# @param callback [Proc] the object returned by #on_received
# @return [Proc, nil] the removed callback, or nil if it was not registered
def off_received(callback)
  listeners = @on_received
  listeners.delete(callback)
end

#off_sent(callback) ⇒ Object



41
42
43
# File 'lib/pipe_rpc/hub_socket.rb', line 41

# Unregisters a callback previously added with #on_sent.
#
# @param callback [Proc] the object returned by #on_sent
# @return [Proc, nil] the removed callback, or nil if it was not registered
def off_sent(callback)
  listeners = @on_sent
  listeners.delete(callback)
end

#on_received(&on_received) ⇒ Object



45
46
47
48
# File 'lib/pipe_rpc/hub_socket.rb', line 45

# Registers a block that #read calls with every message it unpacks.
#
# @yieldparam unpacked [Object] the unpacked message
# @return [Proc] the registered callback; pass it to #off_received to remove it
# @raise [ArgumentError] if no block is given
def on_received(&on_received)
  # Without this guard a nil would be stored and #read would later fail on nil.call.
  raise ArgumentError, "no block given" unless on_received

  @on_received << on_received
  on_received
end

#on_sent(&on_sent) ⇒ Object



36
37
38
39
# File 'lib/pipe_rpc/hub_socket.rb', line 36

# Registers a block that #write calls with the payload of every object it sends.
#
# @yieldparam payload [Object] the result of +to_h+ on the object given to #write
# @return [Proc] the registered callback; pass it to #off_sent to remove it
# @raise [ArgumentError] if no block is given
def on_sent(&on_sent)
  # Without this guard a nil would be stored and #write would later fail on nil.call.
  raise ArgumentError, "no block given" unless on_sent

  @on_sent << on_sent
  on_sent
end

#read ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/pipe_rpc/hub_socket.rb', line 12

# Reads whatever is currently available on the input stream, unpacks it and
# passes the unpacked message to every #on_received callback.
#
# Bytes that were already read are not thrown away when the stream then hits
# end-of-file: the buffered message is returned and EOFError surfaces on the
# next call instead.
#
# @return [Object] the unpacked message; if unpacking raises MessagePack::Error,
#   an ErrorResponse (code -32700) is written and the result of #write is returned
# @raise [ClosedError] if the input stream is already closed
# @raise [EOFError] if the stream is at end-of-file and nothing was read
def read
  raise ClosedError.new(@closed_reason) if @input.closed?

  # infinite #readpartial that also works with mruby-restricted_io
  packed = ''.dup # unfrozen buffer, so << works even under frozen_string_literal
  loop do
    begin
      # NOTE(review): 2*16 is 32 bytes per chunk; possibly meant as 2**16 — confirm.
      packed << @input.sysread(2*16)
    rescue EOFError
      # select reports end-of-file as readable, so a message followed directly
      # by EOF lands here with data already buffered; keep that data.
      raise if packed.empty?
      break
    end
    break unless @input.class.select([@input], nil, nil, 0)
  end

  unpack(packed).tap do |unpacked|
    @on_received.each { |callback| callback.call(unpacked) }
  end
rescue MessagePack::Error => e
  write ErrorResponse.new(id: nil, error: { code: -32700, data: { message: "MessagePack: #{e.message}" } })
end

#unpack(packed) ⇒ Object



60
61
62
63
64
65
66
# File 'lib/pipe_rpc/hub_socket.rb', line 60

# Decodes a MessagePack-encoded string.
#
# @param packed [String] the encoded bytes
# @return [Object] whatever MessagePack.unpack returns for +packed+
def unpack(packed)
  return MessagePack.unpack(packed) unless ::Object.const_defined?(:MRUBY_VERSION)

  # On mruby the extra +true+ argument is passed to create unknown symbols.
  MessagePack.unpack(packed, true)
end

#write(obj) ⇒ Object

Raises:

(ClosedError) — if the output stream is already closed



29
30
31
32
33
34
# File 'lib/pipe_rpc/hub_socket.rb', line 29

# Sends +obj+ over the output stream as MessagePack.
#
# The object is converted with +to_h+; every #on_sent callback receives that
# payload before it is encoded and written.
#
# @param obj [#to_h] the message to send
# @return [Object] the result of +syswrite+ on the output stream
# @raise [ClosedError] if the output stream is already closed
def write(obj)
  if @output.closed?
    raise ClosedError.new(@closed_reason)
  end

  message = obj.to_h
  @on_sent.each do |listener|
    listener.call(message)
  end
  encoded = message.to_msgpack
  @output.syswrite(encoded)
end