Class: Dalli::Protocol::Binary::ResponseProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/dalli/protocol/binary/response_processor.rb

Overview

Class that encapsulates logic for processing binary protocol responses from memcached. Includes logic for pulling data from an IO source and parsing into local values. Handles errors on unexpected values.

Constant Summary collapse

# Human-readable descriptions of response status codes, keyed by the
# numeric status. Looked up when building the message of the error
# raised for a non-OK response.
RESPONSE_CODES = {
  0x00 => 'No error',
  0x01 => 'Key not found',
  0x02 => 'Key exists',
  0x03 => 'Value too large',
  0x04 => 'Invalid arguments',
  0x05 => 'Item not stored',
  0x06 => 'Incr/decr on a non-numeric value',
  0x07 => 'The vbucket belongs to another server',
  0x08 => 'Authentication error',
  0x09 => 'Authentication continue',
  0x20 => 'Authentication required',
  0x81 => 'Unknown command',
  0x82 => 'Out of memory',
  0x83 => 'Not supported',
  0x84 => 'Internal error',
  0x85 => 'Busy',
  0x86 => 'Temporary failure'
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(io_source, value_marshaller) ⇒ ResponseProcessor

Returns a new instance of ResponseProcessor.



34
35
36
37
# File 'lib/dalli/protocol/binary/response_processor.rb', line 34

# Stores the collaborators used while processing responses.
#
# @param io_source [#read] object that response bytes are read from
# @param value_marshaller [#retrieve] object used to turn raw values
#   and bitflags into stored values
def initialize(io_source, value_marshaller)
  @value_marshaller = value_marshaller
  @io_source = io_source
end

Instance Method Details

#auth_response(buf = read_header) ⇒ Object



183
184
185
186
187
188
189
# File 'lib/dalli/protocol/binary/response_processor.rb', line 183

# Parses an authentication response.
#
# @param buf [String] raw header bytes; read from the IO source when omitted
# @return [Array] two-element array of the response status and the
#   body (nil when the header reports an empty body)
# @raise [Dalli::NetworkError] via #validate_auth_format when the header
#   reports a non-zero extras length
def auth_response(buf = read_header)
  header = ResponseHeader.new(buf)
  payload_size = header.body_len
  validate_auth_format(header.extra_len, payload_size)
  payload = payload_size.positive? ? read(payload_size) : nil
  [header.status, payload]
end

#consume_all_responses_until_noop ⇒ Object



149
150
151
152
153
154
155
# File 'lib/dalli/protocol/binary/response_processor.rb', line 149

# Reads and discards responses until the terminating one arrives.
#
# The terminator is recognised as an OK response whose key length is
# zero (the reply to the closing noop / end of stat).
#
# @return [true] once the terminating response has been read
def consume_all_responses_until_noop
  loop do
    header, _body = read_response
    next unless header.ok?
    return true if header.key_len.zero?
  end
end

#contains_header?(buf) ⇒ Boolean

Returns:

  • (Boolean)


191
192
193
194
195
# File 'lib/dalli/protocol/binary/response_processor.rb', line 191

# Tells whether the buffer is long enough to hold a full response header.
#
# @param buf [String, nil] bytes accumulated so far
# @return [Boolean] false for a missing buffer, otherwise whether its
#   byte size is at least ResponseHeader::SIZE
def contains_header?(buf)
  buf ? buf.bytesize >= ResponseHeader::SIZE : false
end

#data_cas_response ⇒ Object



102
103
104
105
106
107
108
109
110
111
# File 'lib/dalli/protocol/binary/response_processor.rb', line 102

# Reads one response and returns its value together with its CAS.
#
# @return [Array] two-element array:
#   * [nil, cas] when the status is "not found" or the response has no body
#   * [nil, false] when the status is "not stored"
#   * [value, cas] otherwise, with the value unpacked as a stored value
# @raise [Dalli::DalliError] via #raise_on_not_ok! for any other non-OK status
def data_cas_response
  header, payload = read_response

  if header.not_found?
    [nil, header.cas]
  elsif header.not_stored?
    [nil, false]
  else
    raise_on_not_ok!(header)
    value = payload ? unpack_response_body(header, payload, true).last : nil
    [value, header.cas]
  end
end

#decr_incr ⇒ Object

Returns the new value for the key, if found and updated



114
115
116
117
# File 'lib/dalli/protocol/binary/response_processor.rb', line 114

# Reads the response to an increment/decrement operation.
#
# @return [Integer, nil, false] the body decoded as a big-endian unsigned
#   64-bit integer, or the falsy result of #generic_response unchanged
def decr_incr
  raw = generic_response
  return raw unless raw

  raw.unpack1('Q>')
end

#delete ⇒ Object



94
95
96
97
98
99
100
# File 'lib/dalli/protocol/binary/response_processor.rb', line 94

# Reads the response to a delete operation.
#
# @return [Boolean] false when the status is "not found" or "not stored",
#   true when the response is OK
# @raise [Dalli::DalliError] via #raise_on_not_ok! for any other non-OK status
def delete
  header, _body = read_response

  if header.not_found? || header.not_stored?
    false
  else
    raise_on_not_ok!(header)
    true
  end
end

#flush ⇒ Object



137
138
139
# File 'lib/dalli/protocol/binary/response_processor.rb', line 137

# Handles the response for a flush by delegating to #no_body_response
# and returning its result unchanged.
def flush
  no_body_response
end

#generic_response ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
# File 'lib/dalli/protocol/binary/response_processor.rb', line 157

# Reads one response and converts it into a simple result.
#
# The value is returned without passing through the value marshaller.
#
# @return [false] when the status is "not stored" (normal for add)
# @return [nil] when the status is "not found"
# @return [true] when the response is OK but carries no body
# @return [Object] otherwise, the value portion of the body
# @raise [Dalli::DalliError] via #raise_on_not_ok! for any other non-OK status
def generic_response
  header, payload = read_response

  if header.not_stored?
    false
  elsif header.not_found?
    nil
  else
    raise_on_not_ok!(header)
    payload ? unpack_response_body(header, payload, false).last : true
  end
end

#get(cache_nils: false) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/dalli/protocol/binary/response_processor.rb', line 69

# Reads the response to a get operation.
#
# @param cache_nils [Boolean] when truthy, a "not found" status yields
#   ::Dalli::NOT_FOUND instead of nil
# @return [false] when the status is "not stored"
# @return [nil, Object] nil or ::Dalli::NOT_FOUND when the status is "not found"
# @return [true] when the response is OK but carries no body
# @return [Object] otherwise, the value unpacked as a stored value
# @raise [Dalli::DalliError] via #raise_on_not_ok! for any other non-OK status
def get(cache_nils: false)
  header, payload = read_response

  if header.not_stored?
    false
  elsif header.not_found?
    cache_nils ? ::Dalli::NOT_FOUND : nil
  else
    raise_on_not_ok!(header)
    payload ? unpack_response_body(header, payload, true).last : true
  end
end

#getk_response_from_buffer(buf) ⇒ Object

This method returns an array of values used in a pipelined getk process. The first value is the number of bytes by which to advance the pointer in the buffer. If the complete response is found in the buffer, this will be the response size. Otherwise it is zero.

The remaining four values in the array are a flag indicating whether the response status is OK, the CAS value, the key, and the value.



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/dalli/protocol/binary/response_processor.rb', line 211

# Attempts to parse one pipelined getk response out of a buffer.
#
# @param buf [String, nil] bytes accumulated so far
# @return [Array] five-element array
#   [bytes_to_advance, ok, cas, key, value]. bytes_to_advance is zero
#   (and the rest nil) when the buffer does not yet hold a complete
#   response.
def getk_response_from_buffer(buf)
  # Returned whenever the buffer must not be advanced: either the header
  # or the body has not fully arrived yet.
  incomplete = [0, nil, nil, nil, nil]
  return incomplete unless contains_header?(buf)

  header = response_header_from_buffer(buf)
  header_size = ResponseHeader::SIZE
  payload_size = header.body_len

  # A complete, body-less response: the reply to the terminating noop or,
  # for a non-zero status, an intermediate error that is to be discarded.
  return [header_size, header.ok?, header.cas, nil, nil] if payload_size.zero?

  total_size = header_size + payload_size
  return incomplete if buf.bytesize < total_size

  # Header and body are both present, so parse out the key and value.
  payload = buf.byteslice(header_size, payload_size)
  key, value = unpack_response_body(header, payload, true)
  [total_size, header.ok?, header.cas, key, value]
end

#no_body_response ⇒ Object



169
170
171
172
173
174
175
# File 'lib/dalli/protocol/binary/response_processor.rb', line 169

# Reads a response whose body is not needed by the caller.
#
# @return [Boolean] false when the status is "not stored" (possible for
#   append/prepend/delete), true when the response is OK
# @raise [Dalli::DalliError] via #raise_on_not_ok! for any other non-OK status
def no_body_response
  header, _body = read_response

  if header.not_stored?
    false
  else
    raise_on_not_ok!(header)
    true
  end
end

#raise_on_not_ok!(resp_header) ⇒ Object

Raises:

  • (Dalli::DalliError)



63
64
65
66
67
# File 'lib/dalli/protocol/binary/response_processor.rb', line 63

# Raises unless the response header reports an OK status.
#
# The error message carries the numeric status and its description from
# RESPONSE_CODES. Statuses missing from that table are reported as
# 'Unknown error' rather than with an empty description.
#
# @param resp_header [ResponseHeader] header of the response to check
# @return [nil] when the response is OK
# @raise [Dalli::DalliError] when the response is not OK
def raise_on_not_ok!(resp_header)
  return if resp_header.ok?

  status = resp_header.status
  description = RESPONSE_CODES.fetch(status, 'Unknown error')
  raise Dalli::DalliError, "Response error #{status}: #{description}"
end

#read(num_bytes) ⇒ Object



39
40
41
# File 'lib/dalli/protocol/binary/response_processor.rb', line 39

# Reads from the IO source given at construction.
#
# @param num_bytes [Integer] number of bytes requested from the IO source
# @return [Object] whatever the IO source's #read returns
def read(num_bytes)
  @io_source.read(num_bytes)
end

#read_header ⇒ Object



59
60
61
# File 'lib/dalli/protocol/binary/response_processor.rb', line 59

# Reads the raw bytes of one response header.
#
# @return [String] ResponseHeader::SIZE bytes as returned by #read
# @raise [Dalli::NetworkError] when #read returns nothing
def read_header
  header_bytes = read(ResponseHeader::SIZE)
  raise Dalli::NetworkError, 'No response' unless header_bytes

  header_bytes
end

#read_response ⇒ Object



43
44
45
46
47
# File 'lib/dalli/protocol/binary/response_processor.rb', line 43

# Reads one complete response: the header, then the body if there is one.
#
# @return [Array] two-element array of the ResponseHeader and the body
#   bytes (nil when the header reports an empty body)
def read_response
  header = ResponseHeader.new(read_header)
  payload_size = header.body_len
  payload = payload_size.positive? ? read(payload_size) : nil
  [header, payload]
end

#reset ⇒ Object



141
142
143
# File 'lib/dalli/protocol/binary/response_processor.rb', line 141

# Reads one response through #generic_response and returns its result
# unchanged.
def reset
  generic_response
end

#response_header_from_buffer(buf) ⇒ Object



197
198
199
# File 'lib/dalli/protocol/binary/response_processor.rb', line 197

# Builds a ResponseHeader from the given buffer.
#
# @param buf [String] buffer passed straight to ResponseHeader.new
# @return [ResponseHeader]
def response_header_from_buffer(buf)
  ResponseHeader.new(buf)
end

#stats ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/dalli/protocol/binary/response_processor.rb', line 119

# Collects key/value pairs from a stream of responses into a hash.
#
# Reading stops at the first OK response with a zero key length (the
# reply to the terminating noop / end of stat). Responses with a non-OK
# status are skipped, which lets this method also drain error responses
# left over at the end of a multi block.
#
# @return [Hash] keys mapped to values from every OK response read
def stats
  collected = {}
  loop do
    header, payload = read_response
    next unless header.ok?
    return collected if header.key_len.zero?

    name, stat = unpack_response_body(header, payload, true)
    collected[name] = stat
  end
end

#storage_response ⇒ Object

Response for a storage operation. Returns the CAS on success and false if the value wasn’t stored; raises an error on all other error codes from memcached.



86
87
88
89
90
91
92
# File 'lib/dalli/protocol/binary/response_processor.rb', line 86

# Reads the response to a storage operation.
#
# @return [false] when the status is "not stored" (normal for add)
# @return [Object] the CAS from the response header on success
# @raise [Dalli::DalliError] via #raise_on_not_ok! for any other non-OK status
def storage_response
  header, _body = read_response

  if header.not_stored?
    false
  else
    raise_on_not_ok!(header)
    header.cas
  end
end

#unpack_response_body(resp_header, body, parse_as_stored_value) ⇒ Object



49
50
51
52
53
54
55
56
57
# File 'lib/dalli/protocol/binary/response_processor.rb', line 49

# Splits a response body into its key and value.
#
# The body is laid out as extras, then key, then value, with the extras
# and key lengths taken from the header. When extras are present, the
# leading 32-bit big-endian integer is used as the bitflags.
#
# @param resp_header [ResponseHeader] supplies extra_len and key_len
# @param body [String] raw response body
# @param parse_as_stored_value [Boolean] when truthy, the value is passed
#   through the value marshaller along with the bitflags; otherwise the
#   raw bytes are returned
# @return [Array] two-element array of the key (UTF-8, or nil when the
#   key length is zero) and the value
def unpack_response_body(resp_header, body, parse_as_stored_value)
  extras_size = resp_header.extra_len
  key_size = resp_header.key_len

  flags = extras_size.positive? ? body.unpack1('N') : 0x0
  key = key_size.positive? ? body.byteslice(extras_size, key_size).force_encoding('UTF-8') : nil
  raw_value = body.byteslice((extras_size + key_size)..-1)
  return [key, raw_value] unless parse_as_stored_value

  [key, @value_marshaller.retrieve(raw_value, flags)]
end

#validate_auth_format(extra_len, count) ⇒ Object



177
178
179
180
181
# File 'lib/dalli/protocol/binary/response_processor.rb', line 177

# Checks that an authentication response carries no extras.
#
# @param extra_len [Integer] extras length from the response header
# @param count [Integer] value included in the error message; callers in
#   this class pass the body length
# @return [nil] when extra_len is zero
# @raise [Dalli::NetworkError] when extra_len is non-zero
def validate_auth_format(extra_len, count)
  unless extra_len.zero?
    raise Dalli::NetworkError, "Unexpected message format: #{extra_len} #{count}"
  end
end

#version ⇒ Object



145
146
147
# File 'lib/dalli/protocol/binary/response_processor.rb', line 145

# Reads the response to a version request through #generic_response and
# returns its result unchanged; the body is not passed through the value
# marshaller.
def version
  generic_response
end