Class: Libuv::TCP

Inherits:
Handle show all
Includes:
Net, Stream
Defined in:
lib/libuv/tcp.rb

Defined Under Namespace

Classes: Socket4, Socket6, SocketBase

Constant Summary collapse

TLS_ERROR =
"TLS write failed"

Constants included from Stream

Stream::BACKLOG_ERROR, Stream::CLOSED_HANDLE_ERROR, Stream::STREAM_CLOSED_ERROR, Stream::WRITE_ERROR

Constants included from Net

Net::INET6_ADDRSTRLEN, Net::INET_ADDRSTRLEN, Net::IP_ARGUMENT_ERROR, Net::PORT_ARGUMENT_ERROR

Constants included from Assertions

Assertions::MSG_NO_PROC

Constants inherited from Q::Promise

Q::Promise::MAKE_PROMISE

Instance Attribute Summary collapse

Attributes inherited from Handle

#closed, #reactor, #storage

Attributes inherited from Q::Promise

#trace

Instance Method Summary collapse

Methods included from Stream

#close_write, #flush, included, #listen, #progress, #read, #readable?, #start_read, #stop_read, #try_write, #writable?

Methods inherited from Handle

#active?, #closed?, #closing?, #ref, #unref

Methods included from Assertions

#assert_block, #assert_boolean, #assert_type

Methods included from Resource

#check_result, #check_result!, #resolve, #to_ptr

Methods inherited from Q::DeferredPromise

#resolved?, #then

Methods inherited from Q::Promise

#catch, #finally, #progress, #ruby_catch, #value

Constructor Details

#initialize(reactor, acceptor = nil, progress: nil, flags: nil, **tls_options) ⇒ TCP

Returns a new instance of TCP.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/libuv/tcp.rb', line 26

def initialize(reactor, acceptor = nil, progress: nil, flags: nil, **tls_options)
    @reactor = reactor
    @progress = progress
    @tls_options = tls_options

    tcp_ptr = ::Libuv::Ext.allocate_handle_tcp
    error = if flags
        check_result(::Libuv::Ext.tcp_init_ex(reactor.handle, tcp_ptr, flags))
    else
        check_result(::Libuv::Ext.tcp_init(reactor.handle, tcp_ptr))
    end

    if acceptor && error.nil?
        error = check_result(::Libuv::Ext.accept(acceptor, tcp_ptr))
        @connected = true
    else
        @connected = false
    end
    
    super(tcp_ptr, error)
end

Instance Attribute Details

#connectedObject (readonly)

Returns the value of attribute connected.



18
19
20
# File 'lib/libuv/tcp.rb', line 18

def connected
  @connected
end

#protocolObject (readonly)

Returns the value of attribute protocol.



19
20
21
# File 'lib/libuv/tcp.rb', line 19

def protocol
  @protocol
end

#tlsObject (readonly)

Returns the value of attribute tls.



20
21
22
# File 'lib/libuv/tcp.rb', line 20

def tls
  @tls
end

Instance Method Details

#add_host(**host_opts) ⇒ Object



209
210
211
212
# File 'lib/libuv/tcp.rb', line 209

def add_host(**host_opts)
    @tls_options[:hosts] ||= []
    @tls_options[:hosts] << host_opts
end

#bind(ip, port, callback = nil, **tls_options, &blk) ⇒ Object

END TLS Abstraction ——————




231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/libuv/tcp.rb', line 231

def bind(ip, port, callback = nil, **tls_options, &blk)
    return self if @closed

    @on_accept = callback || blk
    @on_listen = method(:accept)

    assert_type(String, ip, IP_ARGUMENT_ERROR)
    assert_type(Integer, port, PORT_ARGUMENT_ERROR)

    begin
        @tcp_socket = create_socket(IPAddr.new(ip), port)
        @tcp_socket.bind
        @tls_options.merge!(tls_options)
        @tls_options[:server] = true
    rescue Exception => e
        reject(e)
    end

    self
end

#closeObject

overwrite the default close to ensure pending writes are rejected



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/libuv/tcp.rb', line 148

def close
    return self if @closed

    # Free tls memory
    # Next tick as may recieve data after closing
    if @tls
        @reactor.next_tick do
            @tls.cleanup
        end
    end
    @connected = false

    if @pending_writes
        @pending_writes.each do |deferred, data|
            deferred.reject(TLS_ERROR)
        end
        @pending_writes = nil
    end

    super
end

#close_cbObject

Close can be called multiple times



123
124
125
126
127
128
129
130
131
# File 'lib/libuv/tcp.rb', line 123

def close_cb
    if @pending_write
        @pending_write.reject(TLS_ERROR)
        @pending_write = nil
    end

    # Shutdown the stream
    close
end

#connect(ip, port, callback = nil, &blk) ⇒ Object



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/libuv/tcp.rb', line 269

def connect(ip, port, callback = nil, &blk)
    return self if @closed

    @callback = callback || blk
    assert_type(String, ip, IP_ARGUMENT_ERROR)
    assert_type(Integer, port, PORT_ARGUMENT_ERROR)
    
    begin
        @tcp_socket = create_socket(IPAddr.new(ip), port)
        @tcp_socket.connect(callback(:on_connect, @tcp_socket.connect_req.address))
    rescue Exception => e
        reject(e)
    end

    if @callback.nil?
        @coroutine = @reactor.defer
        co @coroutine.promise
    end

    self
end

#direct_writeObject



176
# File 'lib/libuv/tcp.rb', line 176

alias_method :direct_write, :write

#disable_keepaliveObject



325
326
327
328
329
# File 'lib/libuv/tcp.rb', line 325

def disable_keepalive
    return self if @closed
    check_result ::Libuv::Ext.tcp_keepalive(handle, 0, 0)
    self
end

#disable_nodelayObject



313
314
315
316
317
# File 'lib/libuv/tcp.rb', line 313

def disable_nodelay
    return self if @closed
    check_result ::Libuv::Ext.tcp_nodelay(handle, 0)
    self
end

#disable_simultaneous_acceptsObject



337
338
339
340
341
# File 'lib/libuv/tcp.rb', line 337

def disable_simultaneous_accepts
    return self if @closed
    check_result ::Libuv::Ext.tcp_simultaneous_accepts(handle, 0)
    self
end

#dispatch_cb(data) ⇒ Object

This is clear text data that has been decrypted Same as stream.rb on_read for clear text



103
104
105
106
107
108
109
# File 'lib/libuv/tcp.rb', line 103

def dispatch_cb(data)
    begin
        @progress.call data, self
    rescue Exception => e
        @reactor.log e, 'performing TLS read data callback'
    end
end

#do_shutdownObject



199
# File 'lib/libuv/tcp.rb', line 199

alias_method :do_shutdown, :shutdown

#enable_keepalive(delay) ⇒ Object



319
320
321
322
323
# File 'lib/libuv/tcp.rb', line 319

def enable_keepalive(delay)
    return self if @closed                   # The to_i asserts integer
    check_result ::Libuv::Ext.tcp_keepalive(handle, 1, delay.to_i)
    self
end

#enable_nodelayObject



307
308
309
310
311
# File 'lib/libuv/tcp.rb', line 307

def enable_nodelay
    return self if @closed
    check_result ::Libuv::Ext.tcp_nodelay(handle, 1)
    self
end

#enable_simultaneous_acceptsObject



331
332
333
334
335
# File 'lib/libuv/tcp.rb', line 331

def enable_simultaneous_accepts
    return self if @closed
    check_result ::Libuv::Ext.tcp_simultaneous_accepts(handle, 1)
    self
end

#handshake_cb(protocol = nil) ⇒ Object

Push through any pending writes when handshake has completed



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/libuv/tcp.rb', line 77

def handshake_cb(protocol = nil)
    @handshake = true
    @protocol = protocol

    writes = @pending_writes
    @pending_writes = nil
    writes.each do |deferred, data|
        @pending_write = deferred
        @tls.encrypt(data)
    end

    begin
        @on_handshake.call(self, protocol) if @on_handshake
    rescue => e
        @reactor.log e, 'performing TLS handshake callback'
    end
end

#on_handshake(callback = nil, &blk) ⇒ Object

Provide a callback once the TLS handshake has completed



96
97
98
99
# File 'lib/libuv/tcp.rb', line 96

def on_handshake(callback = nil, &blk)
    @on_handshake = callback || blk
    self
end

#open(fd, binding = true, callback = nil, &blk) ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/libuv/tcp.rb', line 252

def open(fd, binding = true, callback = nil, &blk)
    return self if @closed

    if binding
        @on_listen = method(:accept)
        @on_accept = callback || blk
    else
        @callback = callback || blk
        @coroutine = @reactor.defer if @callback.nil?
    end
    error = check_result ::Libuv::Ext.tcp_open(handle, fd)
    reject(error) if error
    co @coroutine.promise if @coroutine

    self
end

#peernameObject

The IP address of the peer (remote) end of the socket



300
301
302
303
304
305
# File 'lib/libuv/tcp.rb', line 300

def peername
    return [] if @closed
    sockaddr, len = get_sockaddr_and_len
    check_result! ::Libuv::Ext.tcp_getpeername(handle, sockaddr, len)
    get_ip_and_port(::Libuv::Ext::Sockaddr.new(sockaddr), len.get_int(0))
end

#remove_host(name) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/libuv/tcp.rb', line 214

def remove_host(name)
    if @tls_options[:hosts]
        found = nil
        @tls_options[:hosts].each do |host|
            if host[:host_name] == name
                found = host
                break
            end 
        end
        @tls_options[:hosts].delete(found) if found
    end
end

#shutdownObject



200
201
202
203
204
205
206
207
# File 'lib/libuv/tcp.rb', line 200

def shutdown
    if @pending_writes && @pending_writes.length > 0
        @pending_writes[-1][0].finally method(:do_shutdown)
    else
        do_shutdown
    end
    self
end

#socknameObject

The name of the client (local) end of the socket



292
293
294
295
296
297
# File 'lib/libuv/tcp.rb', line 292

def sockname
    return [] if @closed
    sockaddr, len = get_sockaddr_and_len
    check_result! ::Libuv::Ext.tcp_getsockname(handle, sockaddr, len)
    get_ip_and_port(::Libuv::Ext::Sockaddr.new(sockaddr), len.get_int(0))
end

#start_tls(args = {}) ⇒ Object

TLS Abstraction ———————-




53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/libuv/tcp.rb', line 53

def start_tls(args = {})
    return self unless @connected && @tls.nil?

    args[:verify_peer] = true if @on_verify

    @handshake = false
    @pending_writes = []
    @tls_options.merge!(args)

    hosts = @tls_options[:hosts]
    if hosts && hosts[0]
        opts = @tls_options.merge(hosts[0])
        @tls = ::RubyTls::SSL::Box.new(opts[:server], self, opts)
        hosts[1..-1].each do |host_opts|
            @tls.add_host(**host_opts)
        end
    else
        @tls = ::RubyTls::SSL::Box.new(@tls_options[:server], self, @tls_options)
    end
    @tls.start
    self
end

#tls?Boolean

Check if tls active on the socket

Returns:

  • (Boolean)


23
# File 'lib/libuv/tcp.rb', line 23

def tls?; !@tls.nil?; end

#transmit_cb(data) ⇒ Object

We resolve the existing tls write promise with a the

real writes promise (a close may have occurred)


113
114
115
116
117
118
119
120
# File 'lib/libuv/tcp.rb', line 113

def transmit_cb(data)
    if @pending_write
        @pending_write.resolve(direct_write(data))
        @pending_write = nil
    else
        direct_write(data)
    end
end

#verify_cb(cert) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/libuv/tcp.rb', line 133

def verify_cb(cert)
    if @on_verify
        begin
            return @on_verify.call cert
        rescue => e
            @reactor.log e, 'performing TLS verify callback'
            return false
        end
    end

    true
end

#verify_peer(callback = nil, &blk) ⇒ Object

Verify peers will be called for each cert in the chain



171
172
173
174
# File 'lib/libuv/tcp.rb', line 171

def verify_peer(callback = nil, &blk)
    @on_verify = callback || blk
    self
end

#write(data, wait: false) ⇒ Object



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/libuv/tcp.rb', line 177

def write(data, wait: false)
    if @tls
        deferred = @reactor.defer
        
        if @handshake
            @pending_write = deferred
            @tls.encrypt(data)
        else
            @pending_writes << [deferred, data]
        end

        if wait
            return deferred.promise if wait == :promise
            co deferred.promise
        end

        self
    else
        direct_write(data, wait: wait)
    end
end