Class: Dalli::Protocol::Meta::ResponseProcessor
- Inherits:
-
Object
- Object
- Dalli::Protocol::Meta::ResponseProcessor
- Defined in:
- lib/dalli/protocol/response_processor.rb
Overview
Class that encapsulates logic for processing meta protocol responses from memcached. Includes logic for pulling data from an IO source and parsing into local values. Handles errors on unexpected values.
Constant Summary collapse
- EN = 'EN'
- END_TOKEN = 'END'
- EX = 'EX'
- HD = 'HD'
- MN = 'MN'
- NF = 'NF'
- NS = 'NS'
- OK = 'OK'
- RESET = 'RESET'
- STAT = 'STAT'
- VA = 'VA'
- VERSION = 'VERSION'
- SERVER_ERROR = 'SERVER_ERROR'
Instance Method Summary collapse
- #bitflags_from_tokens(tokens) ⇒ Object
- #body_len_from_tokens(tokens) ⇒ Object
- #build_metadata_result(tokens) ⇒ Object
- #cas_from_tokens(tokens) ⇒ Object
- #consume_all_responses_until_mn ⇒ Object
- #decr_incr ⇒ Object
- #error_on_unexpected!(expected_codes) ⇒ Object
- #flush ⇒ Object
- #full_response_from_buffer(tokens, body, resp_size) ⇒ Object
-
#getk_response_from_buffer(buf, offset = 0) ⇒ Object
This method returns an array of values used in a pipelined getk process.
-
#hit_status_from_tokens(tokens) ⇒ Object
Returns true if the item was previously hit, false if this is the first access, or nil if not requested. The h flag returns h0 (first access) or h1 (previously accessed).
-
#initialize(io_source, value_marshaller) ⇒ ResponseProcessor
constructor
A new instance of ResponseProcessor.
- #key_from_tokens(tokens) ⇒ Object
-
#last_access_from_tokens(tokens) ⇒ Object
Returns seconds since last access, or nil if not requested. The l flag returns l<seconds>.
- #meta_delete ⇒ Object
-
#meta_get_with_metadata(cache_nils: false, return_hit_status: false, return_last_access: false) ⇒ Object
Returns a hash with all requested metadata: - :value - the cached value (or nil if miss) - :cas - the CAS value (if return_cas was requested) - :won_recache - true if client won the right to recache (W flag) - :stale - true if the item is stale (X flag) - :lost_recache - true if another client is already recaching (Z flag) - :hit_before - true/false if item was previously accessed (h flag, if requested) - :last_access - seconds since last access (l flag, if requested).
- #meta_get_with_value(cache_nils: false) ⇒ Object
- #meta_get_with_value_and_cas ⇒ Object
- #meta_get_without_value ⇒ Object
- #meta_set_append_prepend ⇒ Object
- #meta_set_with_cas ⇒ Object
- #next_line_to_tokens ⇒ Object
- #parse_value_from_tokens(tokens, cache_nils) ⇒ Object
- #read_data(data_size) ⇒ Object
- #read_line ⇒ Object
- #reset ⇒ Object
- #stats ⇒ Object
- #value_from_tokens(tokens, flag) ⇒ Object
- #version ⇒ Object
Constructor Details
#initialize(io_source, value_marshaller) ⇒ ResponseProcessor
Returns a new instance of ResponseProcessor.
26 27 28 29 |
# File 'lib/dalli/protocol/response_processor.rb', line 26
# Stores the two collaborators used by the other methods of this class.
#
# @param io_source object the responses are read from; the methods here
#   call +read_line+ and +read+ on it
# @param value_marshaller object whose +retrieve(body, bitflags)+ turns a
#   raw body into a value
def initialize(io_source, value_marshaller)
  @io_source, @value_marshaller = io_source, value_marshaller
end
Instance Method Details
#bitflags_from_tokens(tokens) ⇒ Object
202 203 204 |
# File 'lib/dalli/protocol/response_processor.rb', line 202
# Extracts the value carried by the 'f' token as an Integer.
#
# @param tokens [Array<String>] whitespace-split response line
# @return [Integer, nil] whatever value_from_tokens yields for 'f',
#   converted with +to_i+ (nil is passed through untouched)
def bitflags_from_tokens(tokens)
  raw_flags = value_from_tokens(tokens, 'f')
  raw_flags&.to_i
end
#body_len_from_tokens(tokens) ⇒ Object
231 232 233 |
# File 'lib/dalli/protocol/response_processor.rb', line 231
# Extracts the value carried by the 's' token as an Integer.
#
# @param tokens [Array<String>] whitespace-split response line
# @return [Integer, nil] whatever value_from_tokens yields for 's',
#   converted with +to_i+ (nil is passed through untouched)
def body_len_from_tokens(tokens)
  raw_size = value_from_tokens(tokens, 's')
  raw_size&.to_i
end
#build_metadata_result(tokens) ⇒ Object
74 75 76 77 78 79 80 |
# File 'lib/dalli/protocol/response_processor.rb', line 74
# Builds the base result hash for a metadata-returning get.
#
# The method name was missing from the extracted definition; it is restored
# from the documented signature.
#
# @param tokens [Array<String>] whitespace-split response line
# @return [Hash] +:value+ (always nil here; filled in by the caller),
#   +:cas+ (from cas_from_tokens), and the booleans +:won_recache+,
#   +:stale+ and +:lost_recache+, set when the tokens include 'W', 'X'
#   and 'Z' respectively
def build_metadata_result(tokens)
  {
    value: nil,
    cas: cas_from_tokens(tokens),
    won_recache: tokens.include?('W'),
    stale: tokens.include?('X'),
    lost_recache: tokens.include?('Z')
  }
end
#cas_from_tokens(tokens) ⇒ Object
206 207 208 |
# File 'lib/dalli/protocol/response_processor.rb', line 206
# Extracts the value carried by the 'c' token as an Integer.
#
# @param tokens [Array<String>] whitespace-split response line
# @return [Integer, nil] whatever value_from_tokens yields for 'c',
#   converted with +to_i+ (nil is passed through untouched)
def cas_from_tokens(tokens)
  raw_cas = value_from_tokens(tokens, 'c')
  raw_cas&.to_i
end
#consume_all_responses_until_mn ⇒ Object
143 144 145 146 147 148 |
# File 'lib/dalli/protocol/response_processor.rb', line 143
# Reads and discards response lines up to and including the first line
# whose leading token is MN.
#
# @return [true]
def consume_all_responses_until_mn
  loop do
    break if next_line_to_tokens.first == MN
  end
  true
end
#decr_incr ⇒ Object
108 109 110 111 112 113 114 |
# File 'lib/dalli/protocol/response_processor.rb', line 108
# Interprets the response to an increment/decrement request.
#
# @return [Integer] the following line converted with +to_i+ when the status
#   is VA
# @return [false] when the status is NS or EX
# @return [nil] when the status is NF
# @raise whatever error_on_unexpected! raises for any other status
def decr_incr
  status = error_on_unexpected!([VA, NF, NS, EX]).first
  case status
  when NS, EX then false
  when NF then nil
  else read_line.to_i
  end
end
#error_on_unexpected!(expected_codes) ⇒ Object
192 193 194 195 196 197 198 199 200 |
# File 'lib/dalli/protocol/response_processor.rb', line 192
# Reads the next response line and checks its leading token.
#
# @param expected_codes [Array<String>] acceptable leading tokens
# @return [Array<String>] the tokens of the line when its first token is in
#   +expected_codes+
# @raise [Dalli::ServerError] when the first token is SERVER_ERROR; the
#   message is the whole line re-joined with single spaces
# @raise [Dalli::DalliError] for any other first token
def error_on_unexpected!(expected_codes)
  tokens = next_line_to_tokens
  status = tokens.first

  if expected_codes.include?(status)
    tokens
  elsif status == SERVER_ERROR
    raise Dalli::ServerError, tokens.join(' ')
  else
    raise Dalli::DalliError, "Response error: #{status}"
  end
end
#flush ⇒ Object
126 127 128 129 130 |
# File 'lib/dalli/protocol/response_processor.rb', line 126
# Confirms that the next response line starts with OK.
#
# @return [true]
# @raise whatever error_on_unexpected! raises when the status is not OK
def flush
  accepted = [OK]
  error_on_unexpected!(accepted)
  true
end
#full_response_from_buffer(tokens, body, resp_size) ⇒ Object
150 151 152 153 |
# File 'lib/dalli/protocol/response_processor.rb', line 150
# Assembles the result array for a response whose header and body are both
# available.
#
# @param tokens [Array<String>] whitespace-split header line
# @param body [String] raw body bytes
# @param resp_size [Integer] total size of the response, returned unchanged
# @return [Array] +[resp_size, status, cas, key, value]+ where +status+ is
#   true when the first token is VA and +value+ comes from the value
#   marshaller
def full_response_from_buffer(tokens, body, resp_size)
  bitflags = bitflags_from_tokens(tokens)
  value = @value_marshaller.retrieve(body, bitflags)
  is_value_response = tokens.first == VA
  [resp_size, is_value_response, cas_from_tokens(tokens), key_from_tokens(tokens), value]
end
#getk_response_from_buffer(buf, offset = 0) ⇒ Object
This method returns an array of values used in a pipelined getk process. The first value is the number of bytes by which to advance the pointer in the buffer. If the complete response is found in the buffer, this will be the response size. Otherwise it is zero.
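The remaining four values in the array are a status flag (true when the response line starts with VA, and also true for a complete response that has no body), the CAS value, the key, and the value.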
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/dalli/protocol/response_processor.rb', line 165
# Tries to parse one response out of +buf+, starting at +offset+.
#
# @param buf [String] buffer holding zero or more responses
# @param offset [Integer] position in +buf+ where the response starts
# @return [Array] a five-element array. The first element is the number of
#   bytes to advance: 0 when the response is not yet complete, otherwise the
#   size of the response. For a complete response without a body the rest is
#   +[true, nil, nil, nil]+; for a complete response with a body the rest is
#   whatever full_response_from_buffer returns.
def getk_response_from_buffer(buf, offset = 0)
  incomplete = [0, nil, nil, nil, nil]

  # Locate the end of the header line, searching from offset.
  header_end = buf.index(TERMINATOR, offset)
  return incomplete if header_end.nil?

  header = buf.byteslice(offset, header_end - offset)
  tokens = header.split
  header_len = header.bytesize + TERMINATOR.length
  body_len = body_len_from_tokens(tokens)

  # A complete response with no body: either the reply to the terminating
  # noop or, if the status is not MN, an intermediate error response that
  # needs to be discarded.
  return [header_len, true, nil, nil, nil] if body_len.zero?

  resp_size = header_len + body_len + TERMINATOR.length

  # The header has arrived but the body has not; leave the buffer position
  # alone until the whole response is available.
  return incomplete if buf.bytesize < offset + resp_size

  # Header and body are both present, so parse and return the values.
  body = buf.byteslice(offset + header_len, body_len)
  full_response_from_buffer(tokens, body, resp_size)
end
#hit_status_from_tokens(tokens) ⇒ Object
Returns true if the item was previously hit, false if this is the first access, or nil if not requested. The h flag returns h0 (first access) or h1 (previously accessed).
218 219 220 221 222 223 |
# File 'lib/dalli/protocol/response_processor.rb', line 218
# Reads the hit marker from the tokens: a two-character token starting
# with 'h'.
#
# @param tokens [Array<String>] whitespace-split response line
# @return [Boolean, nil] true when the marker's second character is '1',
#   false for any other second character, nil when no such token exists
def hit_status_from_tokens(tokens)
  marker = tokens.find { |token| token.length == 2 && token.start_with?('h') }
  marker && marker[1] == '1'
end
#key_from_tokens(tokens) ⇒ Object
210 211 212 213 214 |
# File 'lib/dalli/protocol/response_processor.rb', line 210
# Decodes the key carried by the 'k' token.
#
# @param tokens [Array<String>] whitespace-split response line
# @return whatever KeyRegularizer.decode returns for the 'k' value; its
#   second argument is true when the tokens contain a bare 'b' token
def key_from_tokens(tokens)
  KeyRegularizer.decode(value_from_tokens(tokens, 'k'), tokens.any?('b'))
end
#last_access_from_tokens(tokens) ⇒ Object
Returns seconds since last access, or nil if not requested. The l flag returns l<seconds>.
227 228 229 |
# File 'lib/dalli/protocol/response_processor.rb', line 227
# Returns the number of seconds since the item was last accessed, taken from
# the token that starts with 'l' (format: l<seconds>).
#
# The token is looked up directly rather than through value_from_tokens,
# because that helper yields 0 for a missing flag. That would make "never
# requested" indistinguishable from "accessed zero seconds ago" and
# contradict the documented nil result.
#
# @param tokens [Array<String>] whitespace-split response line
# @return [Integer, nil] seconds since last access, or nil when no 'l'
#   token is present
def last_access_from_tokens(tokens)
  access_token = tokens.find { |token| token.start_with?('l') }
  access_token && access_token[1..].to_i
end
#meta_delete ⇒ Object
103 104 105 106 |
# File 'lib/dalli/protocol/response_processor.rb', line 103
# Interprets the response to a delete request.
#
# The method name was missing from the extracted definition; it is restored
# from the documented signature.
#
# @return [Boolean] true when the status is HD, false when it is NF or EX
# @raise whatever error_on_unexpected! raises for any other status
def meta_delete
  tokens = error_on_unexpected!([HD, NF, EX])
  tokens.first == HD
end
#meta_get_with_metadata(cache_nils: false, return_hit_status: false, return_last_access: false) ⇒ Object
Returns a hash with all requested metadata:
-
:value - the cached value (or nil if miss)
-
:cas - the CAS value (if return_cas was requested)
-
:won_recache - true if client won the right to recache (W flag)
-
:stale - true if the item is stale (X flag)
-
:lost_recache - true if another client is already recaching (Z flag)
-
:hit_before - true/false if item was previously accessed (h flag, if requested)
-
:last_access - seconds since last access (l flag, if requested)
Used by meta_get for comprehensive metadata retrieval. Supports thundering herd protection (N/R flags) and metadata flags (h/l/u).
65 66 67 68 69 70 71 72 |
# File 'lib/dalli/protocol/response_processor.rb', line 65
# Reads a get response and returns its value together with metadata.
#
# The method name and the build_metadata_result call were missing from the
# extracted definition; both are restored from the documented signatures.
#
# @param cache_nils [Boolean] forwarded to parse_value_from_tokens
# @param return_hit_status [Boolean] when true, adds +:hit_before+
# @param return_last_access [Boolean] when true, adds +:last_access+
# @return [Hash] the hash from build_metadata_result with +:value+ filled in
#   and the optional keys added as requested
# @raise whatever error_on_unexpected! raises when the status is not VA, EN
#   or HD
def meta_get_with_metadata(cache_nils: false, return_hit_status: false, return_last_access: false)
  tokens = error_on_unexpected!([VA, EN, HD])
  result = build_metadata_result(tokens)
  result[:hit_before] = hit_status_from_tokens(tokens) if return_hit_status
  result[:last_access] = last_access_from_tokens(tokens) if return_last_access
  result[:value] = parse_value_from_tokens(tokens, cache_nils)
  result
end
#meta_get_with_value(cache_nils: false) ⇒ Object
31 32 33 34 35 36 37 |
# File 'lib/dalli/protocol/response_processor.rb', line 31
# Reads a get response and returns the stored value.
#
# The method name was missing from the extracted definition; it is restored
# from the documented signature.
#
# @param cache_nils [Boolean] when true, a miss (EN) yields
#   ::Dalli::NOT_FOUND instead of nil
# @return the unmarshalled value for a VA response, true for an HD
#   response, and nil or ::Dalli::NOT_FOUND for an EN response
# @raise whatever error_on_unexpected! raises for any other status
def meta_get_with_value(cache_nils: false)
  tokens = error_on_unexpected!([VA, EN, HD])
  return cache_nils ? ::Dalli::NOT_FOUND : nil if tokens.first == EN
  return true unless tokens.first == VA

  # For VA the second token is the body size in bytes.
  @value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens))
end
#meta_get_with_value_and_cas ⇒ Object
39 40 41 42 43 44 45 46 47 |
# File 'lib/dalli/protocol/response_processor.rb', line 39
# Reads a get response and returns the value together with its CAS.
#
# The method name was missing from the extracted definition; it is restored
# from the documented signature.
#
# @return [Array] +[nil, 0]+ on a miss (EN), +[nil, cas]+ for an HD
#   response, and +[value, cas]+ for a VA response
# @raise whatever error_on_unexpected! raises for any other status
def meta_get_with_value_and_cas
  tokens = error_on_unexpected!([VA, EN, HD])
  return [nil, 0] if tokens.first == EN

  cas = cas_from_tokens(tokens)
  return [nil, cas] unless tokens.first == VA

  # For VA the second token is the body size in bytes.
  [@value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens)), cas]
end
#meta_get_without_value ⇒ Object
49 50 51 52 |
# File 'lib/dalli/protocol/response_processor.rb', line 49
# Reads a get response for a request that did not ask for the value.
#
# The method name was missing from the extracted definition; it is restored
# from the documented signature.
#
# @return [true, nil] nil when the status is EN, true when it is HD
# @raise whatever error_on_unexpected! raises for any other status
def meta_get_without_value
  tokens = error_on_unexpected!([EN, HD])
  tokens.first == EN ? nil : true
end
#meta_set_append_prepend ⇒ Object
96 97 98 99 100 101 |
# File 'lib/dalli/protocol/response_processor.rb', line 96
# Interprets the response to an append/prepend request.
#
# The method name was missing from the extracted definition; it is restored
# from the documented signature.
#
# @return [Boolean] true when the status is HD, false for NS, NF or EX
# @raise whatever error_on_unexpected! raises for any other status
def meta_set_append_prepend
  tokens = error_on_unexpected!([HD, NS, NF, EX])
  tokens.first == HD
end
#meta_set_with_cas ⇒ Object
89 90 91 92 93 94 |
# File 'lib/dalli/protocol/response_processor.rb', line 89
# Interprets the response to a set request and returns the new CAS.
#
# The method name was missing from the extracted definition; it is restored
# from the documented signature.
#
# @return [Integer, false] the result of cas_from_tokens when the status is
#   HD, false for NS, NF or EX
# @raise whatever error_on_unexpected! raises for any other status
def meta_set_with_cas
  tokens = error_on_unexpected!([HD, NS, NF, EX])
  return false unless tokens.first == HD

  cas_from_tokens(tokens)
end
#next_line_to_tokens ⇒ Object
246 247 248 249 |
# File 'lib/dalli/protocol/response_processor.rb', line 246
# Reads the next line and splits it on whitespace.
#
# @return [Array<String>] the tokens of the line, or an empty array when
#   read_line returns nil
def next_line_to_tokens
  line = read_line
  line.nil? ? [] : line.split
end
#parse_value_from_tokens(tokens, cache_nils) ⇒ Object
82 83 84 85 86 87 |
# File 'lib/dalli/protocol/response_processor.rb', line 82
# Produces the value part of a get response from its header tokens.
#
# @param tokens [Array<String>] whitespace-split header line
# @param cache_nils [Boolean] when true, a miss (EN) yields
#   ::Dalli::NOT_FOUND instead of nil
# @return the unmarshalled value for VA, nil or ::Dalli::NOT_FOUND for EN,
#   and nil for any other status
def parse_value_from_tokens(tokens, cache_nils)
  case tokens.first
  when EN
    cache_nils ? ::Dalli::NOT_FOUND : nil
  when VA
    # For VA the second token is the body size in bytes.
    body = read_data(tokens[1].to_i)
    @value_marshaller.retrieve(body, bitflags_from_tokens(tokens))
  end
end
#read_data(data_size) ⇒ Object
251 252 253 |
# File 'lib/dalli/protocol/response_processor.rb', line 251
# Reads a body of +data_size+ bytes plus its trailing terminator and strips
# the terminator.
#
# @param data_size [Integer] size of the body, excluding the terminator
# @return [String, nil] the body without the terminator. nil when the IO
#   source returns nil, and also when the data read does not end with
#   TERMINATOR, because String#chomp! returns nil if it removes nothing.
def read_data(data_size)
  raw = @io_source.read(data_size + TERMINATOR.bytesize)
  return nil if raw.nil?

  raw.chomp!(TERMINATOR)
end
#read_line ⇒ Object
242 243 244 |
# File 'lib/dalli/protocol/response_processor.rb', line 242
# Reads one line from the IO source and strips the trailing terminator.
#
# @return [String, nil] the line without the terminator. nil when the IO
#   source returns nil, and also when the line does not end with TERMINATOR,
#   because String#chomp! returns nil if it removes nothing.
def read_line
  raw = @io_source.read_line
  return nil if raw.nil?

  raw.chomp!(TERMINATOR)
end
#reset ⇒ Object
132 133 134 135 136 |
# File 'lib/dalli/protocol/response_processor.rb', line 132
# Confirms that the next response line starts with RESET.
#
# @return [true]
# @raise whatever error_on_unexpected! raises when the status is not RESET
def reset
  accepted = [RESET]
  error_on_unexpected!(accepted)
  true
end
#stats ⇒ Object
116 117 118 119 120 121 122 123 124 |
# File 'lib/dalli/protocol/response_processor.rb', line 116
# Collects the lines of a stats response into a hash.
#
# The first line must start with END or STAT; later lines are read without
# that check until one starts with END.
#
# @return [Hash] second token of each line mapped to its third token
# @raise whatever error_on_unexpected! raises for an unexpected first line
def stats
  tokens = error_on_unexpected!([END_TOKEN, STAT])
  collected = {}
  until tokens.first == END_TOKEN
    _status, name, value = tokens
    collected[name] = value
    tokens = next_line_to_tokens
  end
  collected
end
#value_from_tokens(tokens, flag) ⇒ Object
235 236 237 238 239 240 |
# File 'lib/dalli/protocol/response_processor.rb', line 235
# Finds the first token that starts with +flag+ and returns what follows its
# first character.
#
# @param tokens [Array<String>] whitespace-split response line
# @param flag [String] prefix to look for
# @return [String, Integer] the token minus its first character, or the
#   Integer 0 when no token starts with +flag+
def value_from_tokens(tokens, flag)
  match = tokens.find { |token| token.start_with?(flag) }
  match ? match[1..] : 0
end
#version ⇒ Object
138 139 140 141 |
# File 'lib/dalli/protocol/response_processor.rb', line 138
# Reads a version response.
#
# @return [String] the last token of the response line
# @raise whatever error_on_unexpected! raises when the status is not VERSION
def version
  error_on_unexpected!([VERSION]).last
end