Module: EM::Voldemort::Connection::Handler

Defined in:
lib/em-voldemort/connection.rb

Overview

EventMachine handler for a Voldemort node connection

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connectionObject (readonly)

The EM::Voldemort::Connection object for which we’re handling the connection



97
98
99
# File 'lib/em-voldemort/connection.rb', line 97

def connection
  @connection
end

#in_flightObject (readonly)

If a request is currently in flight, this is a deferrable that will succeed or fail when the request completes. The protocol requires that only one request can be in flight at once.



104
105
106
# File 'lib/em-voldemort/connection.rb', line 104

def in_flight
  @in_flight
end

#last_requestObject (readonly)

The time at which the request currently in flight was sent



107
108
109
# File 'lib/em-voldemort/connection.rb', line 107

def last_request
  @last_request
end

#request_queueObject (readonly)

Array of [request_data, deferrable] pairs, containing requests that have not yet been sent



110
111
112
# File 'lib/em-voldemort/connection.rb', line 110

def request_queue
  @request_queue
end

#stateObject (readonly)

State machine. One of :connecting, :protocol_proposal, :idle, :request, :disconnected



100
101
102
# File 'lib/em-voldemort/connection.rb', line 100

def state
  @state
end

Instance Method Details

#close_gracefullyObject

Connection is asking us to shut down. Wait for the currently in-flight request to complete, but fail any unsent requests in the queue.



190
191
192
193
194
195
196
197
198
199
# File 'lib/em-voldemort/connection.rb', line 190

def close_gracefully
  @request_queue.each {|request, deferrable| deferrable.fail(ServerError.new('shutdown requested')) }
  @request_queue = []
  if in_flight
    in_flight.callback { close_connection }
    in_flight.errback  { close_connection }
  else
    close_connection
  end
end

#enqueue_request(request) ⇒ Object



120
121
122
123
124
125
# File 'lib/em-voldemort/connection.rb', line 120

def enqueue_request(request)
  EM::DefaultDeferrable.new.tap do |deferrable|
    request_queue << [request, deferrable]
    send_next_request unless in_flight
  end
end

#initialize(connection) ⇒ Object



112
113
114
115
116
117
118
# File 'lib/em-voldemort/connection.rb', line 112

def initialize(connection)
  @connection = connection
  @state = :connecting
  @in_flight = EM::DefaultDeferrable.new
  @last_request = Time.now
  @request_queue = []
end

#post_initObject

Connection established (called by EventMachine)



148
149
150
151
152
153
154
# File 'lib/em-voldemort/connection.rb', line 148

def post_init
  connection.logger.info "Connected to Voldemort node at #{connection.host}:#{connection.port}"
  send_protocol_proposal(connection.protocol)
  in_flight.errback do |response|
    connection.logger.warn "Voldemort protocol #{connection.protocol} not accepted: #{response.inspect}"
  end
end

#receive_data(data) ⇒ Object

The Voldemort node is talking to us (called by EventMachine)



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/em-voldemort/connection.rb', line 157

def receive_data(data)
  case @state
  when :protocol_proposal
    deferrable = @in_flight
    @state = :idle
    @in_flight = nil
    if data == 'ok'
      send_next_request
      deferrable.succeed
    else
      close_connection
      deferrable.fail("server response: #{data.inspect}")
    end

  when :request
    @recv_buf << data
    response_size = @recv_buf.unpack('N').first
    if response_size && @recv_buf.bytesize >= response_size + 4
      response = @recv_buf[4, response_size]
      deferrable = @in_flight
      @state = :idle
      @in_flight = @recv_buf = nil
      send_next_request
      deferrable.succeed(response)
    end

  else
    raise "Received data in unexpected state: #{@state.inspect}"
  end
end

#send_next_requestObject

Takes the request at the front of the queue and sends it to the Voldemort node



137
138
139
140
141
142
143
144
145
# File 'lib/em-voldemort/connection.rb', line 137

def send_next_request
  return if request_queue.empty?
  raise "cannot make a request while in #{@state.inspect} state" unless @state == :idle
  request, @in_flight = request_queue.shift
  send_data([request.size, request].pack('NA*'))
  @recv_buf = ''.force_encoding('BINARY')
  @last_request = Time.now
  @state = :request
end

#send_protocol_proposal(protocol) ⇒ Object

First action when the connection is established: client tells the server which version of the Voldemort protocol it wants to use

Raises:

  • (ArgumentError)


129
130
131
132
133
134
# File 'lib/em-voldemort/connection.rb', line 129

def send_protocol_proposal(protocol)
  raise ArgumentError, 'protocol must be 3 bytes long' if protocol.bytesize != 3
  raise "unexpected state before protocol proposal: #{@state.inspect}" unless @state == :connecting
  send_data(protocol)
  @state = :protocol_proposal
end

#unbind(reason = nil) ⇒ Object

Connection closed (called by EventMachine)



202
203
204
205
206
207
208
209
210
# File 'lib/em-voldemort/connection.rb', line 202

def unbind(reason=nil)
  @state = :disconnected
  deferrable = @in_flight
  @in_flight = nil
  deferrable.fail(ServerError.new('connection closed')) if deferrable
  @request_queue.each {|request, deferrable| deferrable.fail(ServerError.new('connection closed')) }
  @request_queue = []
  connection.connection_closed(self, reason)
end