Class: IProto::TCPSocket

Inherits:
Object
  • Object
show all
Includes:
ConnectionAPI
Defined in:
lib/iproto/tcp_socket.rb

Overview

TODO: timeouts

Defined Under Namespace

Classes: Retry

Constant Summary

Constants included from ConnectionAPI

ConnectionAPI::BINARY, ConnectionAPI::DEFAULT_RECONNECT, ConnectionAPI::HEADER_SIZE

Instance Method Summary collapse

Methods included from ConnectionAPI

#next_request_id, #pack_request

Constructor Details

#initialize(host, port, reconnect = true) ⇒ TCPSocket



6
7
8
9
10
11
12
13
# File 'lib/iproto/tcp_socket.rb', line 6

# Prepares connection state; no socket is opened here.
#
# @param host [Object] stored as the first element of the address pair
# @param port [Object] stored as the second element of the address pair
# @param reconnect [Boolean, Numeric] any truthy value enables reconnecting;
#   a Numeric is additionally used as the reconnect timeout, otherwise
#   DEFAULT_RECONNECT is used
def initialize(host, port, reconnect = true)
  @addr = [host, port]
  @socket = nil
  @reconnect = reconnect ? true : false
  @reconnect_timeout = reconnect.is_a?(Numeric) ? reconnect : DEFAULT_RECONNECT
  # Start one second in the past so "@reconnect_time < Time.now" already holds.
  @reconnect_time = Time.now - 1
  @retry = true
end

Instance Method Details

#_raise_disconnected(message, _raise = true) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/iproto/tcp_socket.rb', line 76

# Records that the connection was lost and optionally raises.
#
# With reconnecting enabled the socket slot is cleared and the next
# reconnect time is set to now plus +@reconnect_timeout+; otherwise the
# slot is set to +:disconnected+. The object previously held in +@socket+
# is closed (best effort) so its descriptor is released immediately rather
# than whenever the garbage collector gets to it.
#
# @param message [String, Exception] passed to +raise Disconnected+
# @param _raise [Object] +true+ raises Disconnected, +:retry+ raises Retry,
#   any other value raises nothing
# @raise [Disconnected] when +_raise+ is +true+
# @raise [Retry] when +_raise+ is +:retry+
# @return [nil] when nothing is raised
def _raise_disconnected(message, _raise = true)
  stale = @socket
  if @reconnect
    @socket = nil
    @reconnect_time = Time.now + @reconnect_timeout
  else
    @socket = :disconnected
  end
  # The slot may have held nil or the :disconnected marker, neither of which
  # can be closed; a failing close must not mask the disconnect being reported.
  if stale.respond_to?(:close)
    begin
      stale.close
    rescue StandardError
      nil
    end
  end
  case _raise
  when true
    raise Disconnected, message
  when :retry
    raise Retry
  end
end

#close ⇒ Object



15
16
17
18
19
20
21
# File 'lib/iproto/tcp_socket.rb', line 15

# Disables reconnecting and, if a socket slot is occupied, closes it and
# marks the connection as +:disconnected+.
#
# Any StandardError raised by the close call is ignored (this includes the
# NoMethodError produced when the slot already holds +:disconnected+).
#
# @return [Symbol, nil] +:disconnected+ when the slot was occupied, otherwise nil
def close
  @reconnect = false
  return unless @socket

  begin
    @socket.close
  rescue StandardError
    nil
  end
  @socket = :disconnected
end

#connected? ⇒ Boolean



23
24
25
# File 'lib/iproto/tcp_socket.rb', line 23

# Tells whether the socket slot currently holds something other than the
# +:disconnected+ marker.
#
# @return [Boolean, nil] nil when no socket has been assigned, false when
#   marked +:disconnected+, true otherwise
def connected?
  sock = @socket
  return sock unless sock

  sock != :disconnected
end

#could_be_connected? ⇒ Boolean



27
28
29
30
# File 'lib/iproto/tcp_socket.rb', line 27

# Tells whether the connection is usable now or worth a connect attempt.
#
# With a socket slot assigned, the answer is whether it differs from the
# +:disconnected+ marker. With an empty slot, the answer is true when the
# retry flag is set or the scheduled reconnect time has already passed.
#
# @return [Boolean]
def could_be_connected?
  return @socket != :disconnected if @socket

  @retry || @reconnect_time < Time.now
end

#recv_header(request_id) ⇒ Object

end ConnectionAPI



62
63
64
65
66
67
68
69
70
# File 'lib/iproto/tcp_socket.rb', line 62

# Reads one response header and returns the announced body size.
#
# The header is HEADER_SIZE bytes; the response size is decoded as a
# little-endian int32 at offset 4 and the request id at offset 8.
#
# @param request_id [Integer] id of the request whose response is expected
# @return [Integer] response size taken from the header
# @raise [Retry] when the connection ended and +@retry+ is set
# @raise [Disconnected] when the connection ended and +@retry+ is not set
# @raise [UnexpectedResponse] when the header carries a different request id
def recv_header(request_id)
  header = socket.read(HEADER_SIZE)
  # IO#read returns nil at EOF, but a *shorter* string when the stream ends
  # part-way through the header; both mean the peer went away, and decoding
  # a truncated header would fail or yield garbage.
  unless header && header.bytesize == HEADER_SIZE
    _raise_disconnected('disconnected while read', @retry ? :retry : true)
  end
  response_size = ::BinUtils.get_int32_le(header, 4)
  recv_request_id = ::BinUtils.get_int32_le(header, 8)
  unless request_id == recv_request_id
    raise UnexpectedResponse, "Waiting response for request_id #{request_id}, but received for #{recv_request_id}"
  end
  response_size
end

#recv_response(response_size) ⇒ Object



72
73
74
# File 'lib/iproto/tcp_socket.rb', line 72

# Reads the response body announced by the preceding header.
#
# If the stream ends before +response_size+ bytes arrive, the connection is
# reported as lost by raising Disconnected. (Previously the value +2+ was
# passed as the raise mode, which matches neither +true+ nor +:retry+, so a
# lost connection silently produced a nil body; a partially read body was
# returned as if it were complete.)
#
# @param response_size [Integer] number of bytes to read
# @return [String] exactly +response_size+ bytes
# @raise [Disconnected] when the stream ends early
def recv_response(response_size)
  body = socket.read(response_size)
  # IO#read yields nil at EOF and a short string when EOF hits mid-body.
  unless body && body.bytesize == response_size
    _raise_disconnected('disconnected while read')
  end
  body
end

#send_request(request_type, body) ⇒ Object

begin ConnectionAPI



48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/iproto/tcp_socket.rb', line 48

# Sends one request and returns the raw response body.
#
# A broken pipe on write, or a Retry raised while reading, marks the
# connection as lost and the whole exchange (with a fresh request id) is
# attempted again. The retry happens at most once per call: #socket sets
# +@retry+ back to true after every successful connect, so relying on that
# flag alone would let a peer that accepts and then drops connections keep
# this method looping without bound.
#
# @param request_type [Object] forwarded to +pack_request+
# @param body [Object] forwarded to +pack_request+
# @return [Object] whatever +recv_response+ returns
# @raise [Disconnected] when the exchange fails and no retry is allowed
def send_request(request_type, body)
  retried = false
  begin
    request_id = next_request_id
    socket.send pack_request(request_type, request_id, body), 0
    response_size = recv_header request_id
    recv_response response_size
  rescue Errno::EPIPE, Retry => e
    # Raises Disconnected when this call already retried or the retry flag
    # is off; otherwise only records the disconnect so we can try again.
    _raise_disconnected(e, retried || !@retry)
    @retry = false
    retried = true
    retry
  end
end

#socket ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/iproto/tcp_socket.rb', line 32

# Returns the current socket, connecting first if none has been assigned.
#
# A new connection is opened with +::TCPSocket.new(*@addr)+ and sets the
# retry flag. A refused connection is re-raised as CouldNotConnect and,
# when reconnecting is disabled, the slot is marked +:disconnected+.
#
# @return [Object] the object held in the socket slot
# @raise [Disconnected] when the slot holds the +:disconnected+ marker
# @raise [CouldNotConnect] when the connection attempt is refused
def socket
  current = @socket
  if current.nil? || current == false
    current = @socket = ::TCPSocket.new(*@addr)
    @retry = true
  elsif current == :disconnected
    raise(Disconnected, "disconnected earlier")
  end
  current
rescue Errno::ECONNREFUSED => refused
  @socket = :disconnected unless @reconnect
  raise CouldNotConnect, refused
end