Class: IO::Endpoint::Wrapper

Inherits:
Object
  • Object
show all
Includes:
Socket::Constants
Defined in:
lib/io/endpoint/wrapper.rb

Constant Summary collapse

ServerSocket =
::Socket
DEFAULT =
new

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.defaultObject



223
224
225
# File 'lib/io/endpoint/wrapper.rb', line 223

def self.default
	DEFAULT
end

Instance Method Details

#accept(server, timeout: nil, linger: nil, **options, &block) ⇒ Object

Bind to a local address and accept connections in a loop.



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/io/endpoint/wrapper.rb', line 188

def accept(server, timeout: nil, linger: nil, **options, &block)
	# Ensure we use a `loop do ... end` so that state is not leaked between iterations:
	
	loop do
		socket, address = socket_accept(server)
		
		if linger
			socket.setsockopt(SOL_SOCKET, SO_LINGER, 1)
		end
		
		if timeout
			set_timeout(socket, timeout)
		end
		
		schedule do
			# Some sockets, notably SSL sockets, need application level negotiation before they are ready:
			if socket.respond_to?(:start)
				begin
					socket.start
				rescue
					socket.close
					raise
				end
			end
			
			# It seems like OpenSSL doesn't return the address of the peer when using `accept`, so we need to get it from the socket:
			address ||= socket.remote_address
			
			yield socket, address
		end
	end
end

#async(&block) ⇒ Object

Legacy method for compatibility with older code.



27
28
29
# File 'lib/io/endpoint/wrapper.rb', line 27

def async(&block)
	schedule(&block)
end

#bind(local_address, protocol: 0, reuse_address: true, reuse_port: nil, linger: nil, bound_timeout: nil, backlog: Socket::SOMAXCONN, **options, &block) ⇒ Object

Bind to a local address.

Examples:

socket = Async::IO::Socket.bind(Async::IO::Address.tcp("0.0.0.0", 9090))


129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/io/endpoint/wrapper.rb', line 129

def bind(local_address, protocol: 0, reuse_address: true, reuse_port: nil, linger: nil, bound_timeout: nil, backlog: Socket::SOMAXCONN, **options, &block)
	socket = nil
	
	begin
		socket = ServerSocket.new(local_address.afamily, local_address.socktype, protocol)
		
		if reuse_address
			socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
		end
		
		if reuse_port
			socket.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1)
		end
		
		if linger
			socket.setsockopt(SOL_SOCKET, SO_LINGER, 1)
		end
		
		# Set the timeout:
		if bound_timeout
			set_timeout(socket, bound_timeout)
		end
		
		socket.bind(local_address.to_sockaddr)
		
		if backlog
			begin
				# Generally speaking, bind/listen is a common pattern, but it's not applicable to all socket types. We ignore the error if it's not supported as the alternative is exposing this upstream, which seems less desirable than handling it here. In other words, `bind` in this context means "prepare it to accept connections", whatever that means for the given socket type.
				socket.listen(backlog)
			rescue Errno::EOPNOTSUPP
				# Ignore.
			end
		end
	rescue
		socket&.close
		raise
	end
	
	return socket unless block_given?
	
	schedule do
		begin
			yield socket
		ensure
			socket.close
		end
	end
end

#connect(remote_address, local_address: nil, linger: nil, timeout: nil, buffered: false, **options) ⇒ Object

Establish a connection to a given ‘remote_address`.

Examples:

socket = Async::IO::Socket.connect(Async::IO::Address.tcp("8.8.8.8", 53))


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/io/endpoint/wrapper.rb', line 67

def connect(remote_address, local_address: nil, linger: nil, timeout: nil, buffered: false, **options)
	socket = nil
	
	begin
		socket = ::Socket.new(remote_address.afamily, remote_address.socktype, remote_address.protocol)
		
		if linger
			socket.setsockopt(SOL_SOCKET, SO_LINGER, 1)
		end
		
		if buffered == false
			set_buffered(socket, buffered)
		end
		
		if timeout
			set_timeout(socket, timeout)
		end
		
		if local_address
			if defined?(IP_BIND_ADDRESS_NO_PORT)
				# Inform the kernel (Linux 4.2+) to not reserve an ephemeral port when using bind(2) with a port number of 0. The port will later be automatically chosen at connect(2) time, in a way that allows sharing a source port as long as the 4-tuple is unique.
				socket.setsockopt(SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1)
			end
			
			socket.bind(local_address.to_sockaddr)
		end
	rescue
		socket&.close
		raise
	end
	
	begin
		socket_connect(socket, remote_address)
	rescue Exception
		socket.close
		raise
	end
	
	return socket unless block_given?
	
	begin
		yield socket
	ensure
		socket.close
	end
end

#schedule(&block) ⇒ Object



13
14
15
16
17
18
19
# File 'lib/io/endpoint/wrapper.rb', line 13

def schedule(&block)
	if Fiber.scheduler
		Fiber.schedule(&block)
	else
		Thread.new(&block)
	end
end

#set_buffered(socket, buffered) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/io/endpoint/wrapper.rb', line 37

def set_buffered(socket, buffered)
	case buffered
	when true
		socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, 0)
	when false
		socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
	end
rescue Errno::EINVAL
	# On Darwin, sometimes occurs when the connection is not yet fully formed. Empirically, TCP_NODELAY is enabled despite this result.
rescue Errno::EOPNOTSUPP
	# Some platforms may simply not support the operation.
rescue Errno::ENOPROTOOPT
	# It may not be supported by the protocol (e.g. UDP). ¯\_(ツ)_/¯
end

#set_timeout(io, timeout) ⇒ Object



31
32
33
34
35
# File 'lib/io/endpoint/wrapper.rb', line 31

def set_timeout(io, timeout)
	if io.respond_to?(:timeout=)
		io.timeout = timeout
	end
end

#socket_accept(server) ⇒ Object

Accept a connection from a bound socket. This is an extension point for subclasses to provide additional functionality.



183
184
185
# File 'lib/io/endpoint/wrapper.rb', line 183

def socket_accept(server)
	server.accept
end

#socket_connect(socket, remote_address) ⇒ Object

Connect a socket to a remote address. This is an extension point for subclasses to provide additional functionality.



57
58
59
# File 'lib/io/endpoint/wrapper.rb', line 57

def socket_connect(socket, remote_address)
	socket.connect(remote_address.to_sockaddr)
end