Class: Riemann::Client::TcpSocket

Inherits:
Object
  • Object
Defined in:
lib/riemann/client/tcp_socket.rb

Overview

Socket: A specialized socket that has been configure

Direct Known Subclasses

SSLSocket

Defined Under Namespace

Classes: Error, Timeout

Constructor Details

#initialize(options = {}) ⇒ TcpSocket

Internal: Creates a new TcpSocket.



# File 'lib/riemann/client/tcp_socket.rb', line 85

def initialize(options = {})
  @host = options[:host]
  @port = options[:port]

  @connect_timeout = options[:connect_timeout] || options[:timeout] || 2
  @read_timeout    = options[:read_timeout]    || options[:timeout] || 2
  @write_timeout   = options[:write_timeout]   || options[:timeout] || 2

  @keepalive_active   = options.fetch(:keepalive_active, true)
  @keepalive_idle     = options[:keepalive_idle]     || 60
  @keepalive_interval = options[:keepalive_interval] || 30
  @keepalive_count    = options[:keepalive_count]    || 5

  @socket             = nil
end
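
The fallback chain used for the timeouts can be tried in isolation. The following is a minimal sketch, not library code: `resolve_timeout` is a hypothetical helper that mirrors the `options[:x] || options[:timeout] || 2` pattern from the constructor.

```ruby
# Hypothetical helper (not part of riemann-client) mirroring the constructor:
# the specific option wins, then the generic :timeout, then 2 seconds.
def resolve_timeout(options, key)
  options[key] || options[:timeout] || 2
end

opts = { timeout: 5, read_timeout: 1 }
connect_timeout = resolve_timeout(opts, :connect_timeout) # falls back to :timeout
read_timeout    = resolve_timeout(opts, :read_timeout)    # specific value wins
default_timeout = resolve_timeout({}, :write_timeout)     # nothing given

# fetch keeps an explicit false, which `||` would silently replace with true.
keepalive = { keepalive_active: false }.fetch(:keepalive_active, true)
```

This is presumably why `keepalive_active` is read with `fetch` rather than `||`: a caller passing `false` should be able to switch keepalive off.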

Instance Attribute Details

#connect_timeout ⇒ Object (readonly)

Internal: The timeout for connecting in seconds. Defaults to 2



# File 'lib/riemann/client/tcp_socket.rb', line 19

def connect_timeout
  @connect_timeout
end

#host ⇒ Object (readonly)

Internal: The host this socket is connected to



# File 'lib/riemann/client/tcp_socket.rb', line 27

def host
  @host
end

#keepalive_count ⇒ Object (readonly)

Internal

Used for setting TCP_KEEPCNT: overrides tcp_keepalive_probes for a single socket.

tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html

tcp_keepalive_probes:

The number of unacknowledged probes to send before considering the
connection dead and notifying the application layer.


# File 'lib/riemann/client/tcp_socket.rb', line 71

def keepalive_count
  @keepalive_count
end

#keepalive_idle ⇒ Object (readonly)

Internal

Used for setting TCP_KEEPIDLE: overrides tcp_keepalive_time for a single socket.

tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html

tcp_keepalive_time:

The interval between the last data packet sent (simple ACKs are not
considered data) and the first keepalive probe; after the connection is
marked to need keepalive, this counter is not used any further.


# File 'lib/riemann/client/tcp_socket.rb', line 45

def keepalive_idle
  @keepalive_idle
end

#keepalive_interval ⇒ Object (readonly)

Internal

Used for setting TCP_KEEPINTVL: overrides tcp_keepalive_intvl for a single socket.

tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html

tcp_keepalive_intvl:

The interval between subsequent keepalive probes, regardless of what
the connection has exchanged in the meantime.


# File 'lib/riemann/client/tcp_socket.rb', line 58

def keepalive_interval
  @keepalive_interval
end

#port ⇒ Object (readonly)

Internal: The port this socket is connected to



# File 'lib/riemann/client/tcp_socket.rb', line 31

def port
  @port
end

#read_timeout ⇒ Object

Internal: The timeout for reading in seconds. Defaults to 2



# File 'lib/riemann/client/tcp_socket.rb', line 15

def read_timeout
  @read_timeout
end

#write_timeout ⇒ Object (readonly)

Internal: The timeout for writing in seconds. Defaults to 2



# File 'lib/riemann/client/tcp_socket.rb', line 23

def write_timeout
  @write_timeout
end

Class Method Details

.connect(options = {}) ⇒ Object

Internal: Create and connect to the given location.

options - same as for the constructor

Returns a connected TcpSocket instance



# File 'lib/riemann/client/tcp_socket.rb', line 78

def self.connect(options = {})
  s = new(options)
  s.connect
  s
end

Instance Method Details

#close ⇒ Object

Internal: Closes the internal ::Socket

Returns nothing



# File 'lib/riemann/client/tcp_socket.rb', line 150

def close
  @socket.close unless closed?
  @socket = nil
end

#closed? ⇒ Boolean

Internal: Return true if the socket is closed.

Returns:

  • (Boolean)


# File 'lib/riemann/client/tcp_socket.rb', line 156

def closed?
  return true if @socket.nil?
  return true if @socket.closed?

  false
end

#connect ⇒ Object

Internal:

Connect to the remote host in a non-blocking fashion.

Raise Error if there is a failure connecting.

Return the ::Socket on success



# File 'lib/riemann/client/tcp_socket.rb', line 170

def connect
  # Calculate our timeout deadline
  deadline = Time.now.to_f + connect_timeout

  # Lookup destination address, we only want TCP.
  addrs      = ::Socket.getaddrinfo(host, port, nil, ::Socket::SOCK_STREAM)
  errors     = []
  conn_error = -> { raise errors.first }
  sock       = nil

  # Sort it so we get AF_INET, IPv4
  addrs.sort.find(conn_error) do |addr|
    sock = connect_or_error(addr, deadline, errors)
  end
  sock
end
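
The "try each address, remember the failures, raise the first one if nothing worked" control flow relies on the optional `ifnone` argument of `Enumerable#find`. The sketch below isolates that pattern with plain strings instead of sockets; `try_candidate` and `first_success` are hypothetical names standing in for `connect_or_error` and `connect`.

```ruby
# Stand-in for connect_or_error: returns a truthy value on success, or
# records the failure and returns false so that find moves on.
def try_candidate(candidate, errors)
  raise ArgumentError, "cannot use #{candidate}" unless candidate.start_with?('ok')

  candidate.upcase
rescue ArgumentError => e
  errors << e
  false
end

# Stand-in for connect: returns the first successful result, or raises the
# first recorded error when every candidate failed.
def first_success(candidates)
  errors  = []
  give_up = -> { raise errors.first } # called by find only if no block call was truthy
  result  = nil
  candidates.find(give_up) { |c| result = try_candidate(c, errors) }
  result
end

picked = first_success(%w[bad-1 ok-2 ok-3])
```

Here `picked` is the result for `ok-2`; `ok-3` is never tried because `find` stops at the first truthy block value.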

#connect_nonblock(addr, timeout) ⇒ Object

Internal: Connect to the given address within the timeout.

Make an attempt to connect to a single address within the given timeout.

Return the ::Socket when it is connected, or raise an Error if no connection was possible.



# File 'lib/riemann/client/tcp_socket.rb', line 217

def connect_nonblock(addr, timeout)
  sockaddr = ::Socket.pack_sockaddr_in(addr[1], addr[3])
  sock     = socket_factory(addr[4])
  sock.connect_nonblock(sockaddr)
  sock
rescue Errno::EINPROGRESS
  if IO.select(nil, [sock], nil, timeout).nil?
    begin
      sock.close
    rescue StandardError
      nil
    end
    raise Timeout, "Could not connect to #{host}:#{port} within #{timeout} seconds"
  end
  connect_nonblock_finalize(sock, sockaddr)
rescue StandardError => e
  begin
    sock.close
  rescue StandardError
    nil
  end
  raise Error, "Could not connect to #{host}:#{port}: #{e.class}: #{e.message}", e.backtrace
end
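
The non-blocking connect sequence (start the connect, wait for writability, then confirm the result) can be exercised against a local listener. This is a simplified sketch rather than the library's code: `connect_with_timeout` is a hypothetical name, it handles IPv4 only, and it raises a plain RuntimeError where the class above raises Timeout or Error.

```ruby
require 'socket'

# Connect to host:port, waiting at most `timeout` seconds for completion.
def connect_with_timeout(host, port, timeout)
  sockaddr = Socket.pack_sockaddr_in(port, host)
  sock     = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
  begin
    sock.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # Connect is in progress (EINPROGRESS): wait until the socket is writable.
    if IO.select(nil, [sock], nil, timeout).nil?
      sock.close
      raise "could not connect to #{host}:#{port} within #{timeout} seconds"
    end
    begin
      sock.connect_nonblock(sockaddr) # ask again to learn the real outcome
    rescue Errno::EISCONN
      # Already connected: this is the success case.
    end
  end
  sock
end

server    = TCPServer.new('127.0.0.1', 0) # port 0 lets the OS pick a free port
port      = server.local_address.ip_port
client    = connect_with_timeout('127.0.0.1', port, 2)
peer_port = client.remote_address.ip_port
client.close
server.close
```

The second `connect_nonblock` call is what `connect_nonblock_finalize` does in the class: writability alone does not prove the connect succeeded, so the call is repeated and EISCONN is treated as success.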

#connect_nonblock_finalize(sock, sockaddr) ⇒ Object

Internal: Make sure that a non-blocking connect has truly connected.

Ensure that the given socket is actually connected to the given address.

Returns the socket if it is, and raises an Error if it isn't.



# File 'lib/riemann/client/tcp_socket.rb', line 246

def connect_nonblock_finalize(sock, sockaddr)
  sock.connect_nonblock(sockaddr)
  sock
rescue Errno::EISCONN
  sock
rescue StandardError => e
  begin
    sock.close
  rescue StandardError
    nil
  end
  raise Error, "Could not connect to #{host}:#{port}: #{e.class}: #{e.message}", e.backtrace
end

#connect_or_error(addr, deadline, errors) ⇒ Object

Internal: Connect to the destination or raise an error.

Connect to the address or capture the error of the connection

addr     - an address returned from Socket.getaddrinfo()
deadline - the time after which we should raise a timeout error
errors   - a collection of errors to append an error to, should we have one

Make an attempt to connect to the given address. If it is successful, return the socket.

Should the connection fail, append the exception to the errors array and return false.



# File 'lib/riemann/client/tcp_socket.rb', line 201

def connect_or_error(addr, deadline, errors)
  timeout = deadline - Time.now.to_f
  raise Timeout, "Could not connect to #{host}:#{port}" if timeout <= 0

  connect_nonblock(addr, timeout)
rescue Error => e
  errors << e
  false
end

#keepalive_active? ⇒ Boolean

Internal: Return whether or not the keepalive_active flag is set.

Returns:

  • (Boolean)


# File 'lib/riemann/client/tcp_socket.rb', line 102

def keepalive_active?
  @keepalive_active
end

#read(length, outbuf = nil) ⇒ Object

Reads length bytes from the socket

length - the number of bytes to read from the socket
outbuf - an optional buffer to store the bytes in

Returns the bytes read if no outbuf is specified



# File 'lib/riemann/client/tcp_socket.rb', line 287

def read(length, outbuf = nil)
  if outbuf
    outbuf.replace('')
    buf = outbuf
  else
    buf = String.new
  end

  while buf.length < length
    unless (rb = readpartial(length - buf.length))
      break
    end

    buf << rb
  end

  buf
end
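
The accumulate-until-length loop can be tried without a network connection. The sketch below assumes a pipe in place of the socket and uses the blocking `IO#readpartial` with `EOFError` as the end-of-stream signal, where the method above relies on its own `readpartial` returning a falsy value; `read_fully` is a hypothetical name.

```ruby
# Read exactly `length` bytes from `io`, or fewer if the stream ends first.
def read_fully(io, length)
  buf = String.new
  while buf.length < length
    begin
      # Ask only for what is still missing; a short read just loops again.
      buf << io.readpartial(length - buf.length)
    rescue EOFError
      break
    end
  end
  buf
end

reader, writer = IO.pipe
writer.write('hello ')
writer.write('world')
writer.close

first = read_fully(reader, 8)   # spans both writes
rest  = read_fully(reader, 100) # stream ends before 100 bytes arrive
reader.close
```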

#readpartial(maxlen, outbuf = nil) ⇒ Object

Internal: Read up to a maxlen of data from the socket and store it in outbuf

maxlen - the maximum number of bytes to read from the socket
outbuf - the buffer in which to store the bytes

Returns the bytes read



# File 'lib/riemann/client/tcp_socket.rb', line 312

def readpartial(maxlen, outbuf = nil)
  socket.read_nonblock(maxlen, outbuf)
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::ECONNRESET, IO::WaitReadable
  unless wait_readable(read_timeout)
    raise Timeout, "Could not read from #{host}:#{port} in #{read_timeout} seconds"
  end

  retry
end

#socket ⇒ Object

Internal: Return the connected raw Socket.

If the socket is closed or non-existent it will create and connect again.

Returns a ::Socket



# File 'lib/riemann/client/tcp_socket.rb', line 141

def socket
  return @socket unless closed?

  @socket ||= connect
end

#socket_factory(type) ⇒ Object

Internal: Low level socket allocation and option configuration

Using the options from the initializer, a new ::Socket is created that is:

a TCP socket, closed automatically on exec, with Nagle's algorithm disabled
and with TCP keepalive options set if keepalive is supported.

Returns a new ::Socket instance for



# File 'lib/riemann/client/tcp_socket.rb', line 116

def socket_factory(type)
  sock = ::Socket.new(type, ::Socket::SOCK_STREAM, 0)

  # close file descriptors if we exec
  if Fcntl.constants.include?(:F_SETFD) && Fcntl.constants.include?(:FD_CLOEXEC)
    sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  end
  # Disable Nagle's algorithm
  sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)

  if using_keepalive?
    sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true)
    sock.setsockopt(::Socket::SOL_TCP,    ::Socket::TCP_KEEPIDLE, keepalive_idle)
    sock.setsockopt(::Socket::SOL_TCP,    ::Socket::TCP_KEEPINTVL, keepalive_interval)
    sock.setsockopt(::Socket::SOL_TCP,    ::Socket::TCP_KEEPCNT, keepalive_count)
  end

  sock
end
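
The option setup can be checked on a socket that is never connected. This is a sketch under assumptions: `keepalive_supported?` is a hypothetical stand-in for `using_keepalive?`, and the values 60, 30 and 5 are simply the constructor defaults shown earlier.

```ruby
require 'socket'

KEEPALIVE_CONSTANTS = %i[SOL_SOCKET SO_KEEPALIVE SOL_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT].freeze

# True only when this platform's Socket class defines every needed constant.
def keepalive_supported?
  KEEPALIVE_CONSTANTS.all? { |c| Socket.const_defined?(c) }
end

sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) # disable Nagle's algorithm

if keepalive_supported?
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
  sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, 60)  # idle seconds before the first probe
  sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, 30) # seconds between probes
  sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 5)    # unanswered probes before giving up
end

# Read the options back to confirm what the kernel accepted.
nodelay_on   = sock.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY).int != 0
keepalive_on = sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE).bool
sock.close
```

With these defaults a silent peer would be detected after roughly 60 + 5 × 30 = 210 seconds, assuming the platform honours all three settings.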

#using_keepalive? ⇒ Boolean

Internal: say if we are using TCP Keep Alive or not

Returns true if the :keepalive_active initialization option is true and all of the constants needed for TCP keepalive are defined.

On some operating systems these constants are not defined; in that case we do not attempt to use TCP keepalive.

Returns true or false

Returns:

  • (Boolean)


# File 'lib/riemann/client/tcp_socket.rb', line 271

def using_keepalive?
  using = false
  if keepalive_active?
    using = %i[SOL_SOCKET SO_KEEPALIVE SOL_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT].all? do |c|
      ::Socket.const_defined? c
    end
  end
  using
end

#wait_readable(timeout = nil) ⇒ Object



# File 'lib/riemann/client/tcp_socket.rb', line 355

def wait_readable(timeout = nil)
  IO.select([@socket], nil, nil, timeout || read_timeout)
end

#wait_writable(timeout = nil) ⇒ Object



# File 'lib/riemann/client/tcp_socket.rb', line 351

def wait_writable(timeout = nil)
  IO.select(nil, [@socket], nil, timeout || write_timeout)
end

#write(buf) ⇒ Object

Internal: Write the given data to the socket

buf - the data to write to the socket.

Raises an error if it is unable to write the data to the socket within the write_timeout.

Returns nothing



# File 'lib/riemann/client/tcp_socket.rb', line 330

def write(buf)
  until buf.nil? || buf.empty?
    written = socket.write_nonblock(buf)
    buf = buf[written, buf.length]
  end
rescue Errno::EWOULDBLOCK, Errno::EINTR, Errno::EAGAIN, Errno::ECONNRESET, IO::WaitWritable
  unless wait_writable(write_timeout)
    raise Timeout, "Could not write to #{host}:#{port} in #{write_timeout} seconds"
  end

  retry
rescue IO::WaitReadable
  # Also rescued for SSL renegotiation in OpenSSL::SSL::SSLSocket according to
  # https://ruby-doc.org/core-2.7.1/IO.html#method-c-select
  unless wait_readable(read_timeout)
    raise Timeout, "Could not write to #{host}:#{port} in #{write_timeout} seconds"
  end

  retry
end
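
The write loop (write what the kernel accepts, drop the written prefix, wait when the buffer is full) can be demonstrated with a pipe. This is a minimal sketch, not the library's implementation: `write_fully` is a hypothetical name, a pipe stands in for the socket, and a plain RuntimeError stands in for Timeout.

```ruby
# Write all of `data` to `io`, waiting up to `timeout` seconds whenever the
# kernel buffer is full.
def write_fully(io, data, timeout)
  until data.empty?
    begin
      written = io.write_nonblock(data)
      # write_nonblock reports bytes, so slice by bytes rather than characters.
      data = data.byteslice(written, data.bytesize)
    rescue IO::WaitWritable
      raise "could not write within #{timeout} seconds" unless IO.select(nil, [io], nil, timeout)

      retry
    end
  end
end

reader, writer = IO.pipe
payload  = 'x' * 1_000_000 # larger than a typical pipe buffer, forcing partial writes
received = String.new

# Drain the pipe in the background so the writer can make progress.
consumer = Thread.new do
  while (chunk = reader.read(65_536))
    received << chunk
  end
end

write_fully(writer, payload, 2)
writer.close
consumer.join
reader.close
```

One deliberate difference from the method above: the sketch advances with `byteslice`, whereas `buf[written, buf.length]` indexes by character, which only matches the byte count for single-byte or binary strings.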