Class: Thrift::Socket

Inherits:
BaseTransport show all
Defined in:
lib/thrift/transport/socket.rb

Direct Known Subclasses

SSLSocket, UNIXSocket

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from BaseTransport

#flush, #read_all, #read_byte, #read_into_buffer

Constructor Details

#initialize(host = 'localhost', port = 9090, timeout = nil) ⇒ Socket

Returns a new instance of Socket.



25
26
27
28
29
30
31
# File 'lib/thrift/transport/socket.rb', line 25

# Prepares an unconnected socket transport. Nothing is opened here; the
# handle stays nil until #open succeeds.
#
# @param host [String] name or address of the peer
# @param port [Integer] port of the peer
# @param timeout [Numeric, nil] limit, in seconds, applied to the IO.select
#   waits performed by #open, #read and #write; nil means wait without limit
def initialize(host='localhost', port=9090, timeout=nil)
  @host, @port, @timeout = host, port, timeout
  @handle = nil
  # Pre-rendered "host:port" label reused in error messages.
  @desc = "#{@host}:#{@port}"
end

Instance Attribute Details

#handle ⇒ Object Also known as: to_io

Returns the value of attribute handle.



33
34
35
# File 'lib/thrift/transport/socket.rb', line 33

# Reader for the underlying connection object stored in @handle.
# The constructor and #close set it to nil; #open stores the connected socket.
def handle
  @handle
end

#timeout ⇒ Object

Returns the value of attribute timeout.



33
34
35
# File 'lib/thrift/transport/socket.rb', line 33

# Reader for the timeout given to the constructor (stored in @timeout).
# #open, #read and #write use it to bound their IO.select waits; #read and
# #write fall back to plain blocking I/O when it is nil or 0.
def timeout
  @timeout
end

Instance Method Details

#close ⇒ Object



132
133
134
135
# File 'lib/thrift/transport/socket.rb', line 132

# Shuts the transport down: closes the underlying handle when one is present
# and not already closed, then drops the reference.
#
# @return [nil]
def close
  still_open = !@handle.nil? && !@handle.closed?
  @handle.close if still_open
  @handle = nil
end

#open ⇒ Object

Raises:

  • (TransportException)



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/thrift/transport/socket.rb', line 35

# Connects to @host:@port, trying every address returned by getaddrinfo in
# order and keeping the first socket that connects.
#
# Each attempt is a non-blocking connect; when the connect is still in
# progress, IO.select waits up to @timeout seconds (without limit when
# @timeout is nil) for the socket to become writable. A socket whose attempt
# fails or times out is closed before the next address is tried, so failed
# attempts do not leak file descriptors.
#
# @return [::Socket] the connected socket, also stored in @handle
# @raise [TransportException] NOT_OPEN when no address could be connected;
#   the message carries the error of the last failed attempt
def open
  last_error = nil
  ::Socket.getaddrinfo(@host, @port, nil, ::Socket::SOCK_STREAM).each do |addrinfo|
    socket = nil
    begin
      # addrinfo[4] is the address family, [1] the port and [3] the numeric address.
      socket = ::Socket.new(addrinfo[4], ::Socket::SOCK_STREAM, 0)
      socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
      sockaddr = ::Socket.sockaddr_in(addrinfo[1], addrinfo[3])
      begin
        socket.connect_nonblock(sockaddr)
      rescue Errno::EINPROGRESS
        # Connect is under way: wait for writability, then confirm the outcome.
        # A select timeout is turned into an error so it is reported like any
        # other failed attempt instead of being silently skipped.
        raise Errno::ETIMEDOUT unless IO.select(nil, [socket], nil, @timeout)

        begin
          socket.connect_nonblock(sockaddr)
        rescue Errno::EISCONN
          # Already connected; this is the success signal for the second call.
        end
      end
      return @handle = socket
    rescue StandardError => e
      last_error = e
      socket.close if socket && !socket.closed?
    end
  end
  raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@desc}: #{last_error}")
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/thrift/transport/socket.rb', line 60

# Reports whether a connection handle exists and has not been closed.
#
# @return [Boolean]
def open?
  return false if @handle.nil?

  !@handle.closed?
end

#read(sz) ⇒ Object

Raises:

  • (IOError)


96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/thrift/transport/socket.rb', line 96

# Reads up to +sz+ bytes from the connection using readpartial.
#
# With no timeout (nil or 0) the call blocks until data is available.
# Otherwise it waits with IO.select for at most @timeout seconds in total;
# elapsed time is measured on the monotonic clock so that wall-clock
# adjustments cannot shorten or lengthen the wait.
#
# @param sz [Integer] maximum number of bytes to read
# @return [String] the bytes read (never empty)
# @raise [IOError] when the transport is not open
# @raise [TransportException] TIMED_OUT when nothing became readable in time
#   (the handle stays open); NOT_OPEN when the underlying read fails (the
#   handle is closed and cleared); UNKNOWN when the read yields no data
def read(sz)
  raise IOError, "closed stream" unless open?

  begin
    if @timeout.nil? || @timeout == 0
      data = @handle.readpartial(sz)
    else
      # select can return early for reasons other than the timeout, so keep
      # waiting until the handle is readable or the whole budget is spent.
      started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      remaining = @timeout
      readable = nil
      loop do
        readable, = IO.select([@handle], nil, nil, remaining)
        remaining = @timeout - (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started)
        break if (readable && !readable.empty?) || remaining <= 0
      end
      if readable.nil? || readable.empty?
        raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}")
      end

      data = @handle.readpartial(sz)
    end
  rescue TransportException
    # Keep the timeout from being swallowed by the StandardError handler below.
    raise
  rescue StandardError => e
    @handle.close unless @handle.closed?
    @handle = nil
    raise TransportException.new(TransportException::NOT_OPEN, e.message)
  end
  if data.nil? || data.length == 0
    raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@desc}")
  end
  data
end

#to_s ⇒ Object



139
140
141
# File 'lib/thrift/transport/socket.rb', line 139

# Short label for this transport, of the form "socket(host:port)".
#
# @return [String]
def to_s
  format('socket(%s:%s)', @host, @port)
end

#write(str) ⇒ Object

Raises:

  • (IOError)


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/thrift/transport/socket.rb', line 64

# Writes +str+ to the connection after forcing it to binary encoding.
#
# With no timeout (nil or 0) this is a plain blocking write. Otherwise the
# data is pushed with write_nonblock, waiting on IO.select between attempts,
# and the whole operation is bounded by one deadline of @timeout seconds
# measured on the monotonic clock. Each select waits only for the time that
# is left, so the total wait cannot exceed the timeout.
#
# @param str [String] data to send
# @return [Integer] number of bytes written
# @raise [IOError] when the transport is not open
# @raise [TransportException] TIMED_OUT when the data could not be written in
#   time (the handle stays open); NOT_OPEN when the underlying write fails
#   (the handle is closed and cleared)
def write(str)
  raise IOError, "closed stream" unless open?
  str = Bytes.force_binary_encoding(str)
  begin
    if @timeout.nil? || @timeout == 0
      @handle.write(str)
    else
      written = 0
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
      while written < str.length
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0

        _, writable, = IO.select(nil, [@handle], nil, remaining)
        next if writable.nil? || writable.empty?

        begin
          written += @handle.write_nonblock(str[written..-1])
        rescue IO::WaitWritable
          # select reported writable but the kernel buffer filled up again;
          # this is not a failure, so wait for the next opportunity.
          next
        end
      end
      if written < str.length
        raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out writing #{str.length} bytes to #{@desc}")
      end

      written
    end
  rescue TransportException
    # Keep the timeout from being swallowed by the StandardError handler below.
    raise
  rescue StandardError => e
    @handle.close unless @handle.closed?
    @handle = nil
    raise TransportException.new(TransportException::NOT_OPEN, e.message)
  end
end