Class: NATS::IO::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/nats/io/client.rb

Overview

Implementation adapted from github.com/redis/redis-rb

Direct Known Subclasses

WebSocket

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Socket

Returns a new instance of Socket.



1873
1874
1875
1876
1877
1878
1879
1880
# File 'lib/nats/io/client.rb', line 1873

# Builds an unconnected socket wrapper from an options hash.
#
# Keys read from +options+: :uri, :tls, :connect_timeout, :write_timeout
# and :read_timeout. Any key that is missing leaves the matching instance
# variable set to nil.
#
# @param options [Hash] connection settings
def initialize(options={})
  # No underlying socket yet; #connect assigns it.
  @socket = nil

  # Where to connect and the TLS settings used by #setup_tls!.
  @uri, @tls = options[:uri], options[:tls]

  # Timeout settings, stored exactly as given.
  @connect_timeout = options[:connect_timeout]
  @read_timeout = options[:read_timeout]
  @write_timeout = options[:write_timeout]
end

Instance Attribute Details

#socket ⇒ Object

Returns the value of attribute socket.



1871
1872
1873
# File 'lib/nats/io/client.rb', line 1871

# Reader for the underlying socket object.
#
# @return [Object, nil] the value currently stored in @socket; nil until
#   something assigns it
def socket
  @socket
end

Instance Method Details

#close ⇒ Object



1979
1980
1981
# File 'lib/nats/io/client.rb', line 1979

# Closes the underlying socket.
#
# Does nothing when no socket has been assigned yet (@socket is nil), so
# it is safe to call during cleanup after a connection attempt that never
# produced a socket.
#
# @return [nil]
def close
  @socket.close if @socket
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


1983
1984
1985
# File 'lib/nats/io/client.rb', line 1983

# Reports whether the underlying socket is closed.
#
# A wrapper whose socket was never assigned (@socket is nil) is reported
# as closed instead of raising NoMethodError on nil.
#
# @return [Boolean]
def closed?
  @socket.nil? || @socket.closed?
end

#connect ⇒ Object



1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
# File 'lib/nats/io/client.rb', line 1882

# Resolves the URI hostname and connects to the first address that works.
#
# Each resolved address is tried in order through connect_addrinfo. A
# SystemCallError on any address but the last is swallowed so the next
# one can be tried; on the last address it is re-raised.
#
# @raise [SystemCallError] when the last resolved address also fails
def connect
  candidates = ::Socket.getaddrinfo(@uri.hostname, nil, ::Socket::AF_UNSPEC, ::Socket::SOCK_STREAM)
  last_index = candidates.length - 1

  candidates.each_with_index do |candidate, index|
    begin
      @socket = connect_addrinfo(candidate, @uri.port, @connect_timeout)
      break
    rescue SystemCallError => error
      # Only surface the failure once every address has been tried.
      raise error if index == last_index
    end
  end

  # Enable TCP_NODELAY on the connected socket by default.
  @socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
end

#read(max_bytes, deadline = nil) ⇒ Object



1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
# File 'lib/nats/io/client.rb', line 1922

# Reads up to +max_bytes+ from the socket without blocking indefinitely.
#
# When the socket is not ready, waits with IO.select for readability (or
# writability, if the read reports it needs to write first) and then
# retries. +deadline+ is handed to IO.select as its timeout on every wait,
# so it bounds each individual wait rather than the whole call.
#
# @param max_bytes [Integer] maximum number of bytes to read
# @param deadline [Numeric, nil] IO.select timeout; nil waits indefinitely
# @return [String, nil] the bytes read, or nil for the JRuby-specific
#   'No message available' EOFError
# @raise [NATS::IO::SocketTimeoutError] when a wait times out
# @raise [Errno::ECONNRESET] when the read hits end of file
def read(max_bytes, deadline=nil)
  @socket.read_nonblock(max_bytes)
rescue ::IO::WaitReadable
  raise NATS::IO::SocketTimeoutError unless ::IO.select([@socket], nil, nil, deadline)
  retry
rescue ::IO::WaitWritable
  raise NATS::IO::SocketTimeoutError unless ::IO.select(nil, [@socket], nil, deadline)
  retry
rescue EOFError => e
  # FIXME: <EOFError: No message available> can happen in jruby
  # even though seems it is temporary and eventually possible
  # to read from socket.
  return nil if RUBY_ENGINE == 'jruby' && e.message == 'No message available'

  raise Errno::ECONNRESET
end

#read_line(deadline = nil) ⇒ Object



1914
1915
1916
1917
1918
1919
1920
# File 'lib/nats/io/client.rb', line 1914

# Waits until the socket is readable, then reads one line with +gets+.
#
# @param deadline [Numeric, nil] IO.select timeout; nil waits indefinitely
# @return [String, nil] whatever @socket.gets returns
# @raise [NATS::IO::SocketTimeoutError] when the socket does not become
#   readable before the timeout
def read_line(deadline=nil)
  # FIXME: Should accumulate and read in a non blocking way instead
  ready = ::IO.select([@socket], nil, nil, deadline)
  raise NATS::IO::SocketTimeoutError unless ready

  @socket.gets
end

#setup_tls! ⇒ Object

(Re-)connect using secure connection if server and client agreed on using it.



1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
# File 'lib/nats/io/client.rb', line 1899

# Upgrades the current connection to TLS by wrapping @socket in an
# OpenSSL::SSL::SSLSocket and performing the handshake. @socket is only
# replaced once the handshake has succeeded.
#
# Reads :context (required, via fetch) and :hostname from @tls.
#
# @return [OpenSSL::SSL::SSLSocket] the connected TLS socket
def setup_tls!
  # Rewrap the existing socket with the configured SSL context.
  secure_socket = OpenSSL::SSL::SSLSocket.new(@socket, @tls.fetch(:context))

  # Required to enable hostname verification if Ruby runtime supports it (>= 2.4):
  # https://github.com/ruby/openssl/commit/028e495734e9e6aa5dba1a2e130b08f66cf31a21
  secure_socket.hostname = @tls[:hostname]

  # Closing the TLS socket should also close the wrapped TCP socket.
  secure_socket.sync_close = true

  secure_socket.connect
  @socket = secure_socket
end

#write(data, deadline = nil) ⇒ Object



1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
# File 'lib/nats/io/client.rb', line 1949

# Writes all of +data+ to the socket using non-blocking writes.
#
# After a partial write, the bytes already sent are dropped from the front
# of the buffer and the rest is written on the next pass. When the socket
# is not ready, waits with IO.select and retries; +deadline+ is handed to
# IO.select as its timeout on every wait, so it bounds each individual
# wait rather than the whole call.
#
# @param data [String] bytes to send
# @param deadline [Numeric, nil] IO.select timeout; nil waits indefinitely
# @return [Integer] total number of bytes written
# @raise [NATS::IO::SocketTimeoutError] when a wait times out
# @raise [Errno::ECONNRESET] when an EOFError is raised while writing
def write(data, deadline=nil)
  expected = data.bytesize
  sent = 0
  pending = data

  loop do
    begin
      count = @socket.write_nonblock(pending)
    rescue ::IO::WaitWritable
      raise NATS::IO::SocketTimeoutError unless ::IO.select(nil, [@socket], nil, deadline)
      retry
    rescue ::IO::WaitReadable
      raise NATS::IO::SocketTimeoutError unless ::IO.select([@socket], nil, nil, deadline)
      retry
    end

    sent += count
    return sent if sent >= expected

    # Keep only the bytes that have not been written yet.
    pending = pending.byteslice(count..-1)
  end
rescue EOFError
  raise Errno::ECONNRESET
end