Class: Monga::Connections::EMConnection
- Inherits:
-
EM::Connection
- Object
- EM::Connection
- Monga::Connections::EMConnection
- Includes:
- EM::Deferrable
- Defined in:
- lib/monga/connections/em_connection.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#responses ⇒ Object
readonly
Returns the value of attribute responses.
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object
- #connected? ⇒ Boolean
- #connection_completed ⇒ Object
-
#initialize(host, port, timeout) ⇒ EMConnection
constructor
A new instance of EMConnection.
- #is_master? ⇒ Boolean
- #primary? ⇒ Boolean
- #receive_data(data) ⇒ Object
- #reconnect ⇒ Object
- #send_command(msg, request_id = nil, &cb) ⇒ Object
- #unbind ⇒ Object
Constructor Details
#initialize(host, port, timeout) ⇒ EMConnection
Returns a new instance of EMConnection.
9 10 11 12 13 14 15 |
# File 'lib/monga/connections/em_connection.rb', line 9 def initialize(host, port, timeout) @host = host @port = port @timeout = timeout @reactor_running = true @responses = {} end |
Instance Attribute Details
#host ⇒ Object (readonly)
Returns the value of attribute host.
7 8 9 |
# File 'lib/monga/connections/em_connection.rb', line 7 def host @host end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
7 8 9 |
# File 'lib/monga/connections/em_connection.rb', line 7 def port @port end |
#responses ⇒ Object (readonly)
Returns the value of attribute responses.
7 8 9 |
# File 'lib/monga/connections/em_connection.rb', line 7 def responses @responses end |
Class Method Details
.connect(host, port, timeout) ⇒ Object
17 18 19 |
# File 'lib/monga/connections/em_connection.rb', line 17 def self.connect(host, port, timeout) EM.connect(host, port, self, host, port, timeout) end |
Instance Method Details
#close ⇒ Object
96 97 98 99 |
# File 'lib/monga/connections/em_connection.rb', line 96 def close Monga.logger.debug("EventMachine is stopped, closing connection") @reactor_running = false end |
#connected? ⇒ Boolean
72 73 74 75 |
# File 'lib/monga/connections/em_connection.rb', line 72 def connected? reconnect unless @connected @connected || false end |
#connection_completed ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/monga/connections/em_connection.rb', line 42 def connection_completed Monga.logger.debug("Connection is established #{@host}:#{@port}") EM.add_shutdown_hook do close end unless @reactor_running EM.add_periodic_timer(Monga::Cursor::CLOSE_TIMEOUT){ Monga::Cursor.batch_kill(self) } end @connected = true @pending_for_reconnect = false @buffer = Buffer.new @reactor_running = true succeed end |
#is_master? ⇒ Boolean
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/monga/connections/em_connection.rb', line 105 def is_master? reconnect unless @connected req = Monga::Protocol::Query.new(self, "admin", "$cmd", query: {"isMaster" => 1}, limit: 1) command = req.command request_id = req.request_id blk = proc do |data| err, resp = req.parse_response(data) if Exception === err @primary = false yield nil else @primary = resp.last.first["ismaster"] yield @primary ? :primary : :secondary end end @responses[request_id] = blk send_data command end |
#primary? ⇒ Boolean
101 102 103 |
# File 'lib/monga/connections/em_connection.rb', line 101 def primary? @primary || false end |
#receive_data(data) ⇒ Object
33 34 35 36 37 38 39 40 |
# File 'lib/monga/connections/em_connection.rb', line 33 def receive_data(data) @buffer.append(data) @buffer.each do || request_id = [2] cb = @responses.delete request_id cb.call() if cb end end |
#reconnect ⇒ Object
61 62 63 64 65 66 67 68 69 70 |
# File 'lib/monga/connections/em_connection.rb', line 61 def reconnect unless @connected && @pending_for_reconnect if @reactor_running super(@host, @port) else EM.schedule{ super(@host, @port) } end @pending_for_reconnect = true end end |
#send_command(msg, request_id = nil, &cb) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/monga/connections/em_connection.rb', line 21 def send_command(msg, request_id=nil, &cb) # Reconnect is a hack for testing. # We are stopping EvenMachine for each test. # This hack reconnects to Mongo on first query reconnect unless @connected callback do send_data msg end @responses[request_id] = cb if cb end |
#unbind ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/monga/connections/em_connection.rb', line 77 def unbind @connected = false Monga.logger.debug("Lost connection #{@host}:#{@port}") @responses.keys.each do |k| cb = @responses.delete k err = Monga::Exceptions::Disconnected.new("Disconnected from #{@host}:#{@port}") cb.call(err) end @primary = false @pending_for_reconnect = false set_deferred_status(nil) if @reactor_running EM.add_timer(0.1){ reconnect } end end |