Class: Dalli::Protocol::Binary::ResponseProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/dalli/protocol/binary/response_processor.rb

Overview

Class that encapsulates logic for processing binary protocol responses from memcached. Includes logic for pulling data from an IO source and parsing into local values. Handles errors on unexpected values.

Constant Summary collapse

# Human-readable descriptions keyed by the numeric status found in a
# response header. Used to build the message of the error raised for
# non-OK responses.
RESPONSE_CODES = {
  # General statuses
  0x00 => 'No error',
  0x01 => 'Key not found',
  0x02 => 'Key exists',
  0x03 => 'Value too large',
  0x04 => 'Invalid arguments',
  0x05 => 'Item not stored',
  0x06 => 'Incr/decr on a non-numeric value',
  0x07 => 'The vbucket belongs to another server',
  # Authentication statuses
  0x08 => 'Authentication error',
  0x09 => 'Authentication continue',
  0x20 => 'Authentication required',
  # Server-side failures
  0x81 => 'Unknown command',
  0x82 => 'Out of memory',
  0x83 => 'Not supported',
  0x84 => 'Internal error',
  0x85 => 'Busy',
  0x86 => 'Temporary failure'
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(io_source, value_marshaller) ⇒ ResponseProcessor

Returns a new instance of ResponseProcessor.



34
35
36
37
# File 'lib/dalli/protocol/binary/response_processor.rb', line 34

# Stores the collaborators used by the other methods of this class.
#
# @param io_source [#read] object that raw response bytes are read from
#   (it receives +read(num_bytes)+)
# @param value_marshaller [#retrieve] object used to turn a raw value and
#   its bitflags into a local value (it receives +retrieve(value, bitflags)+)
def initialize(io_source, value_marshaller)
  @value_marshaller = value_marshaller
  @io_source = io_source
end

Instance Method Details

#auth_response(buf = read_header) ⇒ Object



144
145
146
147
148
149
150
# File 'lib/dalli/protocol/binary/response_processor.rb', line 144

# Reads an authentication response.
#
# @param buf raw header data; defaults to a header read via +read_header+
# @return [Array] two elements: the header's status and the body content
#   (+nil+ when the header reports a body length that is not positive)
# @raise whatever +validate_auth_format+ raises for an unexpected format
def auth_response(buf = read_header)
  header = ResponseHeader.new(buf)
  payload_size = header.body_len
  # The format is validated before any body bytes are read.
  validate_auth_format(header.extra_len, payload_size)
  payload = payload_size.positive? ? read(payload_size) : nil
  [header.status, payload]
end

#cas_response ⇒ Object



111
112
113
# File 'lib/dalli/protocol/binary/response_processor.rb', line 111

# Reads a response via +data_cas_response+ and returns only the last
# element of its result (the cas slot of the value/cas pair).
#
# @return the last element of the array returned by +data_cas_response+
def cas_response
  value_and_cas = data_cas_response
  value_and_cas.last
end

#contains_header?(buf) ⇒ Boolean

Returns:

  • (Boolean)


152
153
154
155
156
# File 'lib/dalli/protocol/binary/response_processor.rb', line 152

# Tells whether the buffer holds at least one full response header.
#
# @param buf [String, nil] buffered response bytes
# @return [Boolean] false when +buf+ is nil/false, otherwise whether its
#   byte size reaches +ResponseHeader::SIZE+
def contains_header?(buf)
  buf ? buf.bytesize >= ResponseHeader::SIZE : false
end

#data_cas_response(unpack: true) ⇒ Object



100
101
102
103
104
105
106
107
108
109
# File 'lib/dalli/protocol/binary/response_processor.rb', line 100

# Reads one response and returns its value together with the header's cas.
#
# @param unpack [Boolean] forwarded to +unpack_response_body+
# @return [Array] +[value, cas]+:
#   * +[nil, cas]+ when the header reports not-found
#   * +[nil, false]+ when the header reports not-stored
#   * +[nil, cas]+ when the response is OK but carries no body
#   * +[value, cas]+ otherwise
# @raise whatever +raise_on_not_ok!+ raises for other non-OK statuses
def data_cas_response(unpack: true)
  header, payload = read_response
  return [nil, header.cas] if header.not_found?
  return [nil, false] if header.not_stored?

  raise_on_not_ok!(header)
  value = payload ? unpack_response_body(header.extra_len, header.key_len, payload, unpack).last : nil
  [value, header.cas]
end

#decr_incr_response ⇒ Object



133
134
135
136
# File 'lib/dalli/protocol/binary/response_processor.rb', line 133

# Reads the response to an increment/decrement request.
#
# @return [Integer, nil, false] the body decoded as a 64-bit big-endian
#   unsigned integer, or the falsy result of +generic_response+ passed
#   through unchanged
def decr_incr_response
  raw = generic_response
  return raw unless raw

  raw.unpack1('Q>')
end

#generic_response(unpack: false, cache_nils: false) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
# File 'lib/dalli/protocol/binary/response_processor.rb', line 67

# Reads one response and maps it to a simple return value.
#
# @param unpack [Boolean] forwarded to +unpack_response_body+
# @param cache_nils [Boolean] when true, a not-found response yields
#   +::Dalli::NOT_FOUND+ instead of +nil+
# @return [false] when the header reports not-stored
# @return [nil, Object] +nil+ (or +::Dalli::NOT_FOUND+) on not-found
# @return [true] when the response is OK and has no body
# @return [Object] otherwise, the value extracted from the body
# @raise whatever +raise_on_not_ok!+ raises for other non-OK statuses
def generic_response(unpack: false, cache_nils: false)
  header, payload = read_response

  # Not stored is a normal status for an add operation
  return false if header.not_stored?

  if header.not_found?
    return ::Dalli::NOT_FOUND if cache_nils

    return nil
  end

  raise_on_not_ok!(header)
  if payload
    unpack_response_body(header.extra_len, header.key_len, payload, unpack).last
  else
    true
  end
end

#getk_response_from_buffer(buf) ⇒ Object

This method returns an array of values used in a pipelined getk process. The first value is the number of bytes by which to advance the pointer in the buffer. If the complete response is found in the buffer, this will be the response size. Otherwise it is zero.

The remaining three values in the array are the ResponseHeader, key, and value.



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/dalli/protocol/binary/response_processor.rb', line 173

# Parses one getk response out of a buffer used for pipelined gets.
#
# All sizes involved (+ResponseHeader::SIZE+, the header's body length and
# the buffer size check) are byte counts, so the body is extracted with
# +byteslice+. A character-indexed slice would return the wrong span when
# the buffer is not binary-encoded and holds multibyte characters.
#
# @param buf [String, nil] buffered response bytes
# @return [Array] four elements: the number of bytes to advance the buffer
#   by (zero when no complete response is available), the response header,
#   the key and the value
def getk_response_from_buffer(buf)
  # There's no header in the buffer, so don't advance
  return [0, nil, nil, nil] unless contains_header?(buf)

  resp_header = response_header_from_buffer(buf)
  body_len = resp_header.body_len

  # We have a complete response that has no body.
  # This is either the response to the terminating
  # noop or, if the status is not zero, an intermediate
  # error response that needs to be discarded.
  return [ResponseHeader::SIZE, resp_header, nil, nil] if body_len.zero?

  resp_size = ResponseHeader::SIZE + body_len
  # The header is in the buffer, but the body is not.  As we don't have
  # a complete response, don't advance the buffer
  return [0, nil, nil, nil] unless buf.bytesize >= resp_size

  # The full response is in our buffer, so parse it and return
  # the values. Offsets are in bytes, matching the bytesize check above.
  body = buf.byteslice(ResponseHeader::SIZE, body_len)
  key, value = unpack_response_body(resp_header.extra_len, resp_header.key_len, body, true)
  [resp_size, resp_header, key, value]
end

#multi_with_keys_response ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/dalli/protocol/binary/response_processor.rb', line 115

# Reads responses until the terminating one arrives and collects the
# key/value pairs of the successful ones.
#
# @return [Hash] keys mapped to unpacked values
def multi_with_keys_response
  results = {}
  loop do
    header, payload = read_response

    # Skip any response with a non-zero status code, such as errors
    # from set operations. That lets this method be used at the end of
    # a multi block to clear error responses from inside the multi.
    next unless header.ok?

    # An OK response without a key is the response to the terminating
    # noop / end of stat
    return results if header.key_len.zero?

    key, value = unpack_response_body(header.extra_len, header.key_len, payload, true)
    results[key] = value
  end
end

#no_body_response ⇒ Object



92
93
94
95
96
97
98
# File 'lib/dalli/protocol/binary/response_processor.rb', line 92

# Reads a response whose body, if any, is ignored.
#
# @return [Boolean] false when the header reports not-stored (a possible
#   status for append/prepend), true when the response is OK
# @raise whatever +raise_on_not_ok!+ raises for other non-OK statuses
def no_body_response
  header = read_response.first
  if header.not_stored?
    false
  else
    raise_on_not_ok!(header)
    true
  end
end

#raise_on_not_ok!(resp_header) ⇒ Object

Raises:

  • (Dalli::DalliError) — unless resp_header.ok? is true



61
62
63
64
65
# File 'lib/dalli/protocol/binary/response_processor.rb', line 61

# Raises unless the given response header reports success.
#
# @param resp_header [#ok?, #status] header of the response being checked
# @return [nil] when +resp_header.ok?+ is truthy
# @raise [Dalli::DalliError] otherwise; the message carries the numeric
#   status and its description looked up in +RESPONSE_CODES+
def raise_on_not_ok!(resp_header)
  if resp_header.ok?
    nil
  else
    message = "Response error #{resp_header.status}: #{RESPONSE_CODES[resp_header.status]}"
    raise Dalli::DalliError, message
  end
end

#read(num_bytes) ⇒ Object



39
40
41
# File 'lib/dalli/protocol/binary/response_processor.rb', line 39

# Reads from the IO source given to the constructor.
#
# @param num_bytes [Integer] passed straight through to +@io_source.read+
# @return whatever +@io_source.read+ returns
def read(num_bytes)
  @io_source.read(num_bytes)
end

#read_header ⇒ Object



57
58
59
# File 'lib/dalli/protocol/binary/response_processor.rb', line 57

# Reads +ResponseHeader::SIZE+ bytes, i.e. one raw response header.
#
# @return the data returned by +read+
# @raise [Dalli::NetworkError] when +read+ returns nil or false
def read_header
  header_bytes = read(ResponseHeader::SIZE)
  raise Dalli::NetworkError, 'No response' unless header_bytes

  header_bytes
end

#read_response ⇒ Object



43
44
45
46
47
# File 'lib/dalli/protocol/binary/response_processor.rb', line 43

# Reads one complete response: a header followed by its body, if any.
#
# @return [Array] two elements: the parsed +ResponseHeader+ and the body
#   (+nil+ when the header's body length is not positive)
def read_response
  header = ResponseHeader.new(read_header)
  payload = header.body_len.positive? ? read(header.body_len) : nil
  [header, payload]
end

#response_header_from_buffer(buf) ⇒ Object



158
159
160
161
# File 'lib/dalli/protocol/binary/response_processor.rb', line 158

# Builds a +ResponseHeader+ from the leading bytes of a buffer.
#
# +ResponseHeader::SIZE+ is compared against +bytesize+ by
# +contains_header?+, so the header is cut out with +byteslice+. A
# character-indexed slice would take too many bytes from a buffer that is
# not binary-encoded and holds multibyte sequences.
#
# @param buf [String] buffer holding at least one full header
# @return [ResponseHeader] header built from the first
#   +ResponseHeader::SIZE+ bytes of +buf+
def response_header_from_buffer(buf)
  header = buf.byteslice(0, ResponseHeader::SIZE)
  ResponseHeader.new(header)
end

#storage_response ⇒ Object

Response for a storage operation. Returns the CAS value on success and false if the value wasn’t stored; raises an error for all other error codes from memcached.



84
85
86
87
88
89
90
# File 'lib/dalli/protocol/binary/response_processor.rb', line 84

# Reads the response to a storage operation.
#
# @return [false] when the header reports not-stored (a normal status
#   for an add operation)
# @return the header's cas on success
# @raise whatever +raise_on_not_ok!+ raises for other non-OK statuses
def storage_response
  header = read_response.first
  if header.not_stored?
    false
  else
    raise_on_not_ok!(header)
    header.cas
  end
end

#unpack_response_body(extra_len, key_len, body, unpack) ⇒ Object



49
50
51
52
53
54
55
# File 'lib/dalli/protocol/binary/response_processor.rb', line 49

# Splits a response body into its key and value parts.
#
# The body is laid out as +extra_len+ bytes of extras, then +key_len+
# bytes of key, then the value in the remaining bytes.
#
# @param extra_len [Integer] byte length of the extras section
# @param key_len [Integer] byte length of the key section
# @param body [String] the response body
# @param unpack [Boolean] when truthy, the value is passed through
#   +@value_marshaller.retrieve+ together with the bitflags
# @return [Array] +[key, value]+; +key+ is nil when +key_len+ is not positive
def unpack_response_body(extra_len, key_len, body, unpack)
  # Bitflags are decoded from the extras as a 32-bit big-endian integer;
  # they default to zero when there are no extras.
  flags = 0x0
  flags = body.byteslice(0, extra_len).unpack1('N') if extra_len.positive?

  key = key_len.positive? ? body.byteslice(extra_len, key_len) : nil

  value_offset = extra_len + key_len
  raw_value = body.byteslice(value_offset, body.bytesize - value_offset)
  return [key, raw_value] unless unpack

  [key, @value_marshaller.retrieve(raw_value, flags)]
end

#validate_auth_format(extra_len, count) ⇒ Object



138
139
140
141
142
# File 'lib/dalli/protocol/binary/response_processor.rb', line 138

# Checks that an authentication response carries no extras.
#
# @param extra_len [Integer] extras length taken from the response header
# @param count [Integer] value reported alongside +extra_len+ in the error
#   message (callers in this file pass the body length)
# @return [nil] when +extra_len+ is zero
# @raise [Dalli::NetworkError] when +extra_len+ is not zero
def validate_auth_format(extra_len, count)
  raise Dalli::NetworkError, "Unexpected message format: #{extra_len} #{count}" unless extra_len.zero?
end