Class: Monga::Connections::KGIOConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/monga/connections/kgio_connection.rb

Constant Summary collapse

TO_RECV =
512

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, timeout) ⇒ KGIOConnection

Returns a new instance of KGIOConnection.



12
13
14
15
16
17
# File 'lib/monga/connections/kgio_connection.rb', line 12

def initialize(host, port, timeout)
  @host, @port, @timout = host, port, timeout
  @connected = true
  @buffer = Buffer.new
  @tmp = ""
end

Class Method Details

.connect(host, port, timeout) ⇒ Object



8
9
10
# File 'lib/monga/connections/kgio_connection.rb', line 8

def self.connect(host, port, timeout)
  new(host, port, timeout)
end

Instance Method Details

#closeObject



104
105
106
107
108
# File 'lib/monga/connections/kgio_connection.rb', line 104

def close
  @socket = nil
  @primary = false
  @connected = false
end

#connected?Boolean

Returns:

  • (Boolean)


19
20
21
22
# File 'lib/monga/connections/kgio_connection.rb', line 19

def connected?
  socket unless @connected
  @connected
end

#is_master?Boolean

Returns:

  • (Boolean)


90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/monga/connections/kgio_connection.rb', line 90

def is_master?
  req = Monga::Protocol::Query.new(self, "admin", "$cmd", query: {"isMaster" => 1}, limit: 1)
  command = req.command
  request_id = req.request_id
  socket.kgio_write command
  read_socket
  message = @buffer.first
  @primary = message.last.first["ismaster"]
  yield @primary ? :primary : :secondary
rescue => e
  close
  yield nil
end

#primary?Boolean

Returns:

  • (Boolean)


86
87
88
# File 'lib/monga/connections/kgio_connection.rb', line 86

def primary?
  @primary || false
end

#read_socketObject



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/monga/connections/kgio_connection.rb', line 65

def read_socket
  while @buffer.buffer_size < 4
    unless socket.kgio_read(TO_RECV, @tmp)
      raise Errno::ECONNREFUSED.new "Nil was return. Closing connection"
    end

    @buffer.append(@tmp)

    size = @buffer.buffer_size
    if size >= 4
      length = ::BinUtils.get_int32_le(@buffer.buffer)  

      torecv = length - size
      if torecv > 0
        socket.read(torecv, @tmp)
        @buffer.append(@tmp)
      end
    end
  end
end

#responsesObject

Fake answer, as far as we are blocking, but we should support API



40
41
42
# File 'lib/monga/connections/kgio_connection.rb', line 40

def responses
  0
end

#send_command(msg, request_id = nil, &cb) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/monga/connections/kgio_connection.rb', line 44

def send_command(msg, request_id=nil, &cb)
  raise Errno::ECONNREFUSED, "Connection Refused" unless socket
  socket.kgio_write msg
  if cb
    read_socket

    message = @buffer.first
    rid = message[2]

    fail "Returned Request Id is not equal to sended one (#{rid} != #{request_id}), #{message}" if rid != request_id

    cb.call(message)
  end
rescue Errno::ECONNREFUSED, Errno::EPIPE => e
  close
  if cb
    err = Monga::Exceptions::Disconnected.new("Disconnected from #{@host}:#{@port}, #{e.message}")
    cb.call(err)
  end
end

#socketObject



24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/monga/connections/kgio_connection.rb', line 24

def socket
  @socket ||= begin
    sock = Kgio::TCPSocket.new(@host, @port)
    # MacOS doesn't support autopush
    sock.kgio_autopush = true unless RUBY_PLATFORM['darwin']
    # check connection
    sock.kgio_write ""
    @connected = true
    sock
  end
rescue => e
  nil
end