Class: Kestrel::Client

Inherits:
Memcached
  • Object
show all
Includes:
RetryHelper, StatsHelper
Defined in:
lib/kestrel/client.rb,
lib/kestrel/client/json.rb,
lib/kestrel/client/proxy.rb,
lib/kestrel/client/blocking.rb,
lib/kestrel/client/envelope.rb,
lib/kestrel/client/namespace.rb,
lib/kestrel/client/unmarshal.rb,
lib/kestrel/client/partitioning.rb

Defined Under Namespace

Modules: RetryHelper, StatsHelper Classes: Blocking, Envelope, Json, Namespace, Partitioning, Proxy, Transactional, Unmarshal

Constant Summary collapse

KESTREL_OPTIONS =
[:gets_per_server, :exception_retry_limit, :get_timeout_ms].freeze
DEFAULT_OPTIONS =
{
  :retry_timeout => 0,
  :exception_retry_limit => 5,
  :timeout => 0.25,
  :gets_per_server => 100,
  :get_timeout_ms => 10
}.freeze
RECOVERABLE_ERRORS =

Exceptions which are connection failures we retry after

[
  Memcached::ServerIsMarkedDead,
  Memcached::ATimeoutOccurred,
  Memcached::ConnectionBindFailure,
  Memcached::ConnectionFailure,
  Memcached::ConnectionSocketCreateFailure,
  Memcached::Failure,
  Memcached::MemoryAllocationFailure,
  Memcached::ReadFailure,
  Memcached::ServerError,
  Memcached::SystemError,
  Memcached::UnknownReadFailure,
  Memcached::WriteFailure
]

Constants included from StatsHelper

StatsHelper::QUEUE_STAT_NAMES

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from StatsHelper

#available_queues, #sizeof, #stat, #stats

Constructor Details

#initialize(*servers) ⇒ Client

Returns a new instance of Client.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/kestrel/client.rb', line 45

def initialize(*servers)
  opts = servers.last.is_a?(Hash) ? servers.pop : {}
  opts = DEFAULT_OPTIONS.merge(opts)

  @kestrel_options = extract_kestrel_options!(opts)
  @default_get_timeout = kestrel_options[:get_timeout_ms]
  @gets_per_server = kestrel_options[:gets_per_server]
  @exception_retry_limit = kestrel_options[:exception_retry_limit]
  @counter = 0

  # we handle our own retries so that we can apply different
  # policies to sets and gets, so set memcached limit to 0
  opts[:exception_retry_limit] = 0
  opts[:distribution] = :random # force random distribution

  super Array(servers).flatten.compact, opts
end

Instance Attribute Details

#current_queueObject (readonly)

Returns the value of attribute current_queue.



64
65
66
# File 'lib/kestrel/client.rb', line 64

def current_queue
  @current_queue
end

#kestrel_optionsObject (readonly)

Returns the value of attribute kestrel_options.



64
65
66
# File 'lib/kestrel/client.rb', line 64

def kestrel_options
  @kestrel_options
end

Instance Method Details

#delete(key, expiry = 0) ⇒ Object

end ifdef :)



100
101
102
103
# File 'lib/kestrel/client.rb', line 100

def delete(key, expiry=0)
  with_retries { super key }
rescue Memcached::NotFound, Memcached::ServerEnd
end

#flush(queue) ⇒ Object



147
148
149
150
151
152
153
# File 'lib/kestrel/client.rb', line 147

def flush(queue)
  count = 0
  while sizeof(queue) > 0
    count += 1 while get queue, :raw => true
  end
  count
end

#get(key, opts = {}) ⇒ Object

Parameters

key<String>

Queue name

opts<Boolean,Hash>

True/false toggles Marshalling. A Hash allows collision-avoiding options support.

Options (opts)

:open<Boolean>

Begins a transactional read.

:close<Boolean>

Ends a transactional read.

:abort<Boolean>

Cancels an existing transactional read

:peek<Boolean>

Return the head of the queue, without removal

:timeout<Integer>

Milliseconds to block for a new item

:raw<Boolean>

Toggles Marshalling. Equivalent to the “old style” second argument.



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/kestrel/client.rb', line 127

def get(key, opts = {})
  raw = opts.delete(:raw) || false
  commands = extract_queue_commands(opts)

  val =
    begin
      send(select_get_method(key), key + commands, raw)
    rescue *RECOVERABLE_ERRORS
      # we can't tell the difference between a server being down
      # and an empty queue, so just return nil. our sticky server
      # logic should eliminate piling on down servers
      nil
    end

  # nil result, force next get to jump from current server
  @counter = @gets_per_server unless val

  val
end

#get_from_last(key, raw = false) ⇒ Object



85
86
87
88
# File 'lib/kestrel/client.rb', line 85

def get_from_last(key, raw=false)
  super key, !raw
rescue Memcached::NotFound
end

#get_from_random(key, raw = false) ⇒ Object



76
77
78
79
# File 'lib/kestrel/client.rb', line 76

def get_from_random(key, raw=false)
  _super_get_from_random key, !raw
rescue Memcached::NotFound
end

#peek(queue) ⇒ Object



155
156
157
# File 'lib/kestrel/client.rb', line 155

def peek(queue)
  get queue, :peek => true
end

#set(key, value, ttl = 0, raw = false) ⇒ Object



105
106
107
108
109
110
# File 'lib/kestrel/client.rb', line 105

def set(key, value, ttl=0, raw=false)
  with_retries { super key, value, ttl, !raw }
  true
rescue Memcached::NotStored
  false
end