Class: Oxblood::RSocket

Inherits:
Object
  • Object
show all
Defined in:
lib/oxblood/rsocket.rb

Overview

Thin socket wrapper built for resilience. The socket is closed and automatically recreated after any error (including timeout errors) in order to avoid an inconsistent state.

Constant Summary collapse

TimeoutError =
Class.new(RuntimeError)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ RSocket

Creates the wrapper and the underlying socket it maintains

Options Hash (opts):

  • :timeout (Float) — default: 1.0

    socket read/write timeout

  • :host (String) — default: 'localhost'

    Hostname or IP address to connect to

  • :port (Integer) — default: 6379

    Port Redis server listens on

  • :connect_timeout (Float) — default: 1.0

    socket connect timeout

  • :path (String)

    UNIX socket path


36
37
38
39
40
41
# File 'lib/oxblood/rsocket.rb', line 36

# Set up the wrapper: remember the options, pick the read/write timeout,
# create the underlying socket and start with an empty read buffer.
#
# @param opts [Hash] connection options; also handed to +create_socket+
# @option opts [Float] :timeout (1.0) socket read/write timeout
def initialize(opts = {})
  @opts = opts
  @timeout = opts.fetch(:timeout, 1.0)
  @socket = create_socket(opts)
  # Empty, mutable, binary (ASCII-8BIT) buffer for data read ahead of need.
  @buffer = ''.b
end

Instance Attribute Details

#timeout ⇒ Numeric


23
24
25
# File 'lib/oxblood/rsocket.rb', line 23

# Timeout currently stored on this wrapper.
#
# @return [Numeric] the value of @timeout
def timeout
  @timeout
end

Instance Method Details

#close ⇒ nil

Close connection to server


88
89
90
91
92
93
94
95
# File 'lib/oxblood/rsocket.rb', line 88

# Close the connection to the server and discard any buffered data.
#
# An IOError raised while closing is swallowed, and the socket reference is
# reset to nil whether or not closing succeeded.
#
# @return [nil]
def close
  @buffer.clear
  @socket.close if @socket
rescue IOError
  # Best-effort close: nothing more to do if the socket refuses to close.
  nil
ensure
  @socket = nil
end

#connected? ⇒ Boolean

True if socket exists


99
100
101
# File 'lib/oxblood/rsocket.rb', line 99

# Whether a socket object is currently held.
#
# @return [Boolean] true if the socket exists, false otherwise
def connected?
  @socket ? true : false
end

#gets(separator, timeout = @timeout) ⇒ String

Read until separator


59
60
61
62
63
64
65
# File 'lib/oxblood/rsocket.rb', line 59

# Read up to and including the first occurrence of +separator+.
#
# Data is pulled in chunks of up to 1024 bytes until the separator shows up
# in the internal buffer. Bytes after the separator stay buffered for later
# reads.
#
# @param separator [String] byte sequence that terminates the returned data
# @param timeout [Numeric] timeout handed to each underlying partial read
# @return [String] buffered data through the end of the separator
def gets(separator, timeout = @timeout)
  until (boundary = @buffer.index(separator))
    @buffer << readpartial(1024, timeout)
  end

  @buffer.slice!(0, boundary + separator.bytesize)
end

#read(nbytes, timeout = @timeout) ⇒ String

Read number of bytes


46
47
48
49
50
51
52
53
54
# File 'lib/oxblood/rsocket.rb', line 46

# Read exactly +nbytes+ bytes.
#
# Buffered data is consumed first. Whatever is still missing is fetched with
# partial reads sized to the outstanding byte count.
#
# @param nbytes [Integer] number of bytes to return
# @param timeout [Numeric] timeout handed to each underlying partial read
# @return [String] the bytes read
def read(nbytes, timeout = @timeout)
  chunk = @buffer.slice!(0, nbytes)

  until chunk.bytesize >= nbytes
    missing = nbytes - chunk.bytesize
    chunk << readpartial(missing, timeout)
  end

  chunk
end

#write(data, timeout = @timeout) ⇒ Integer

Write data to socket


70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/oxblood/rsocket.rb', line 70

# Write all of +data+ to the socket using non-blocking writes.
#
# After a partial write the remaining bytes are retried. When the socket is
# not ready, the method waits up to +timeout+ for it to become writable and
# calls +fail_with_timeout!+ if it does not.
#
# @param data [String] bytes to send
# @param timeout [Numeric] timeout for each wait on socket writability
# @return [Integer] byte size of +data+
def write(data, timeout = @timeout)
  total = data.bytesize
  pending = data

  until pending.bytesize.zero?
    outcome = socket.write_nonblock(pending, exception: false)

    if outcome == :wait_writable
      fail_with_timeout! unless socket.wait_writable(timeout)
    else
      # outcome is the number of bytes accepted; keep only the unsent tail.
      pending = pending.byteslice(outcome..-1)
    end
  end

  total
end