Class: Async::IO::SharedEndpoint

Inherits:
Endpoint
  • Object
show all
Defined in:
lib/async/io/shared_endpoint.rb

Overview

Pre-connect and pre-bind sockets so that they can be shared between processes.

Instance Attribute Summary collapse

Attributes inherited from Endpoint

#options

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Endpoint

#bound, composite, #each, each, #hostname, #linger, #local_address, parse, #reuse_address, #reuse_port, socket, ssl, tcp, #timeout, try_convert, udp, unix, #with

Constructor Details

#initialize(endpoint, wrappers, **options) ⇒ SharedEndpoint

Returns a new instance of SharedEndpoint.



41
42
43
44
45
46
# File 'lib/async/io/shared_endpoint.rb', line 41

# Store the endpoint together with the wrappers that were created for it.
#
# Any keyword `options` are passed through unchanged to the superclass initializer.
def initialize(endpoint, wrappers, **options)
	super(**options)
	
	@endpoint, @wrappers = endpoint, wrappers
end

Instance Attribute Details

#endpoint ⇒ Object (readonly)

Returns the value of attribute endpoint.



48
49
50
# File 'lib/async/io/shared_endpoint.rb', line 48

# Reader for the `@endpoint` instance variable.
def endpoint
  return @endpoint
end

#wrappers ⇒ Object (readonly)

Returns the value of attribute wrappers.



49
50
51
# File 'lib/async/io/shared_endpoint.rb', line 49

# Reader for the `@wrappers` instance variable.
def wrappers
  return @wrappers
end

Class Method Details

.bound(endpoint, backlog: Socket::SOMAXCONN, close_on_exec: false) ⇒ Object

Create a new `SharedEndpoint` by binding to the given endpoint.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/async/io/shared_endpoint.rb', line 15

# Bind `endpoint` and build a new instance from the value returned by `endpoint.bound`.
#
# Each server yielded by `endpoint.bound` is asked to `listen(backlog)`, has `close_on_exec` assigned, and has its `reactor` reset to `nil`.
def self.bound(endpoint, backlog: Socket::SOMAXCONN, close_on_exec: false)
	servers = endpoint.bound do |socket|
		# Listening is treated as best-effort so that callers can use one generic interface: when the underlying socket does not support the operation, the error is deliberately swallowed.
		begin
			socket.listen(backlog)
		rescue Errno::EOPNOTSUPP
			# Ignore.
		end
		
		socket.close_on_exec = close_on_exec
		socket.reactor = nil
	end
	
	new(endpoint, servers)
end

.connected(endpoint, close_on_exec: false) ⇒ Object

Create a new `SharedEndpoint` by connecting to the given endpoint.



32
33
34
35
36
37
38
39
# File 'lib/async/io/shared_endpoint.rb', line 32

# Connect to `endpoint` and build a new instance holding the single resulting wrapper.
#
# The wrapper returned by `endpoint.connect` has `close_on_exec` assigned and its `reactor` reset to `nil` before it is stored.
def self.connected(endpoint, close_on_exec: false)
	peer = endpoint.connect.tap do |socket|
		socket.close_on_exec = close_on_exec
		socket.reactor = nil
	end
	
	new(endpoint, [peer])
end

Instance Method Details

#accept(backlog = nil, &block) ⇒ Object



110
111
112
113
114
# File 'lib/async/io/shared_endpoint.rb', line 110

# Call `accept_each` with the given block on every server yielded by `bind`.
#
# The `backlog` argument is accepted but not used by this method.
def accept(backlog = nil, &block)
	bind { |server| server.accept_each(&block) }
end

#bind ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/async/io/shared_endpoint.rb', line 74

# For every wrapper, make a duplicate and hand it to the given block via `async` on the current `Async::Task`.
#
# The block receives the duplicate and the task yielded by `async`. The duplicate is closed once the block returns or raises; the original wrappers are left open.
def bind
	parent = Async::Task.current
	
	@wrappers.each do |wrapper|
		# Duplicate up front, before `async` is invoked.
		server = wrapper.dup
		
		parent.async do |child|
			child.annotate "binding to #{server.inspect}"
			
			begin
				yield server, child
			ensure
				server.close
			end
		end
	end
end

#close ⇒ Object

Close all the internal wrappers.



69
70
71
72
# File 'lib/async/io/shared_endpoint.rb', line 69

# Close each wrapper in turn, then empty the wrapper collection.
def close
	@wrappers.each { |wrapper| wrapper.close }
	@wrappers.clear
end

#connect ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/async/io/shared_endpoint.rb', line 92

# For every wrapper, make a duplicate and hand it to the given block via `async` on the current `Async::Task`.
#
# The block receives the duplicate and the task yielded by `async`. The duplicate is closed once the block returns or raises; the original wrappers are left open.
def connect
	parent = Async::Task.current
	
	@wrappers.each do |wrapper|
		# Duplicate up front, before `async` is invoked.
		peer = wrapper.dup
		
		parent.async do |child|
			child.annotate "connected to #{peer.inspect} [#{peer.fileno}]"
			
			begin
				yield peer, child
			ensure
				peer.close
			end
		end
	end
end

#local_address_endpoint(**options) ⇒ Object



51
52
53
54
55
56
57
58
# File 'lib/async/io/shared_endpoint.rb', line 51

# Build a `CompositeEndpoint` containing one `AddressEndpoint` per wrapper, each created from that wrapper's local address.
#
# The keyword `options` go to every inner `AddressEndpoint`; the composite itself is created without options.
def local_address_endpoint(**options)
	per_wrapper = @wrappers.map do |wrapper|
		local = wrapper.to_io.local_address
		
		# Forward the options to the internal endpoints:
		AddressEndpoint.new(local, **options)
	end
	
	CompositeEndpoint.new(per_wrapper)
end

#remote_address_endpoint(**options) ⇒ Object



60
61
62
63
64
65
66
# File 'lib/async/io/shared_endpoint.rb', line 60

# Build a `CompositeEndpoint` containing one `AddressEndpoint` per wrapper, each created from that wrapper's remote address.
#
# The keyword `options` go to every inner `AddressEndpoint`, mirroring `local_address_endpoint`. Previously they were attached to the composite wrapper instead, so the per-address endpoints were created without them.
def remote_address_endpoint(**options)
	endpoints = @wrappers.map do |wrapper|
		# Forward the options to the internal endpoints:
		AddressEndpoint.new(wrapper.to_io.remote_address, **options)
	end
	
	return CompositeEndpoint.new(endpoints)
end

#to_s ⇒ Object



116
117
118
# File 'lib/async/io/shared_endpoint.rb', line 116

# Human-readable summary: the class name, the number of wrappers held, and the endpoint.
def to_s
	descriptor_count = @wrappers.size
	
	"\#<#{self.class} #{descriptor_count} descriptors for #{@endpoint}>"
end