Class: Async::IO::SharedEndpoint

Inherits:
Endpoint
  • Object
show all
Defined in:
lib/async/io/shared_endpoint.rb

Overview

Pre-connects or pre-binds sockets so that they can be shared between processes.

Instance Attribute Summary collapse

Attributes inherited from Endpoint

#options

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Endpoint

#bound, each, #each, #hostname, #linger, #local_address, parse, #reuse_address, #reuse_port, socket, ssl, tcp, #timeout, try_convert, udp, unix, #with

Constructor Details

#initialize(endpoint, wrappers, **options) ⇒ SharedEndpoint

Returns a new instance of SharedEndpoint.



50
51
52
53
54
55
# File 'lib/async/io/shared_endpoint.rb', line 50

# Wrap a set of already-prepared socket wrappers together with the endpoint
# they were created from.
#
# @param endpoint [Object] the endpoint the wrappers were bound/connected from
# @param wrappers [Array] the pre-bound or pre-connected socket wrappers
# @param options [Hash] forwarded unchanged to the superclass constructor
def initialize(endpoint, wrappers, **options)
  super(**options)
  
  # Assigned left-to-right, so @endpoint is still defined before @wrappers.
  @endpoint, @wrappers = endpoint, wrappers
end

Instance Attribute Details

#endpoint ⇒ Object (readonly)

Returns the value of attribute endpoint.



57
58
59
# File 'lib/async/io/shared_endpoint.rb', line 57

# Reader for the endpoint this instance was constructed with.
#
# @return [Object] the current value of @endpoint
def endpoint
  @endpoint
end

#wrappers ⇒ Object (readonly)

Returns the value of attribute wrappers.



58
59
60
# File 'lib/async/io/shared_endpoint.rb', line 58

# Reader for the socket wrappers held by this instance.
#
# @return [Object] the current value of @wrappers (the same object that
#   #close iterates over and clears)
def wrappers
  @wrappers
end

Class Method Details

.bound(endpoint, backlog = Socket::SOMAXCONN) ⇒ Object

Create a new SharedEndpoint by binding to the given endpoint.



30
31
32
33
34
35
36
37
38
# File 'lib/async/io/shared_endpoint.rb', line 30

# Build a shared endpoint by binding to +endpoint+.
#
# Each wrapper yielded by +endpoint.bound+ is put into the listening state,
# has its close-on-exec flag turned off, and has its reactor reference
# cleared before being handed to the constructor.
#
# @param endpoint [#bound] the endpoint to bind; must yield each bound wrapper
# @param backlog [Integer] value passed to +listen+ on every wrapper
# @return [Object] a new instance holding +endpoint+ and the bound wrappers
def self.bound(endpoint, backlog = Socket::SOMAXCONN)
  sockets = endpoint.bound do |socket|
    socket.listen(backlog)
    socket.close_on_exec = false
    socket.reactor = nil
  end
  
  new(endpoint, sockets)
end

.connected(endpoint) ⇒ Object

Create a new SharedEndpoint by connecting to the given endpoint.



41
42
43
44
45
46
47
48
# File 'lib/async/io/shared_endpoint.rb', line 41

# Build a shared endpoint by connecting to +endpoint+.
#
# The single wrapper returned by +endpoint.connect+ has its close-on-exec
# flag turned off and its reactor reference cleared, then is stored as a
# one-element list.
#
# @param endpoint [#connect] the endpoint to connect to
# @return [Object] a new instance holding +endpoint+ and the connected wrapper
def self.connected(endpoint)
  peer = endpoint.connect.tap do |socket|
    socket.close_on_exec = false
    socket.reactor = nil
  end
  
  new(endpoint, [peer])
end

Instance Method Details

#accept(backlog = nil, &block) ⇒ Object



102
103
104
105
106
# File 'lib/async/io/shared_endpoint.rb', line 102

# Run +accept_each+ on every server handed out by #bind, passing along the
# caller's block.
#
# @param backlog [Integer, nil] accepted but not used by this method
# @yield forwarded unchanged to +accept_each+ on each server
# @return [Object] whatever #bind returns
def accept(backlog = nil, &block)
  bind { |listener| listener.accept_each(&block) }
end

#bind ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/async/io/shared_endpoint.rb', line 66

# Hand a duplicate of every stored wrapper to the caller's block, each inside
# its own child of the current task.
#
# The duplicate is created before the child task is started and is closed
# when the block finishes, whether it returns or raises. The stored wrappers
# themselves are left untouched.
#
# @yieldparam server [Object] a +dup+ of one stored wrapper
# @yieldparam task [Object] the child task running the block
# @return [Object] the result of iterating @wrappers
def bind
  parent = Async::Task.current
  
  @wrappers.each do |wrapper|
    server = wrapper.dup
    
    parent.async do |child|
      child.annotate "binding to #{server.inspect}"
      
      begin
        yield server, child
      ensure
        server.close
      end
    end
  end
end

#close ⇒ Object

Close all the internal wrappers.



61
62
63
64
# File 'lib/async/io/shared_endpoint.rb', line 61

# Close every stored wrapper and empty the list.
#
# All wrappers are attempted even if one of them fails to close, so a single
# failure cannot leave the remaining descriptors open. The list is emptied
# in every case; afterwards the first error encountered (if any) is raised.
#
# @return [Array] the now-empty wrapper list
# @raise [StandardError] the first error raised by a wrapper's +close+
def close
  first_error = nil
  
  @wrappers.each do |wrapper|
    begin
      wrapper.close
    rescue StandardError => error
      # Remember only the first failure; keep closing the rest.
      first_error ||= error
    end
  end
  
  @wrappers.clear
  raise first_error if first_error
  
  @wrappers
end

#connect ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/async/io/shared_endpoint.rb', line 84

# Hand a duplicate of every stored wrapper to the caller's block, each inside
# its own child of the current task.
#
# The duplicate is created before the child task is started and is closed
# when the block finishes, whether it returns or raises. The stored wrappers
# themselves are left untouched.
#
# @yieldparam peer [Object] a +dup+ of one stored wrapper
# @yieldparam task [Object] the child task running the block
# @return [Object] the result of iterating @wrappers
def connect
  parent = Async::Task.current
  
  @wrappers.each do |wrapper|
    peer = wrapper.dup
    
    parent.async do |child|
      child.annotate "connected to #{peer.inspect} [#{peer.fileno}]"
      
      begin
        yield peer, child
      ensure
        peer.close
      end
    end
  end
end

#to_s ⇒ Object



108
109
110
# File 'lib/async/io/shared_endpoint.rb', line 108

# Human-readable summary: the class name, how many wrappers are held, and
# the endpoint they belong to.
#
# @return [String]
def to_s
  descriptor_count = @wrappers.size
  
  "\#<#{self.class} #{descriptor_count} descriptors for #{@endpoint}>"
end