Class: IProto::TCPSocket

Inherits:
Object
  • Object
show all
Includes:
ConnectionAPI
Defined in:
lib/iproto/tcp_socket.rb

Overview

TODO: timeouts

Defined Under Namespace

Classes: Retry

Constant Summary

Constants included from ConnectionAPI

ConnectionAPI::BINARY, ConnectionAPI::DEFAULT_RECONNECT, ConnectionAPI::HEADER_SIZE

Instance Method Summary collapse

Methods included from ConnectionAPI

#next_request_id, #pack_request

Constructor Details

#initialize(host, port, reconnect = true) ⇒ TCPSocket

Returns a new instance of TCPSocket.



6
7
8
9
10
11
12
13
# File 'lib/iproto/tcp_socket.rb', line 6

def initialize(host, port, reconnect = true)
  @addr = [host, port]
  @reconnect_timeout = Numeric === reconnect ? reconnect : DEFAULT_RECONNECT
  @reconnect = !!reconnect
  @socket = nil
  @reconnect_time = Time.now - 1
  @retry = true
end

Instance Method Details

#_raise_disconnected(message, _raise = true) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/iproto/tcp_socket.rb', line 83

def _raise_disconnected(message, _raise = true)
  old_socket = @socket
  if @reconnect
    @socket = nil
    @reconnect_time = Time.now + @reconnect_timeout
  else
    @socket = :disconnected
  end
  case _raise
  when true
    raise Disconnected, message
  when :retry
    begin
      # if request were sent, then we will not have EPIPE exception
      old_socket.send '\x00', 0
    rescue Errno::EPIPE
      # OS knows that socket is closed, and request were not sent
      raise Retry
    else
      # OS didn't notice socket is closed, request were sent probably
      raise Disconnected, message
    end
  end
end

#closeObject



15
16
17
18
19
20
21
# File 'lib/iproto/tcp_socket.rb', line 15

def close
  @reconnect = false
  if @socket
    @socket.close rescue nil
    @socket = :disconnected
  end
end

#connected?Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/iproto/tcp_socket.rb', line 23

def connected?
  @socket && @socket != :disconnected
end

#could_be_connected?Boolean

Returns:

  • (Boolean)


27
28
29
30
# File 'lib/iproto/tcp_socket.rb', line 27

def could_be_connected?
  @socket ? @socket != :disconnected
          : (@reconnect || @reconnect_time < Time.now)
end

#recv_header(request_id) ⇒ Object

end ConnectionAPI



69
70
71
72
73
74
75
76
77
# File 'lib/iproto/tcp_socket.rb', line 69

def recv_header(request_id)
  header = socket.read(HEADER_SIZE)  or _raise_disconnected('disconnected while read', @retry ? :retry : true)
  response_size = ::BinUtils.get_int32_le(header, 4)
  recv_request_id = ::BinUtils.get_int32_le(header, 8)
  unless request_id == recv_request_id
    raise UnexpectedResponse.new("Waiting response for request_id #{request_id}, but received for #{recv_request_id}")
  end
  response_size
end

#recv_response(response_size) ⇒ Object



79
80
81
# File 'lib/iproto/tcp_socket.rb', line 79

def recv_response(response_size)
  socket.read(response_size)  or _raise_disconnected('disconnected while read', true)
end

#send_request(request_type, body) ⇒ Object

begin ConnectionAPI



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/iproto/tcp_socket.rb', line 52

def send_request(request_type, body)
  unless could_be_connected?
    raise Disconnected, "connection is closed"
  end
  begin
    request_id = next_request_id
    socket.send pack_request(request_type, request_id, body), 0
    response_size = recv_header request_id
    recv_response response_size
  rescue Errno::EPIPE, Retry => e
    _raise_disconnected(e, !@retry)
    @retry = false
    retry
  end
end

#socketObject



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/iproto/tcp_socket.rb', line 32

def socket
  if (sock = @socket)
    sock != :disconnected ? sock : raise(Disconnected, "disconnected earlier")
  else
    sock = @socket = ::TCPSocket.new(*@addr)
    @retry = true
  end
  sock
rescue Errno::ECONNREFUSED => e
  unless @reconnect
    @socket = :disconnected
  else
    @reconnect_time = Time.now + @reconnect_timeout
  end
  raise CouldNotConnect, e
end