Class: FTW::Connection

Inherits:
Object
Includes:
Cabin::Inspectable, Poolable
Defined in:
lib/ftw/connection.rb

Overview

A TCP network connection.

You can use IO::select on objects of this type (at least in MRI).

You can activate SSL/TLS on this connection by invoking FTW::Connection#secure.

This class also implements buffering itself, because some IO-like classes (such as OpenSSL::SSL::SSLSocket) do not support IO#ungetbyte.

Defined Under Namespace

Classes: ConnectRefused, ConnectTimeout, ReadTimeout, SecureHandshakeTimeout, WriteTimeout

Methods included from Poolable

#available?, #mark, #release

Class Method Details

.from_io(io, mode = :server) ⇒ Object

Create a new connection from an existing IO instance (such as a socket).

Valid modes are :server and :client.

  • Specify :server if the IO came from the accepting side (via Socket#accept).

  • Specify :client if the IO came from the connecting side (via Socket#connect).



# File 'lib/ftw/connection.rb', line 97

def self.from_io(io, mode=:server)
  valid_modes = [:server, :client]
  if !valid_modes.include?(mode)
    raise InvalidArgument.new("Invalid connection mode '#{mode}'. Valid modes: #{valid_modes.inspect}")
  end

  connection = self.new(nil) # New connection with no destinations
  connection.instance_eval do
    @socket = io
    @connected = true
    port, address = Socket.unpack_sockaddr_in(io.getpeername)
    @remote_address = "#{address}:#{port}"
    @mode = mode
  end
  return connection
end
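
The peer-address step in the listing above can be tried on its own with the standard library. This is a minimal sketch on a loopback socket pair; the variable names are illustrative, and the commented-out FTW call at the end assumes the ftw gem is loaded.

```ruby
require "socket"

server = TCPServer.new("127.0.0.1", 0) # port 0: let the OS pick a free port
client = TCPSocket.new("127.0.0.1", server.addr[1])
accepted = server.accept

# getpeername returns a packed sockaddr; unpack it into [port, address].
port, address = Socket.unpack_sockaddr_in(accepted.getpeername)
remote_address = "#{address}:#{port}"

# With FTW loaded, the accepted socket could then be wrapped (assumption):
#   connection = FTW::Connection.from_io(accepted, :server)
```

Note the order of the unpacked pair: the port comes first, then the address.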

Instance Method Details

#client? ⇒ Boolean

Is this a client connection?

Returns:

  • (Boolean)


# File 'lib/ftw/connection.rb', line 421

def client?
  return @mode == :client
end

#connect(timeout = nil) ⇒ nil, StandardError or subclass

Connect now.

Timeout value is optional. If no timeout is given, this method blocks until a connection is successful or an error occurs.

You should check the return value of this method to determine if a connection was successful.

Possible return values on error include:

  • FTW::Connection::ConnectRefused

  • FTW::Connection::ConnectTimeout

Returns:

  • (nil)

    if the connection was successful

  • (StandardError or subclass)

    if the connection failed



# File 'lib/ftw/connection.rb', line 129

def connect(timeout=nil)
  # TODO(sissel): Raise if we're already connected?
  disconnect("reconnecting") if connected?
  host, port = @destinations.first.split(":")
  @destinations = @destinations.rotate # round-robin

  # Do dns resolution on the host. If there are multiple
  # addresses resolved, return one at random.
  addresses = FTW::DNS.singleton.resolve(host)

  addresses.each do |address|
    # Try each address until one works.
    @remote_address = address
    # Addresses with colon ':' in them are assumed to be IPv6
    family = @remote_address.include?(":") ? Socket::AF_INET6 : Socket::AF_INET
    @logger.debug("Connecting", :address => @remote_address,
                  :host => host, :port => port, :family => family)
    @socket = Socket.new(family, Socket::SOCK_STREAM, 0)
    @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

    # This api is terrible. pack_sockaddr_in? This isn't C, man...
    @logger.debug("packing", :data => [port.to_i, @remote_address])
    sockaddr = Socket.pack_sockaddr_in(port.to_i, @remote_address)
    # TODO(sissel): Support local address binding

    # Connect with timeout
    begin
      @socket.connect_nonblock(sockaddr)
    rescue IO::WaitWritable, Errno::EINPROGRESS
      # Ruby actually raises Errno::EINPROGRESS, but for some reason
      # the documentation says to use this IO::WaitWritable thing...
      # I don't get it, but whatever :(

      writable = writable?(timeout)

      # http://jira.codehaus.org/browse/JRUBY-6528; IO.select doesn't behave
      # correctly on JRuby < 1.7, so work around it.
      if writable || (RUBY_PLATFORM == "java" and JRUBY_VERSION < "1.7.0")
        begin
          @socket.connect_nonblock(sockaddr) # check connection failure
        rescue Errno::EISCONN 
          # Ignore, we're already connected.
        rescue Errno::ECONNREFUSED => e
          # Fire 'disconnected' event with reason :refused
          @socket.close
          return ConnectRefused.new("#{host}[#{@remote_address}]:#{port}")
        rescue Errno::ETIMEDOUT
          # This occurs when the system's TCP timeout hits, we have no
          # control over this, as far as I can tell. *maybe* setsockopt(2)
          # has a flag for this, but I haven't checked..
          # TODO(sissel): We should instead do 'retry' unless we've exceeded
          # the timeout.
          @socket.close
          return ConnectTimeout.new("#{host}[#{@remote_address}]:#{port}")
        rescue Errno::EINPROGRESS
          # If we get here, it's likely JRuby version < 1.7.0. EINPROGRESS at
          # this point in the code means that we have timed out.
          @socket.close
          return ConnectTimeout.new("#{host}[#{@remote_address}]:#{port}")
        end
      else
        # Connection timeout;
        return ConnectTimeout.new("#{host}[#{@remote_address}]:#{port}")
      end

    end # begin/rescue

    # If no error at this point, we're now connected. This also covers
    # the case where the first connect_nonblock succeeded immediately.
    @connected = true
    break
  end # addresses.each
  return nil
end
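
The connect-with-timeout pattern used above (a non-blocking connect, a wait for writability, then a second connect call to learn the outcome) can be sketched with the standard library alone. The helper name connect_with_timeout is hypothetical and not part of FTW. Like #connect, it returns an error object instead of raising, so the caller has to inspect the return value.

```ruby
require "socket"

# Hypothetical helper, not part of FTW: returns a connected Socket on success,
# or an exception instance (returned, not raised) on failure.
def connect_with_timeout(address, port, timeout = nil)
  family = address.include?(":") ? Socket::AF_INET6 : Socket::AF_INET
  socket = Socket.new(family, Socket::SOCK_STREAM, 0)
  sockaddr = Socket.pack_sockaddr_in(port, address)
  begin
    socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # Connection is in progress: wait for writability or give up.
    if IO.select(nil, [socket], nil, timeout).nil?
      socket.close
      return Errno::ETIMEDOUT.new("#{address}:#{port}")
    end
    begin
      socket.connect_nonblock(sockaddr) # second call reports the outcome
    rescue Errno::EISCONN
      # Already connected.
    rescue SystemCallError => e
      socket.close
      return e
    end
  rescue SystemCallError => e
    # Immediate failure, for example a refused connection on loopback.
    socket.close
    return e
  end
  socket
end

server = TCPServer.new("127.0.0.1", 0)
result = connect_with_timeout("127.0.0.1", server.addr[1], 2)
if result.is_a?(Exception)
  warn "connect failed: #{result.class}"
else
  puts "connected to #{result.remote_address.inspect_sockaddr}"
end
```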

#connected? ⇒ Boolean

Is this Connection connected?

Returns:

  • (Boolean)


# File 'lib/ftw/connection.rb', line 203

def connected?
  return @connected
end

#disconnect(reason) ⇒ Object

End this connection, specifying why.



# File 'lib/ftw/connection.rb', line 266

def disconnect(reason)
  if @socket.is_a?(OpenSSL::SSL::SSLSocket)
    @socket.sysclose()
  else
    begin 
      @socket.close_read
    rescue IOError => e
      # Ignore, perhaps we shouldn't ignore.
    end

    begin 
      @socket.close_write
    rescue IOError => e
      # Ignore, perhaps we shouldn't ignore.
    end
  end
end
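
For plain (non-SSL) sockets, the listing above closes the read half and then the write half, ignoring IOError on each. A minimal sketch of that pattern follows, assuming a Unix-like platform where UNIXSocket.pair is available; the helper name close_both_halves is hypothetical.

```ruby
require "socket"

# Hypothetical helper: close each direction in turn, ignoring IOError
# from a half that is already closed.
def close_both_halves(socket)
  [:close_read, :close_write].each do |half|
    begin
      socket.public_send(half)
    rescue IOError
      # That half was already closed.
    end
  end
end

ours, theirs = UNIXSocket.pair
close_both_halves(ours)
peer_sees_eof = theirs.read.empty? # the peer reads end-of-file once we are closed
```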

#peer ⇒ Object

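The remote address of the peer. For connections created with .from_io this is an address:port string; after #connect it is the resolved address only, without the port.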



# File 'lib/ftw/connection.rb', line 303

def peer
  return @remote_address
end

#pushback(data) ⇒ Object

Push back some data onto the connection’s read buffer.



# File 'lib/ftw/connection.rb', line 261

def pushback(data)
  @pushback_buffer << data
end

#read(length = 16384, timeout = nil) ⇒ Object

Read data from this connection. This method blocks until the read succeeds unless a timeout is given. If the timeout expires and no pushed-back data is available, ReadTimeout is raised.

This method is not guaranteed to read exactly ‘length’ bytes. See IO#sysread.



# File 'lib/ftw/connection.rb', line 227

def read(length=16384, timeout=nil)
  data = ""
  data.force_encoding("BINARY") if data.respond_to?(:force_encoding)
  have_pushback = !@pushback_buffer.empty?
  if have_pushback
    data << @pushback_buffer
    @pushback_buffer = ""
    # We have data 'now' so don't wait.
    timeout = 0
  end

  if readable?(timeout)
    begin
      # Read at most 'length' data, so read less from the socket
      # We'll read less than 'length' if the pushback buffer has
      # data in it already.
      @socket.sysread(length - data.length, @read_buffer)
      data << @read_buffer
      return data
    rescue EOFError => e
      @socket.close
      @connected = false
      raise e
    end
  else
    if have_pushback
      return data
    else
      raise ReadTimeout.new
    end
  end
end
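
The interaction between #pushback and #read can be illustrated in isolation. The class below is a hypothetical, simplified wrapper around any IO that supports sysread. It is a sketch of the pushback-then-read idea and not FTW's implementation. Unlike the listing above, it skips the socket read when the pushed-back data already fills the requested length.

```ruby
# Hypothetical wrapper illustrating a pushback buffer in front of sysread.
class PushbackReader
  ReadTimeout = Class.new(StandardError)

  def initialize(io)
    @io = io
    @pushback_buffer = String.new # empty, binary-encoded
  end

  def pushback(data)
    @pushback_buffer << data
  end

  def read(length = 16384, timeout = nil)
    data = String.new
    have_pushback = !@pushback_buffer.empty?
    if have_pushback
      data << @pushback_buffer
      @pushback_buffer = String.new
      timeout = 0 # we already have data, so do not wait
    end
    wanted = length - data.bytesize
    if wanted > 0 && IO.select([@io], nil, nil, timeout)
      data << @io.sysread(wanted)
    elsif !have_pushback
      raise ReadTimeout
    end
    data
  end
end

pipe_reader, pipe_writer = IO.pipe
pipe_writer.syswrite("hello world")
reader = PushbackReader.new(pipe_reader)
first = reader.read(5)   # reads from the pipe
reader.pushback("lo")    # put two bytes back
second = reader.read(16) # pushed-back bytes come first, then fresh data
```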

#readable?(timeout) ⇒ Boolean

Is this connection readable? Returns true if it becomes readable within the timeout period, false otherwise.

The timeout is in seconds. Fractional seconds are OK.

Returns:

  • (Boolean)


# File 'lib/ftw/connection.rb', line 297

def readable?(timeout)
  readable, writable, errors = IO.select([@socket], nil, nil, timeout)
  return !readable.nil?
end
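
The check above relies on IO.select returning nil when the timeout expires and arrays of ready objects otherwise. A small sketch on a pipe shows both outcomes; the helper name readable_within? is illustrative.

```ruby
# Illustrative helper: true if io becomes readable within timeout seconds.
def readable_within?(io, timeout)
  # IO.select returns nil on timeout, otherwise three arrays of ready IOs.
  !IO.select([io], nil, nil, timeout).nil?
end

pipe_reader, pipe_writer = IO.pipe
before = readable_within?(pipe_reader, 0.05) # nothing has been written yet
pipe_writer.syswrite("ping")
after = readable_within?(pipe_reader, 0.05)  # data is now waiting
```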

#secure(options = nil) ⇒ Object

Secure this connection with TLS.

Options:

  • :certificate_store, an OpenSSL::X509::Store

  • :timeout, a timeout threshold in seconds.

  • :verify_callback, an optional callable invoked during peer verification.
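
A sketch of how an options hash for this method might be assembled with the standard openssl library. The store setup and the callback body are assumptions for illustration, and the final call is commented out because it needs a connected FTW::Connection.

```ruby
require "openssl"

# Trust store seeded from the system's default CA locations.
store = OpenSSL::X509::Store.new
store.set_default_paths

secure_options = {
  :certificate_store => store,
  :timeout => 10, # seconds allowed for the TLS handshake
  # Accept whatever OpenSSL's own chain verification decided.
  :verify_callback => proc { |preverify_ok, _store_context| preverify_ok },
}

# Assuming `connection` is a connected FTW::Connection:
#   connection.secure(secure_options)
```

A verify callback should return true to accept the certificate and false to reject it.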



# File 'lib/ftw/connection.rb', line 318

def secure(options=nil)
  # Skip this if we're already secure.
  return if secured?

  defaults = {
    :timeout => nil,
    #:certificate_store => OpenSSL::SSL::SSLContext::DEFAULT_CERT_STORE
  }
  # Treat a missing options hash as empty so the lookups below do not fail on nil.
  options ||= {}
  settings = defaults.merge(options)

  @logger.info("Securing this connection", :peer => peer, :options => settings)
  # Wrap this connection with TLS/SSL
  sslcontext = OpenSSL::SSL::SSLContext.new
  # If you use VERIFY_NONE, you are removing the trust feature of TLS. Don't do that.
  # Encryption without trust means you don't know who you are talking to.
  sslcontext.verify_mode = OpenSSL::SSL::VERIFY_PEER

  # ruby-core is refusing to patch ruby's default openssl settings to be more
  # secure, so let's fix that here. The next few lines setting options and
  # ciphers come from jmhodges proposed patch
  ssloptions = OpenSSL::SSL::OP_ALL
  if defined?(OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS)
    ssloptions &= ~OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS
  end
  if defined?(OpenSSL::SSL::OP_NO_COMPRESSION)
    ssloptions |= OpenSSL::SSL::OP_NO_COMPRESSION
  end
  sslcontext.options = ssloptions
  sslcontext.ciphers = "DEFAULT:!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2"

  sslcontext.verify_callback = proc do |*args| 
    @logger.debug("Verify peer via FTW::Connection#secure", :callback => settings[:verify_callback])
    if settings[:verify_callback].respond_to?(:call)
      settings[:verify_callback].call(*args)
    end
  end
  sslcontext.cert_store = options[:certificate_store]
  @socket = OpenSSL::SSL::SSLSocket.new(@socket, sslcontext)

  # TODO(sissel): Set up local certificate/key stuff. This is required for
  # server-side ssl operation, I think.

  if client?
    do_secure(:connect_nonblock, options[:timeout])
  else
    do_secure(:accept_nonblock, options[:timeout])
  end
end

#secured? ⇒ Boolean

Has this connection been secured?

Returns:

  • (Boolean)


# File 'lib/ftw/connection.rb', line 416

def secured?
  return @secure
end

#server? ⇒ Boolean

Is this a server connection?

Returns:

  • (Boolean)


# File 'lib/ftw/connection.rb', line 426

def server?
  return @mode == :server
end

#to_io ⇒ Object

Support ‘to_io’ so you can use IO::select on this object.



# File 'lib/ftw/connection.rb', line 308

def to_io
  return @socket
end
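
IO.select accepts any object that responds to to_io, which is what makes the method above useful. The following sketch uses a hypothetical SocketHolder class in place of FTW::Connection to show the mechanism.

```ruby
# Hypothetical stand-in for a connection object that exposes its IO.
class SocketHolder
  def initialize(io)
    @io = io
  end

  def to_io
    @io
  end
end

pipe_reader, pipe_writer = IO.pipe
holder = SocketHolder.new(pipe_reader)
pipe_writer.syswrite("x")

# The holder itself is passed to IO.select; Ruby calls to_io on it.
ready, = IO.select([holder], nil, nil, 1)
```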

#writable?(timeout) ⇒ Boolean

Is this connection writable? Returns true if it becomes writable within the timeout period, false otherwise.

The timeout is in seconds. Fractional seconds are OK.

Returns:

  • (Boolean)


# File 'lib/ftw/connection.rb', line 288

def writable?(timeout)
  readable, writable, errors = IO.select(nil, [@socket], nil, timeout)
  return !writable.nil?
end

#write(data, timeout = nil) ⇒ Object

Write data to this connection. This method blocks until the write succeeds unless a timeout is given. If the connection does not become writable within the timeout, WriteTimeout is raised.

This method is not guaranteed to have written the full data given.

Returns the number of bytes written (see also IO#syswrite).



# File 'lib/ftw/connection.rb', line 213

def write(data, timeout=nil)
  #connect if !connected?
  if writable?(timeout)
    return @socket.syswrite(data)
  else
    raise FTW::Connection::WriteTimeout.new(self.inspect)
  end
end
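
Because a single write may be partial, callers that need the whole payload sent have to loop on the returned byte count. A minimal sketch of such a loop over a plain IO follows. The helper name write_all is hypothetical; with FTW the same loop would call the connection's #write instead of syswrite.

```ruby
# Hypothetical helper: keep writing until every byte of data has been sent.
def write_all(io, data, timeout = nil)
  remaining = data.b # work on a binary copy so offsets are in bytes
  until remaining.empty?
    raise IOError, "write timed out" unless IO.select(nil, [io], nil, timeout)
    written = io.syswrite(remaining)
    remaining = remaining.byteslice(written, remaining.bytesize - written)
  end
  data.bytesize
end

pipe_reader, pipe_writer = IO.pipe
sent = write_all(pipe_writer, "x" * 1000, 1)
received = pipe_reader.sysread(1000)
```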