Class: Raktr::Connection

Inherits:
Object
  • Object
show all
Includes:
Callbacks
Defined in:
lib/raktr/connection.rb,
lib/raktr/connection/tls.rb,
lib/raktr/connection/callbacks.rb,
lib/raktr/connection/peer_info.rb

Overview

Author:

Defined Under Namespace

Modules: Callbacks, PeerInfo, TLS

Constant Summary collapse

BLOCK_SIZE =

Maximum amount of data to be written or read at a time.

We set this to the same max block size as the OpenSSL buffers because more than this tends to cause SSL errors and broken #select behavior – 1024 * 16 at the time of writing.

OpenSSL::Buffering::BLOCK_SIZE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Callbacks

#on_attach, #on_close, #on_connect, #on_detach, #on_flush, #on_read, #on_write

Instance Attribute Details

#raktrReactor

Returns Reactor associated with this connection.

Returns:

  • (Reactor)

    Reactor associated with this connection.



32
33
34
# File 'lib/raktr/connection.rb', line 32

def raktr
  @raktr
end

#roleSymbol (readonly)

Returns ‘:client` or `:server`.

Returns:

  • (Symbol)

    ‘:client` or `:server`



36
37
38
# File 'lib/raktr/connection.rb', line 36

def role
  @role
end

#socketSocket (readonly)

Returns Ruby ‘Socket` associated with this connection.

Returns:

  • (Socket)

    Ruby ‘Socket` associated with this connection.



28
29
30
# File 'lib/raktr/connection.rb', line 28

def socket
  @socket
end

Instance Method Details

#_connectObject



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/raktr/connection.rb', line 227

def _connect
    return true if unix? || connected?

    begin
        socket.connect_nonblock( Socket.sockaddr_in( @port, @host ) )
    # Already connected. :)
    rescue Errno::EISCONN, Errno::EALREADY
    end

    @connected = true
    on_connect

    true
rescue IO::WaitReadable, IO::WaitWritable, Errno::EINPROGRESS
rescue => e
    close e
end

#_readObject

Note:

If this is a server #listener? it will delegate to #accept.

Note:

If this is a normal socket it will read BLOCK_SIZE amount of data. and pass it to Raktr::Connection::Callbacks#on_read.

Processes a ‘read` event for this connection.



252
253
254
255
256
257
258
259
260
261
262
# File 'lib/raktr/connection.rb', line 252

def _read
    return _connect if !listener? && !connected?
    return accept   if listener?

    on_read @socket.read_nonblock( BLOCK_SIZE )

# Not ready to read or write yet, we'll catch it on future Reactor ticks.
rescue IO::WaitReadable, IO::WaitWritable
rescue => e
    close e
end

#_writeInteger

Note:

Will call Raktr::Connection::Callbacks#on_write every time any of the buffer is consumed, can be multiple times when performing partial writes.

Note:

Will call Raktr::Connection::Callbacks#on_flush once all of the buffer has been consumed.

Processes a ‘write` event for this connection.

Consumes and writes BLOCK_SIZE amount of data from the the beginning of the #write buffer to the socket.

Returns:

  • (Integer)

    Amount of the buffer consumed.



277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/raktr/connection.rb', line 277

def _write
    return _connect if !connected?

    write_buffer.force_encoding( Encoding::BINARY )
    chunk = write_buffer[0, BLOCK_SIZE]
    total_written = 0

    begin
        # Send out the chunk, **all** of it, or at least try to.
        loop do
            total_written += written = @socket.write_nonblock( chunk )
            write_buffer[0, written] = ''

            # Call #on_write every time any of the buffer is consumed.
            on_write

            break if written == chunk.size
            chunk[0, written] = ''
        end

    # Not ready to read or write yet, we'll catch it on future Reactor ticks.
    rescue IO::WaitReadable, IO::WaitWritable
    end

    if write_buffer.empty?
        @socket.flush
        on_flush
    end

    total_written
rescue => e
    close e
end

#acceptConnection?

Accepts a new client connection.

Returns:

  • (Connection, nil)

    New connection or ‘nil` if the socket isn’t ready to accept new connections yet.



189
190
191
192
193
194
195
196
# File 'lib/raktr/connection.rb', line 189

def accept
    return if !(accepted = socket_accept)

    connection = @server_handler.call
    connection.configure socket: accepted, role: :server
    @raktr.attach connection
    connection
end

#attach(raktr) ⇒ Bool

Note:

Will first detach if already #attached?.

Note:

Sets #raktr.

Returns ‘true` if the connection was attached, `nil` if the connection was already attached.

Parameters:

  • reactor (Reactor)

    Reactor to which to attach Reactor#attach.

Returns:

  • (Bool)

    ‘true` if the connection was attached, `nil` if the connection was already attached.



109
110
111
112
113
114
115
116
# File 'lib/raktr/connection.rb', line 109

def attach( raktr )
    return if raktr.attached?( self )
    detach if attached?

    raktr.attach self

    true
end

#attached?Bool

Returns ‘true` if the connection is Reactor#attached? to a #raktr, `false` otherwise.

Returns:

  • (Bool)

    ‘true` if the connection is Reactor#attached? to a #raktr, `false` otherwise.



88
89
90
# File 'lib/raktr/connection.rb', line 88

def attached?
    @raktr && @raktr.attached?( self )
end

#close(reason = nil) ⇒ Object

Note:

Will call Raktr::Connection::Callbacks#on_close right before closing the socket and detaching from the Reactor.

Closes the connection and detaches it from the Reactor.

Parameters:

  • reason (Exception) (defaults to: nil)

    Reason for the close.



174
175
176
177
178
179
180
# File 'lib/raktr/connection.rb', line 174

def close( reason = nil )
    return if closed?

    on_close reason
    close_without_callback
    nil
end

#close_without_callbackObject

Closes the connection and detaches it from the Reactor.



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/raktr/connection.rb', line 137

def close_without_callback
    return if closed?
    @closed = true

    if listener? && unix? && (path = to_io.path) && File.exist?( path )
        File.delete( path )
    end

    if @socket
        @socket.close rescue nil
    end

    detach

    nil
end

#closed?Bool

Returns ‘true` if the connection has been closed, `false` otherwise.

Returns:

  • (Bool)

    ‘true` if the connection has been closed, `false` otherwise.



156
157
158
# File 'lib/raktr/connection.rb', line 156

def closed?
    !!@closed
end

#configure(options = {}) ⇒ Object

Parameters:

  • socket (Socket)

    Ruby ‘Socket` associated with this connection.

  • role (Symbol)

    ‘:server` or `:client`.

  • server_handler (Block)

    Block that generates a handler as specified in Reactor#listen.



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/raktr/connection.rb', line 206

def configure( options = {} )
    @socket         = options[:socket]
    @role           = options[:role]
    @host           = options[:host]
    @port           = options[:port]
    @server_handler = options[:server_handler]

    # If we're a server without a handler then we're an accepted connection.
    if unix? || role == :server
        @connected = true
        on_connect
    end

    nil
end

#connected?Boolean

Returns:

  • (Boolean)


222
223
224
# File 'lib/raktr/connection.rb', line 222

def connected?
    !!@connected
end

#detachBool

Note:

Removes #raktr.

Detaches ‘self` from the #raktr.

Returns:

  • (Bool)

    ‘true` if the connection was detached, `nil` if the connection was already detached.



126
127
128
129
130
131
132
# File 'lib/raktr/connection.rb', line 126

def detach
    return if detached?

    @raktr.detach self

    true
end

#detached?Bool

Returns ‘true` if the connection is not Reactor#attached? to a #raktr, `false` otherwise.

Returns:

  • (Bool)

    ‘true` if the connection is not Reactor#attached? to a #raktr, `false` otherwise.



95
96
97
# File 'lib/raktr/connection.rb', line 95

def detached?
    !attached?
end

#has_outgoing_data?Bool

Returns ‘true` if the connection has outgoing data that have not yet been written, `false` otherwise.

Returns:

  • (Bool)

    ‘true` if the connection has outgoing data that have not yet been written, `false` otherwise.



163
164
165
# File 'lib/raktr/connection.rb', line 163

def has_outgoing_data?
    !write_buffer.empty?
end

#inet?Bool

Returns ‘true` when using an Internet socket, `nil` if no #socket is available, `false` otherwise.

Returns:

  • (Bool)

    ‘true` when using an Internet socket, `nil` if no #socket is available, `false` otherwise.



52
53
54
55
56
57
# File 'lib/raktr/connection.rb', line 52

def inet?
    return @is_inet if !@is_inet.nil?
    return if !to_io

    @is_inet = to_io.is_a?( TCPServer ) || to_io.is_a?( TCPSocket ) || to_io.is_a?( Socket )
end

#listener?Bool

Returns ‘true` if the connection is a server listener.

Returns:

  • (Bool)

    ‘true` if the connection is a server listener.



68
69
70
71
72
73
# File 'lib/raktr/connection.rb', line 68

def listener?
    return @is_listener if !@is_listener.nil?
    return if !to_io

    @is_listener = to_io.is_a?( TCPServer ) || (unix? && to_io.is_a?( UNIXServer ))
end

#to_ioIO?

Returns IO stream or ‘nil` if no #socket is available.

Returns:

  • (IO, nil)

    IO stream or ‘nil` if no #socket is available.



61
62
63
64
# File 'lib/raktr/connection.rb', line 61

def to_io
    return if !@socket
    @socket.to_io
end

#unix?Bool?

Returns ‘true` when using a UNIX-domain socket, `nil` if no #socket is available, `false` otherwise.

Returns:

  • (Bool, nil)

    ‘true` when using a UNIX-domain socket, `nil` if no #socket is available, `false` otherwise.



41
42
43
44
45
46
47
# File 'lib/raktr/connection.rb', line 41

def unix?
    return @is_unix if !@is_unix.nil?
    return if !to_io
    return false if !Raktr.supports_unix_sockets?

    @is_unix = to_io.is_a?( UNIXServer ) || to_io.is_a?( UNIXSocket )
end

#write(data) ⇒ Object

Note:

The data will be buffered and sent in future Reactor ticks.

Parameters:

  • data (String)

    Data to send to the peer.



79
80
81
82
83
# File 'lib/raktr/connection.rb', line 79

def write( data )
    @raktr.schedule do
        write_buffer << data
    end
end