Class: NATS::IO::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/nats/io/client.rb

Overview

Implementation adapted from github.com/redis/redis-rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Socket

Returns a new instance of Socket.



1816
1817
1818
1819
1820
1821
1822
# File 'lib/nats/io/client.rb', line 1816

# Builds an unconnected socket wrapper from the given connection settings.
#
# Nothing is opened here: the underlying socket is left as nil until
# #connect assigns one.
#
# @param options [Hash] settings looked up under the keys :uri,
#   :connect_timeout, :write_timeout and :read_timeout; any key that is
#   absent leaves the matching instance variable nil
def initialize(options={})
  @uri, @connect_timeout = options[:uri], options[:connect_timeout]
  @write_timeout, @read_timeout = options[:write_timeout], options[:read_timeout]
  @socket = nil
end

Instance Attribute Details

#socket ⇒ Object

Returns the value of attribute socket.



1814
1815
1816
# File 'lib/nats/io/client.rb', line 1814

# Reader for the underlying socket object.
#
# @return [Object, nil] the value held in @socket: nil after construction,
#   and whatever connect_addrinfo returned once #connect has succeeded
def socket
  @socket
end

Instance Method Details

#close ⇒ Object



1905
1906
1907
# File 'lib/nats/io/client.rb', line 1905

# Closes the underlying socket, if there is one.
#
# The socket is nil until #connect assigns it, so calling this on an
# instance that never connected is a no-op instead of a NoMethodError
# on nil.
#
# @return [Object, nil] whatever the socket's own #close returns, or nil
#   when there is no socket to close
def close
  @socket.close if @socket
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


1909
1910
1911
# File 'lib/nats/io/client.rb', line 1909

# Tells whether the underlying socket is unavailable for I/O.
#
# The socket is nil until #connect assigns it; an instance with no socket
# is reported as closed rather than raising NoMethodError on nil.
#
# @return [Boolean] true when there is no socket or the socket reports
#   itself closed, false otherwise
def closed?
  @socket.nil? || @socket.closed?
end

#connect ⇒ Object



1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
# File 'lib/nats/io/client.rb', line 1824

# Resolves the URI's hostname and connects to the first address that works.
#
# Every address returned by getaddrinfo is tried in order through
# connect_addrinfo (defined elsewhere in this file), using @uri.port and
# @connect_timeout. A SystemCallError from any candidate but the last is
# swallowed so the next one can be tried; the last candidate's error is
# re-raised. Once a socket is stored, TCP_NODELAY is enabled on it.
#
# @return [Object] the result of the final setsockopt call
# @raise [SystemCallError] when the last candidate address also fails
def connect
  candidates = ::Socket.getaddrinfo(@uri.hostname, nil, ::Socket::AF_UNSPEC, ::Socket::SOCK_STREAM)
  attempts_left = candidates.length

  candidates.each do |candidate|
    attempts_left -= 1
    begin
      @socket = connect_addrinfo(candidate, @uri.port, @connect_timeout)
      break
    rescue SystemCallError => error
      # Only surface the failure once every address has been tried.
      raise error if attempts_left.zero?
    end
  end

  # Set TCP no delay by default
  @socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
end

#read(max_bytes, deadline = nil) ⇒ Object



1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
# File 'lib/nats/io/client.rb', line 1848

# Reads up to max_bytes from the socket without blocking on the read itself.
#
# When the socket is not ready, IO.select waits for it with +deadline+ as
# the timeout and the read is then retried. The full timeout is applied to
# each wait.
#
# @param max_bytes [Integer] upper bound handed to read_nonblock
# @param deadline [Numeric, nil] timeout handed to IO.select; nil waits
#   without a time limit
# @return [String, nil] the bytes read; nil only for the JRuby
#   'No message available' EOF case handled below
# @raise [NATS::IO::SocketTimeoutError] when IO.select times out
# @raise [Errno::ECONNRESET] when the socket reports end of file
def read(max_bytes, deadline=nil)
  begin
    @socket.read_nonblock(max_bytes)
  rescue ::IO::WaitReadable
    raise NATS::IO::SocketTimeoutError unless ::IO.select([@socket], nil, nil, deadline)
    retry
  rescue ::IO::WaitWritable
    raise NATS::IO::SocketTimeoutError unless ::IO.select(nil, [@socket], nil, deadline)
    retry
  end
rescue EOFError => eof
  # FIXME: <EOFError: No message available> can happen in jruby
  # even though seems it is temporary and eventually possible
  # to read from socket.
  return nil if RUBY_ENGINE == 'jruby' && eof.message == 'No message available'

  # Any other EOF is reported to callers as a reset connection.
  raise Errno::ECONNRESET
end

#read_line(deadline = nil) ⇒ Object



1840
1841
1842
1843
1844
1845
1846
# File 'lib/nats/io/client.rb', line 1840

# Waits until the socket is readable, then reads one line with a blocking gets.
#
# @param deadline [Numeric, nil] timeout handed to IO.select; nil waits
#   without a time limit
# @return [String, nil] whatever the socket's gets returns
# @raise [NATS::IO::SocketTimeoutError] when the socket does not become
#   readable before the timeout
def read_line(deadline=nil)
  # FIXME: Should accumulate and read in a non blocking way instead
  readable = ::IO.select([@socket], nil, nil, deadline)
  raise NATS::IO::SocketTimeoutError unless readable

  @socket.gets
end

#write(data, deadline = nil) ⇒ Object



1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
# File 'lib/nats/io/client.rb', line 1875

# Writes all of +data+ to the socket using non-blocking writes.
#
# Each write_nonblock call may accept only part of the buffer; the unsent
# tail is sliced off by byte offset and sent on the next pass. When the
# socket is not ready, IO.select waits for it with +deadline+ as the
# timeout and the write is then retried. The full timeout is applied to
# each wait.
#
# @param data [String] bytes to send
# @param deadline [Numeric, nil] timeout handed to IO.select; nil waits
#   without a time limit
# @return [Integer] total number of bytes written
# @raise [NATS::IO::SocketTimeoutError] when IO.select times out
# @raise [Errno::ECONNRESET] when an EOFError is raised while writing
def write(data, deadline=nil)
  expected = data.bytesize
  sent = 0
  pending = data

  loop do
    begin
      accepted = @socket.write_nonblock(pending)
      sent += accepted
      break sent if sent >= expected

      # Keep only the bytes the socket has not taken yet.
      pending = pending.byteslice(accepted..-1)
    rescue ::IO::WaitWritable
      raise NATS::IO::SocketTimeoutError unless ::IO.select(nil, [@socket], nil, deadline)
      retry
    rescue ::IO::WaitReadable
      raise NATS::IO::SocketTimeoutError unless ::IO.select([@socket], nil, nil, deadline)
      retry
    end
  end
rescue EOFError
  raise Errno::ECONNRESET
end