Module: EM::Voldemort::Connection::Handler
- Defined in:
- lib/em-voldemort/connection.rb
Overview
EventMachine handler for a Voldemort node connection
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
The EM::Voldemort::Connection object for which we’re handling the connection.
-
#in_flight ⇒ Object
readonly
If a request is currently in flight, this is a deferrable that will succeed or fail when the request completes.
-
#last_request ⇒ Object
readonly
The time at which the request currently in flight was sent.
-
#request_queue ⇒ Object
readonly
Array of [request_data, deferrable] pairs, containing requests that have not yet been sent.
-
#state ⇒ Object
readonly
State machine.
Instance Method Summary collapse
-
#close_gracefully ⇒ Object
Connection is asking us to shut down.
- #enqueue_request(request) ⇒ Object
- #initialize(connection) ⇒ Object
-
#post_init ⇒ Object
Connection established (called by EventMachine).
-
#receive_data(data) ⇒ Object
The Voldemort node is talking to us (called by EventMachine).
-
#send_next_request ⇒ Object
Takes the request at the front of the queue and sends it to the Voldemort node.
-
#send_protocol_proposal(protocol) ⇒ Object
First action when the connection is established: client tells the server which version of the Voldemort protocol it wants to use.
-
#unbind(reason = nil) ⇒ Object
Connection closed (called by EventMachine).
Instance Attribute Details
#connection ⇒ Object (readonly)
The EM::Voldemort::Connection object for which we’re handling the connection
97 98 99 |
# File 'lib/em-voldemort/connection.rb', line 97 def connection @connection end |
#in_flight ⇒ Object (readonly)
If a request is currently in flight, this is a deferrable that will succeed or fail when the request completes. The protocol requires that only one request can be in flight at once.
104 105 106 |
# File 'lib/em-voldemort/connection.rb', line 104 def in_flight @in_flight end |
#last_request ⇒ Object (readonly)
The time at which the request currently in flight was sent
107 108 109 |
# File 'lib/em-voldemort/connection.rb', line 107 def last_request @last_request end |
#request_queue ⇒ Object (readonly)
Array of [request_data, deferrable] pairs, containing requests that have not yet been sent
110 111 112 |
# File 'lib/em-voldemort/connection.rb', line 110 def request_queue @request_queue end |
#state ⇒ Object (readonly)
State machine. One of :connecting, :protocol_proposal, :idle, :request, :disconnected
100 101 102 |
# File 'lib/em-voldemort/connection.rb', line 100 def state @state end |
Instance Method Details
#close_gracefully ⇒ Object
Connection is asking us to shut down. Wait for the currently in-flight request to complete, but fail any unsent requests in the queue.
190 191 192 193 194 195 196 197 198 199 |
# File 'lib/em-voldemort/connection.rb', line 190 def close_gracefully @request_queue.each {|request, deferrable| deferrable.fail(ServerError.new('shutdown requested')) } @request_queue = [] if in_flight in_flight.callback { close_connection } in_flight.errback { close_connection } else close_connection end end |
#enqueue_request(request) ⇒ Object
120 121 122 123 124 125 |
# File 'lib/em-voldemort/connection.rb', line 120 def enqueue_request(request) EM::DefaultDeferrable.new.tap do |deferrable| request_queue << [request, deferrable] send_next_request unless in_flight end end |
#initialize(connection) ⇒ Object
112 113 114 115 116 117 118 |
# File 'lib/em-voldemort/connection.rb', line 112 def initialize(connection) @connection = connection @state = :connecting @in_flight = EM::DefaultDeferrable.new @last_request = Time.now @request_queue = [] end |
#post_init ⇒ Object
Connection established (called by EventMachine)
148 149 150 151 152 153 154 |
# File 'lib/em-voldemort/connection.rb', line 148 def post_init connection.logger.info "Connected to Voldemort node at #{connection.host}:#{connection.port}" send_protocol_proposal(connection.protocol) in_flight.errback do |response| connection.logger.warn "Voldemort protocol #{connection.protocol} not accepted: #{response.inspect}" end end |
#receive_data(data) ⇒ Object
The Voldemort node is talking to us (called by EventMachine)
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/em-voldemort/connection.rb', line 157 def receive_data(data) case @state when :protocol_proposal deferrable = @in_flight @state = :idle @in_flight = nil if data == 'ok' send_next_request deferrable.succeed else close_connection deferrable.fail("server response: #{data.inspect}") end when :request @recv_buf << data response_size = @recv_buf.unpack('N').first if response_size && @recv_buf.bytesize >= response_size + 4 response = @recv_buf[4, response_size] deferrable = @in_flight @state = :idle @in_flight = @recv_buf = nil send_next_request deferrable.succeed(response) end else raise "Received data in unexpected state: #{@state.inspect}" end end |
#send_next_request ⇒ Object
Takes the request at the front of the queue and sends it to the Voldemort node
137 138 139 140 141 142 143 144 145 |
# File 'lib/em-voldemort/connection.rb', line 137 def send_next_request return if request_queue.empty? raise "cannot make a request while in #{@state.inspect} state" unless @state == :idle request, @in_flight = request_queue.shift send_data([request.size, request].pack('NA*')) @recv_buf = ''.force_encoding('BINARY') @last_request = Time.now @state = :request end |
#send_protocol_proposal(protocol) ⇒ Object
First action when the connection is established: client tells the server which version of the Voldemort protocol it wants to use
129 130 131 132 133 134 |
# File 'lib/em-voldemort/connection.rb', line 129 def send_protocol_proposal(protocol) raise ArgumentError, 'protocol must be 3 bytes long' if protocol.bytesize != 3 raise "unexpected state before protocol proposal: #{@state.inspect}" unless @state == :connecting send_data(protocol) @state = :protocol_proposal end |
#unbind(reason = nil) ⇒ Object
Connection closed (called by EventMachine)
202 203 204 205 206 207 208 209 210 |
# File 'lib/em-voldemort/connection.rb', line 202 def unbind(reason=nil) @state = :disconnected deferrable = @in_flight @in_flight = nil deferrable.fail(ServerError.new('connection closed')) if deferrable @request_queue.each {|request, deferrable| deferrable.fail(ServerError.new('connection closed')) } @request_queue = [] connection.connection_closed(self, reason) end |