Class: Socketry::TCP::Socket

Inherits:
Object
  • Object
show all
Includes:
Socketry::Timeout
Defined in:
lib/socketry/tcp/socket.rb

Overview

Transmission Control Protocol sockets: Provide stream-like semantics

Direct Known Subclasses

SSL::Socket

Constant Summary

Constants included from Socketry::Timeout

Socketry::Timeout::DEFAULT_TIMEOUTS, Socketry::Timeout::DEFAULT_TIMER

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Socketry::Timeout

#clear_timeout, #lifetime, #set_timeout, #start_timer, #time_remaining

Constructor Details

#initialize(read_timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:read], write_timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:write], timer: Socketry::Timeout::DEFAULT_TIMER.new, resolver: Socketry::Resolver::DEFAULT_RESOLVER, socket_class: ::Socket) ⇒ Socketry::TCP::Socket

Create an unconnected Socketry::TCP::Socket

Parameters:

  • read_timeout (Numeric) (defaults to: Socketry::Timeout::DEFAULT_TIMEOUTS[:read])

    Seconds to wait before an uncompleted read errors

  • write_timeout (Numeric) (defaults to: Socketry::Timeout::DEFAULT_TIMEOUTS[:write])

    Seconds to wait before an uncompleted write errors

  • timer (Object) (defaults to: Socketry::Timeout::DEFAULT_TIMER.new)

    A timekeeping object to use for measuring timeouts

  • resolver (Object) (defaults to: Socketry::Resolver::DEFAULT_RESOLVER)

    A resolver object to use for resolving DNS names

  • socket_class (Object) (defaults to: ::Socket)

    Underlying socket class which implements I/O ops



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/socketry/tcp/socket.rb', line 31

def initialize(
  read_timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:read],
  write_timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:write],
  timer: Socketry::Timeout::DEFAULT_TIMER.new,
  resolver: Socketry::Resolver::DEFAULT_RESOLVER,
  socket_class: ::Socket
)
  @read_timeout = read_timeout
  @write_timeout = write_timeout

  @socket_class = socket_class
  @resolver = resolver

  @family = nil
  @socket = nil

  @remote_addr = nil
  @remote_port = nil
  @local_addr  = nil
  @local_port  = nil

  start_timer(timer)
end

Instance Attribute Details

#local_addrObject (readonly)

Returns the value of attribute local_addr.



10
11
12
# File 'lib/socketry/tcp/socket.rb', line 10

def local_addr
  @local_addr
end

#local_portObject (readonly)

Returns the value of attribute local_port.



10
11
12
# File 'lib/socketry/tcp/socket.rb', line 10

def local_port
  @local_port
end

#read_timeoutObject (readonly)

Returns the value of attribute read_timeout.



11
12
13
# File 'lib/socketry/tcp/socket.rb', line 11

def read_timeout
  @read_timeout
end

#remote_addrObject (readonly)

Returns the value of attribute remote_addr.



10
11
12
# File 'lib/socketry/tcp/socket.rb', line 10

def remote_addr
  @remote_addr
end

#remote_portObject (readonly)

Returns the value of attribute remote_port.



10
11
12
# File 'lib/socketry/tcp/socket.rb', line 10

def remote_port
  @remote_port
end

#resolverObject (readonly)

Returns the value of attribute resolver.



11
12
13
# File 'lib/socketry/tcp/socket.rb', line 11

def resolver
  @resolver
end

#socket_classObject (readonly)

Returns the value of attribute socket_class.



11
12
13
# File 'lib/socketry/tcp/socket.rb', line 11

def socket_class
  @socket_class
end

#write_timeoutObject (readonly)

Returns the value of attribute write_timeout.



11
12
13
# File 'lib/socketry/tcp/socket.rb', line 11

def write_timeout
  @write_timeout
end

Class Method Details

.connect(remote_addr, remote_port, **args) ⇒ Socketry::TCP::Socket

Create a Socketry::TCP::Socket with the default options, then connect to the given host.

Parameters:

  • remote_addr (String)

    DNS name or IP address of the host to connect to

  • remote_port (Fixnum)

    TCP port to connect to

Returns:



19
20
21
# File 'lib/socketry/tcp/socket.rb', line 19

def self.connect(remote_addr, remote_port, **args)
  new.connect(remote_addr, remote_port, **args)
end

Instance Method Details

#closetrue, false

Close the socket

Returns:

  • (true, false)

    true if the socket was open, false if closed



245
246
247
248
249
250
251
# File 'lib/socketry/tcp/socket.rb', line 245

def close
  return false unless connected?
  @socket.close
  true
ensure
  @socket = nil
end

#connect(remote_addr, remote_port, local_addr: nil, local_port: nil, timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:connect]) ⇒ self

Connect to a remote host

Parameters:

  • remote_addr (String)

    DNS name or IP address of the host to connect to

  • remote_port (Fixnum)

    TCP port to connect to

  • local_addr (String) (defaults to: nil)

    DNS name or IP address to bind to locally

  • local_port (Fixnum) (defaults to: nil)

    Local TCP port to bind to

  • timeout (Numeric) (defaults to: Socketry::Timeout::DEFAULT_TIMEOUTS[:connect])

    Number of seconds to wait before aborting connect

  • socket_class (Class)

    Custom low-level socket class

Returns:

  • (self)

Raises:



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/socketry/tcp/socket.rb', line 66

def connect(
  remote_addr,
  remote_port,
  local_addr: nil,
  local_port: nil,
  timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:connect]
)
  ensure_disconnected

  @remote_addr = remote_addr
  @remote_port = remote_port
  @local_addr  = local_addr
  @local_port  = local_port

  begin
    set_timeout(timeout)

    remote_addr = @resolver.resolve(remote_addr, timeout: time_remaining(timeout))
    local_addr  = @resolver.resolve(local_addr,  timeout: time_remaining(timeout)) if local_addr
    raise ArgumentError, "expected IPAddr from resolver, got #{remote_addr.class}" unless remote_addr.is_a?(IPAddr)

    if remote_addr.ipv4?
      @family = ::Socket::AF_INET
    elsif remote_addr.ipv6?
      @family = ::Socket::AF_INET6
    else raise Socketry::AddressError, "unsupported IP address family: #{remote_addr}"
    end

    socket = @socket_class.new(@family, ::Socket::SOCK_STREAM, 0)
    socket.bind Addrinfo.tcp(local_addr.to_s, local_port) if local_addr
    remote_sockaddr = ::Socket.sockaddr_in(remote_port, remote_addr.to_s)

    # Note: `exception: false` for Socket#connect_nonblock is only supported in Ruby 2.3+
    begin
      socket.connect_nonblock(remote_sockaddr)
    rescue Errno::EINPROGRESS, Errno::EALREADY
      # Earlier JRuby 9.x versions do not seem to correctly support Socket#wait_writable in this case
      # Newer versions seem to behave correctly
      retry if IO.select(nil, [socket], nil, time_remaining(timeout))

      socket.close
      raise Socketry::TimeoutError, "connection to #{remote_addr}:#{remote_port} timed out"
    rescue Errno::EISCONN
      # Sometimes raised when we've connected successfully
    end

    @socket = socket
  ensure
    clear_timeout(timeout)
  end

  self
end

#connected?true, false

Is the socket currently connected?

This method returns the local connection state. However, it’s possible the remote side has closed the connection, so it’s not actually possible to actually know if the socket is actually still open without reading from or writing to it. It’s sort of like the Heisenberg uncertainty principle of sockets.

Returns:

  • (true, false)

    do we locally think the socket is open?



262
263
264
# File 'lib/socketry/tcp/socket.rb', line 262

def connected?
  @socket != nil
end

#from_socket(socket) ⇒ Object

Wrap a Ruby/low-level socket in an Socketry::TCP::Socket

Parameters:

  • socket (::Socket)

    (or specified socket_class) low-level socket to wrap

Raises:

  • (TypeError)


133
134
135
136
137
138
# File 'lib/socketry/tcp/socket.rb', line 133

def from_socket(socket)
  ensure_disconnected
  raise TypeError, "expected #{@socket_class}, got #{socket.class}" unless socket.is_a?(@socket_class)
  @socket = socket
  self
end

#nodelaytrue, false

Check whether Nagle’s algorithm has been disabled

Returns:

  • (true)

    Nagle’s algorithm has been explicitly disabled

  • (false)

    Nagle’s algorithm is enabled (default)



221
222
223
224
# File 'lib/socketry/tcp/socket.rb', line 221

def nodelay
  ensure_connected
  @socket.getsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY).int.nonzero?
end

#nodelay=(flag) ⇒ Object

Disable or enable Nagle’s algorithm

Parameters:

  • flag (true, false)

    disable or enable coalescing multiple writesusing Nagle’s algorithm



229
230
231
232
# File 'lib/socketry/tcp/socket.rb', line 229

def nodelay=(flag)
  ensure_connected
  @socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, flag ? 1 : 0)
end

#read_nonblock(size, outbuf: nil) ⇒ String, :wait_readable

Perform a non-blocking read operation

Parameters:

  • size (Fixnum)

    number of bytes to attempt to read

  • outbuf (String, NilClass) (defaults to: nil)

    an optional buffer into which data should be read

Returns:

  • (String, :wait_readable)

    data read, or :wait_readable if operation would block

Raises:



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/socketry/tcp/socket.rb', line 146

def read_nonblock(size, outbuf: nil)
  ensure_connected
  case outbuf
  when String
    @socket.read_nonblock(size, outbuf, exception: false)
  when NilClass
    @socket.read_nonblock(size, exception: false)
  else raise TypeError, "unexpected outbuf class: #{outbuf.class}"
  end
rescue IO::WaitReadable
  # Some buggy Rubies continue to raise this exception
  :wait_readable
rescue IOError => ex
  raise Socketry::Error, ex.message, ex.backtrace
end

#readpartial(size, outbuf: nil, timeout: @read_timeout) ⇒ String

Read a partial amounth of data, blocking until it becomes available

Parameters:

  • size (Fixnum)

    number of bytes to attempt to read

Returns:

  • (String)

Raises:



167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/socketry/tcp/socket.rb', line 167

def readpartial(size, outbuf: nil, timeout: @read_timeout)
  set_timeout(timeout)

  begin
    while (result = read_nonblock(size, outbuf: outbuf)) == :wait_readable
      next if @socket.wait_readable(read_timeout)
      raise TimeoutError, "read timed out after #{timeout} seconds"
    end
  ensure
    clear_timeout(timeout)
  end

  result || :eof
end

#reconnect(timeout: ) ⇒ Object

Re-establish a lost TCP connection

Parameters:

  • timeout (Numeric) (defaults to: )

    Number of seconds to wait before aborting re-connect

Raises:



124
125
126
127
128
# File 'lib/socketry/tcp/socket.rb', line 124

def reconnect(timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:connect])
  ensure_disconnected
  raise StateError, "can't reconnect: never completed initial connection" unless @remote_addr
  connect(@remote_addr, @remote_port, local_addr: @local_addr, local_port: @local_port, timeout: timeout)
end

#to_ioIO

Return a raw Ruby I/O object

Returns:

  • (IO)

    Ruby I/O object



237
238
239
240
# File 'lib/socketry/tcp/socket.rb', line 237

def to_io
  ensure_connected
  ::IO.try_convert(@socket)
end

#write_nonblock(data) ⇒ Fixnum, :wait_writable

Perform a non-blocking write operation

Parameters:

  • data (String)

    number of bytes to attempt to read

Returns:

  • (Fixnum, :wait_writable)

    number of bytes written, or :wait_writable if op would block

Raises:



187
188
189
190
191
192
193
194
195
# File 'lib/socketry/tcp/socket.rb', line 187

def write_nonblock(data)
  ensure_connected
  @socket.write_nonblock(data, exception: false)
rescue IO::WaitWriteable
  # Some buggy Rubies continue to raise this exception
  :wait_writable
rescue IOError => ex
  raise Socketry::Error, ex.message, ex.backtrace
end

#writepartial(data, timeout: @write_timeout) ⇒ Fixnum, :wait_writable

Write a partial amounth of data, blocking until it’s completed

Parameters:

  • data (String)

    number of bytes to attempt to read

Returns:

  • (Fixnum, :wait_writable)

    number of bytes written, or :wait_writable if op would block

Raises:



202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/socketry/tcp/socket.rb', line 202

def writepartial(data, timeout: @write_timeout)
  set_timeout(timeout)

  begin
    while (result = write_nonblock(data)) == :wait_writable
      next if @socket.wait_writable(read_timeout)
      raise TimeoutError, "write timed out after #{timeout} seconds"
    end
  ensure
    clear_timeout(timeout)
  end

  result || :eof
end