Class: Wamp::Connection::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/wamp/connection/stream.rb

Overview

Creates the actual socket and provides a way to read from it and write to it.

Instance Method Summary collapse

Constructor Details

#initialize(socket_object) ⇒ Stream

Returns a new instance of Stream.



9
10
11
12
13
14
15
16
17
# File 'lib/wamp/connection/stream.rb', line 9

# Connects a TCP socket to the host/port taken from the socket object's URL
# and registers that socket with a fresh NIO selector for read readiness.
#
# @param socket_object [Object] must respond to #url; it is kept in
#   @socket_object for later use by the other methods of this class
def initialize(socket_object)
  @socket_object = socket_object
  endpoint = URI.parse(@socket_object.url)

  @selector = NIO::Selector.new
  @socket = TCPSocket.new(endpoint.host, endpoint.port)

  # The read callback is stored on the monitor itself; #run calls it
  # whenever the monitor reports the socket as readable.
  @selector.register(@socket, :r).tap do |reader|
    reader.value = proc { read_nonblock(reader.io) }
  end
end

Instance Method Details

#close ⇒ Object



61
62
63
64
65
# File 'lib/wamp/connection/stream.rb', line 61

# Tears the connection down and marks the stream as closed.
#
# Runs #shutdown, tells the socket object that the connection is gone, and
# sets @closed so that the loop in #run breaks after the current select.
# Closing an already-closed stream is a no-op, so #shutdown and
# connection_gone each run at most once.
#
# @return [true]
def close
  return true if @closed

  shutdown
  @socket_object.connection_gone
  @closed = true
end

#read_nonblock(io) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/wamp/connection/stream.rb', line 43

# Reads up to 4096 bytes from +io+ without blocking and dispatches the result.
#
# @param io [#read_nonblock] the readable object handed over by the monitor
# @return [Object, nil] nil when nothing is available yet, the result of
#   #close when the read returns nil, otherwise the result of #receive
def read_nonblock(io)
  chunk = io.read_nonblock(4096, exception: false)

  # Nothing to read right now; try again on the next readiness event.
  return nil if chunk == :wait_readable
  # A nil read is treated as the connection having ended.
  return close if chunk.nil?

  receive(chunk)
end

#receive(data) ⇒ Object



32
33
34
# File 'lib/wamp/connection/stream.rb', line 32

# Hands incoming data to the socket object for parsing.
#
# @param data [Object] the chunk read from the socket (passed through as is)
# @return [Object] whatever the socket object's #parse returns
def receive(data)
  parser = @socket_object
  parser.parse(data)
end

#run ⇒ Object



36
37
38
39
40
41
# File 'lib/wamp/connection/stream.rb', line 36

# Drives the selector until the stream has been closed.
#
# Each pass calls the selector once, invoking the callback stored in the
# value of every readable monitor, and then stops if @closed has been set.
#
# @return [nil]
def run
  loop do
    @selector.select do |ready|
      next unless ready.readable?

      ready.value.call
    end

    return if @closed
  end
end

#shutdown ⇒ Object



56
57
58
59
# File 'lib/wamp/connection/stream.rb', line 56

# Removes the socket from the selector and then closes the socket.
#
# @return [Object] whatever the socket's #close returns
def shutdown
  sock = @socket

  # Deregister first, then close the socket.
  @selector.deregister(sock)
  sock.close
end

#write(data) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/wamp/connection/stream.rb', line 19

# Writes all of +data+ to the socket.
#
# Non-blocking writes may accept only part of the buffer, or none of it
# (:wait_writable). Instead of dropping the unsent bytes, this keeps
# writing the remainder and waits for the socket to become writable when
# it is not, so the method returns only after every byte was handed over.
#
# @param data [String] bytes to send
# @return [Integer] the number of bytes written, i.e. data.bytesize
def write(data)
  pending = data

  until pending.empty?
    written = @socket.write_nonblock(pending, exception: false)

    if written == :wait_writable
      # Socket buffer is full: block until it can accept more bytes.
      IO.select(nil, [@socket])
    else
      # Keep only the bytes that have not been accepted yet.
      pending = pending.byteslice(written, pending.bytesize - written)
    end
  end

  data.bytesize
end