Class: IO::Endpoint::Wrapper

Inherits:
Object
  • Object
show all
Includes:
Socket::Constants
Defined in:
lib/io/endpoint/wrapper.rb

Constant Summary collapse

ServerSocket =
::Socket
DEFAULT =
new

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.default ⇒ Object



223
224
225
# File 'lib/io/endpoint/wrapper.rb', line 223

# Returns the shared wrapper instance stored in the `DEFAULT` constant.
#
# @return [Object] whatever `DEFAULT` currently refers to.
def self.default = DEFAULT

Instance Method Details

#accept(server, timeout: nil, linger: nil, **options, &block) ⇒ Object

Accept connections from a bound server socket in a loop, scheduling the given block for each accepted connection.



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/io/endpoint/wrapper.rb', line 188

# Accept connections from `server` in a loop, handing each one to the given block.
#
# Every accepted socket is configured (linger, timeout) and then processed via
# `schedule`, so the loop can go straight back to accepting.
#
# @param server [Object] passed to `socket_accept`, which must return a `[socket, address]` pair.
# @param timeout [Object, nil] when truthy, applied to each accepted socket through `set_timeout`.
# @param linger [Boolean, Integer, nil] when truthy, enables `SO_LINGER` on each accepted socket.
#   An Integer is used as the linger time in seconds; any other truthy value uses zero seconds.
#   NOTE(review): the zero-second default for `true` is an assumption - confirm the intended timeout.
# @param options [Hash] accepted for interface compatibility; not used by this method.
# @yield [socket, address] the accepted (and, if applicable, started) socket and its peer address.
# @raise [StandardError] anything raised while accepting or configuring a socket; a socket that
#   fails configuration is closed before the error propagates.
def accept(server, timeout: nil, linger: nil, **options, &block)
  # Ensure we use a `loop do ... end` so that state is not leaked between iterations:
  loop do
    socket, address = socket_accept(server)

    begin
      if linger
        # SO_LINGER expects a `struct linger`; passing a bare integer is rejected by the kernel with EINVAL.
        linger_seconds = linger.is_a?(Integer) ? linger : 0
        socket.setsockopt(::Socket::Option.linger(true, linger_seconds))
      end

      if timeout
        set_timeout(socket, timeout)
      end
    rescue
      # Nobody else owns the socket yet, so close it rather than leak the descriptor:
      socket.close
      raise
    end

    schedule do
      # Some sockets, notably SSL sockets, need application level negotiation before they are ready:
      if socket.respond_to?(:start)
        begin
          socket.start
        rescue
          socket.close
          raise
        end
      end

      # It seems like OpenSSL doesn't return the address of the peer when using `accept`, so we need to get it from the socket:
      address ||= socket.remote_address

      yield socket, address
    end
  end
end

#async(&block) ⇒ Object

Legacy method for compatibility with older code.



27
28
29
# File 'lib/io/endpoint/wrapper.rb', line 27

# Legacy alias kept for older callers: forwards the block to `schedule` unchanged.
#
# @yield the work to run; invoked however `schedule` decides to run it.
# @return [Object] whatever `schedule` returns.
def async(&block) = schedule(&block)

#bind(local_address, protocol: 0, reuse_address: true, reuse_port: nil, linger: nil, bound_timeout: nil, backlog: Socket::SOMAXCONN, **options, &block) ⇒ Object

Bind to a local address.

Examples:

socket = IO::Endpoint::Wrapper.default.bind(Addrinfo.tcp("0.0.0.0", 9090))


129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/io/endpoint/wrapper.rb', line 129

# Create a server socket bound to `local_address` and, where supported, start listening on it.
#
# Without a block the bound socket is returned and the caller owns it. With a block, the
# socket is yielded inside `schedule` and closed once the block finishes.
#
# @param local_address [Object] must respond to `afamily`, `socktype` and `to_sockaddr`.
# @param protocol [Integer] protocol number passed to the socket constructor.
# @param reuse_address [Boolean, nil] when truthy, sets `SO_REUSEADDR`.
# @param reuse_port [Boolean, nil] when truthy, sets `SO_REUSEPORT`.
# @param linger [Boolean, Integer, nil] when truthy, enables `SO_LINGER`. An Integer is used as
#   the linger time in seconds; any other truthy value uses zero seconds.
#   NOTE(review): the zero-second default for `true` is an assumption - confirm the intended timeout.
# @param bound_timeout [Object, nil] when truthy, applied to the socket through `set_timeout`.
# @param backlog [Integer, nil] when truthy, passed to `listen`; `Errno::EOPNOTSUPP` is ignored.
# @param options [Hash] accepted for interface compatibility; not used by this method.
# @yield [socket] the bound socket (optional).
# @return [Object] the bound socket when no block is given, otherwise the result of `schedule`.
# @raise [StandardError] any setup failure; the socket is closed before the error propagates.
def bind(local_address, protocol: 0, reuse_address: true, reuse_port: nil, linger: nil, bound_timeout: nil, backlog: Socket::SOMAXCONN, **options, &block)
  socket = nil

  begin
    socket = ServerSocket.new(local_address.afamily, local_address.socktype, protocol)

    if reuse_address
      socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    end

    if reuse_port
      socket.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1)
    end

    if linger
      # SO_LINGER expects a `struct linger`; passing a bare integer is rejected by the kernel with EINVAL.
      linger_seconds = linger.is_a?(Integer) ? linger : 0
      socket.setsockopt(::Socket::Option.linger(true, linger_seconds))
    end

    # Set the timeout:
    if bound_timeout
      set_timeout(socket, bound_timeout)
    end

    socket.bind(local_address.to_sockaddr)

    if backlog
      begin
        # Generally speaking, bind/listen is a common pattern, but it's not applicable to all socket types. We ignore the error if it's not supported as the alternative is exposing this upstream, which seems less desirable than handling it here. In other words, `bind` in this context means "prepare it to accept connections", whatever that means for the given socket type.
        socket.listen(backlog)
      rescue Errno::EOPNOTSUPP
        # Ignore.
      end
    end
  rescue
    # The socket never reached the caller, so release it here:
    socket&.close
    raise
  end

  return socket unless block_given?

  schedule do
    begin
      yield socket
    ensure
      socket.close
    end
  end
end

#connect(remote_address, local_address: nil, linger: nil, timeout: nil, buffered: false, **options) ⇒ Object

Establish a connection to a given `remote_address`.

Examples:

socket = IO::Endpoint::Wrapper.default.connect(Addrinfo.tcp("8.8.8.8", 53))


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/io/endpoint/wrapper.rb', line 67

# Create a socket, configure it, and connect it to `remote_address`.
#
# Without a block the connected socket is returned and the caller owns it. With a block, the
# socket is yielded and closed once the block finishes; the block's value is returned.
#
# @param remote_address [Object] must respond to `afamily`, `socktype`, `protocol` and `to_sockaddr`.
# @param local_address [Object, nil] when given, the socket is bound to it before connecting.
# @param linger [Boolean, Integer, nil] when truthy, enables `SO_LINGER`. An Integer is used as
#   the linger time in seconds; any other truthy value uses zero seconds.
#   NOTE(review): the zero-second default for `true` is an assumption - confirm the intended timeout.
# @param timeout [Object, nil] when truthy, applied to the socket through `set_timeout`.
# @param buffered [Boolean] only an explicit `false` triggers a call to `set_buffered`.
# @param options [Hash] accepted for interface compatibility; not used by this method.
# @yield [socket] the connected socket (optional).
# @return [Object] the connected socket, or the block's result when a block is given.
# @raise [Exception] any setup or connect failure; the socket is closed before it propagates.
def connect(remote_address, local_address: nil, linger: nil, timeout: nil, buffered: false, **options)
  socket = nil

  begin
    socket = ::Socket.new(remote_address.afamily, remote_address.socktype, remote_address.protocol)

    if linger
      # SO_LINGER expects a `struct linger`; passing a bare integer is rejected by the kernel with EINVAL.
      linger_seconds = linger.is_a?(Integer) ? linger : 0
      socket.setsockopt(::Socket::Option.linger(true, linger_seconds))
    end

    # Compared against `false` explicitly so that `nil` and `true` leave the socket untouched:
    if buffered == false
      set_buffered(socket, buffered)
    end

    if timeout
      set_timeout(socket, timeout)
    end

    if local_address
      if defined?(IP_BIND_ADDRESS_NO_PORT)
        # Inform the kernel (Linux 4.2+) to not reserve an ephemeral port when using bind(2) with a port number of 0. The port will later be automatically chosen at connect(2) time, in a way that allows sharing a source port as long as the 4-tuple is unique.
        socket.setsockopt(SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1)
      end

      socket.bind(local_address.to_sockaddr)
    end
  rescue
    socket&.close
    raise
  end

  begin
    socket_connect(socket, remote_address)
  rescue Exception
    # Deliberately broader than StandardError: the socket is closed and the exception re-raised untouched.
    socket.close
    raise
  end

  return socket unless block_given?

  begin
    yield socket
  ensure
    socket.close
  end
end

#schedule(&block) ⇒ Object



13
14
15
16
17
18
19
# File 'lib/io/endpoint/wrapper.rb', line 13

# Run the block concurrently: as a scheduled fiber when a fiber scheduler is
# installed, otherwise on a new thread.
#
# @yield the work to run.
# @return [Object] the result of `Fiber.schedule` or the new `Thread`.
def schedule(&block)
  # No scheduler on this thread, so fall back to a plain thread:
  return Thread.new(&block) unless Fiber.scheduler

  Fiber.schedule(&block)
end

#set_buffered(socket, buffered) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/io/endpoint/wrapper.rb', line 37

# Toggle `TCP_NODELAY` on the socket according to `buffered`.
#
# `true` clears `TCP_NODELAY`, `false` sets it, and any other value leaves the
# socket untouched. The listed `Errno` failures are treated as "not applicable"
# and swallowed.
#
# @param socket [Object] must respond to `setsockopt`.
# @param buffered [Boolean, Object] the desired buffering mode.
# @return [Object, nil] the `setsockopt` result, or nil when nothing was set or the error was ignored.
def set_buffered(socket, buffered)
  nodelay =
    case buffered
    when true then 0
    when false then 1
    end

  return if nodelay.nil?

  socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, nodelay)
rescue Errno::EINVAL, Errno::EOPNOTSUPP, Errno::ENOPROTOOPT
  # EINVAL: On Darwin, sometimes occurs when the connection is not yet fully formed. Empirically, TCP_NODELAY is enabled despite this result.
  # EOPNOTSUPP: Some platforms may simply not support the operation.
  # ENOPROTOOPT: It may not be supported by the protocol (e.g. UDP).
end

#set_timeout(io, timeout) ⇒ Object



31
32
33
34
35
# File 'lib/io/endpoint/wrapper.rb', line 31

# Assign `timeout` to `io` when it exposes a `timeout=` writer; otherwise do nothing.
#
# @param io [Object] the object to configure.
# @param timeout [Object] the value to assign.
# @return [Object, nil] the assigned timeout, or nil when `io` has no `timeout=`.
def set_timeout(io, timeout)
  return unless io.respond_to?(:timeout=)

  io.timeout = timeout
end

#socket_accept(server) ⇒ Object

Accept a connection from a bound socket. This is an extension point for subclasses to provide additional functionality.



183
184
185
# File 'lib/io/endpoint/wrapper.rb', line 183

# Accept one connection from `server`. Kept as a separate method so subclasses
# can override how connections are accepted.
#
# @param server [Object] must respond to `accept`.
# @return [Object] whatever `server.accept` returns.
def socket_accept(server) = server.accept

#socket_connect(socket, remote_address) ⇒ Object

Connect a socket to a remote address. This is an extension point for subclasses to provide additional functionality.



57
58
59
# File 'lib/io/endpoint/wrapper.rb', line 57

# Connect `socket` to `remote_address`. Kept as a separate method so subclasses
# can override how connections are established.
#
# @param socket [Object] must respond to `connect`.
# @param remote_address [Object] must respond to `to_sockaddr`.
# @return [Object] whatever `socket.connect` returns.
def socket_connect(socket, remote_address)
  sockaddr = remote_address.to_sockaddr
  socket.connect(sockaddr)
end