Class: Monga::Connections::EMConnection

Inherits:
EM::Connection
  • Object
show all
Includes:
EM::Deferrable
Defined in:
lib/monga/connections/em_connection.rb

Direct Known Subclasses

FiberedConnection

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, timeout) ⇒ EMConnection

Returns a new instance of EMConnection.



9
10
11
12
13
14
15
# File 'lib/monga/connections/em_connection.rb', line 9

def initialize(host, port, timeout)
  @host = host
  @port = port
  @timeout = timeout
  @reactor_running = true
  @responses = {}
end

Instance Attribute Details

#hostObject (readonly)

Returns the value of attribute host.



7
8
9
# File 'lib/monga/connections/em_connection.rb', line 7

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



7
8
9
# File 'lib/monga/connections/em_connection.rb', line 7

def port
  @port
end

#responsesObject (readonly)

Returns the value of attribute responses.



7
8
9
# File 'lib/monga/connections/em_connection.rb', line 7

def responses
  @responses
end

Class Method Details

.connect(host, port, timeout) ⇒ Object



17
18
19
# File 'lib/monga/connections/em_connection.rb', line 17

def self.connect(host, port, timeout)
  EM.connect(host, port, self, host, port, timeout)
end

Instance Method Details

#closeObject



96
97
98
99
# File 'lib/monga/connections/em_connection.rb', line 96

def close
  Monga.logger.debug("EventMachine is stopped, closing connection")
  @reactor_running = false
end

#connected?Boolean

Returns:

  • (Boolean)


72
73
74
75
# File 'lib/monga/connections/em_connection.rb', line 72

def connected?
  reconnect unless @connected
  @connected || false
end

#connection_completedObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/monga/connections/em_connection.rb', line 42

def connection_completed
  Monga.logger.debug("Connection is established #{@host}:#{@port}")

  EM.add_shutdown_hook do
    close
  end

  unless @reactor_running
    EM.add_periodic_timer(Monga::Cursor::CLOSE_TIMEOUT){ Monga::Cursor.batch_kill(self) }
  end

  @connected = true
  @pending_for_reconnect = false
  @buffer = Buffer.new
  @reactor_running = true

  succeed
end

#is_master?Boolean

Returns:

  • (Boolean)


105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/monga/connections/em_connection.rb', line 105

def is_master?
  reconnect unless @connected
  req = Monga::Protocol::Query.new(self, "admin", "$cmd", query: {"isMaster" => 1}, limit: 1)
  command = req.command
  request_id = req.request_id
  blk = proc do |data|
    err, resp = req.parse_response(data)
    if Exception === err
      @primary = false
      yield nil
    else
      @primary = resp.last.first["ismaster"]
      yield @primary ? :primary : :secondary
    end
  end
  @responses[request_id] = blk
  send_data command
end

#primary?Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/monga/connections/em_connection.rb', line 101

def primary?
  @primary || false
end

#receive_data(data) ⇒ Object



33
34
35
36
37
38
39
40
# File 'lib/monga/connections/em_connection.rb', line 33

def receive_data(data)
  @buffer.append(data)
  @buffer.each do |message|
    request_id = message[2]
    cb = @responses.delete request_id
    cb.call(message)  if cb
  end
end

#reconnectObject



61
62
63
64
65
66
67
68
69
70
# File 'lib/monga/connections/em_connection.rb', line 61

def reconnect
  unless @connected && @pending_for_reconnect
    if @reactor_running
      super(@host, @port)
    else
      EM.schedule{ super(@host, @port) }
    end
    @pending_for_reconnect = true
  end
end

#send_command(msg, request_id = nil, &cb) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/monga/connections/em_connection.rb', line 21

def send_command(msg, request_id=nil, &cb)
  # Reconnect is a hack for testing.
  # We are stopping EvenMachine for each test.
  # This hack reconnects to Mongo on first query
  reconnect unless @connected

  callback do
    send_data msg
  end
  @responses[request_id] = cb  if cb
end

#unbindObject



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/monga/connections/em_connection.rb', line 77

def unbind
  @connected = false
  Monga.logger.debug("Lost connection #{@host}:#{@port}")

  @responses.keys.each do |k|
    cb = @responses.delete k
    err = Monga::Exceptions::Disconnected.new("Disconnected from #{@host}:#{@port}")
    cb.call(err)
  end

  @primary = false
  @pending_for_reconnect = false
  set_deferred_status(nil)

  if @reactor_running
    EM.add_timer(0.1){ reconnect }
  end
end