Class: Dalli::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/dalli/server.rb

Defined Under Namespace

Modules: TCPSocketOptions Classes: NilObject

Constant Summary collapse

DEFAULT_PORT =
11211
DEFAULT_WEIGHT =
1
DEFAULTS =
{
  # seconds between trying to contact a remote server
  :down_retry_delay => 60,
  # connect/read/write timeout for socket operations
  :socket_timeout => 0.5,
  # times a socket operation may fail before considering the server dead
  :socket_max_failures => 2,
  # amount of time to sleep between retries when a failure occurs
  :socket_failure_delay => 0.01,
  # max size of value in bytes (default is 1 MB, can be overriden with "memcached -I <size>")
  :value_max_bytes => 1024 * 1024,
  # surpassing value_max_bytes either warns (false) or throws (true)
  :error_when_over_max_size => false,
  :compressor => Compressor,
  # min byte size to attempt compression
  :compression_min_size => 1024,
  # max byte size for compression
  :compression_max_size => false,
  :serializer => Marshal,
  :username => nil,
  :password => nil,
  :keepalive => true,
  # max byte size for SO_SNDBUF
  :sndbuf => nil,
  # max byte size for SO_RCVBUF
  :rcvbuf => nil
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attribs, options = {}) ⇒ Server

Returns a new instance of Server.



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/dalli/server.rb', line 44

def initialize(attribs, options = {})
  @hostname, @port, @weight, @socket_type = parse_hostname(attribs)
  @fail_count = 0
  @down_at = nil
  @last_down_at = nil
  @options = DEFAULTS.merge(options)
  @sock = nil
  @msg = nil
  @error = nil
  @pid = nil
  @inprogress = nil
end

Instance Attribute Details

#hostnameObject

Returns the value of attribute hostname.



7
8
9
# File 'lib/dalli/server.rb', line 7

def hostname
  @hostname
end

#optionsObject

Returns the value of attribute options.



10
11
12
# File 'lib/dalli/server.rb', line 10

def options
  @options
end

#portObject

Returns the value of attribute port.



8
9
10
# File 'lib/dalli/server.rb', line 8

def port
  @port
end

#sockObject (readonly)

Returns the value of attribute sock.



11
12
13
# File 'lib/dalli/server.rb', line 11

def sock
  @sock
end

#socket_typeObject (readonly)

possible values: :unix, :tcp



12
13
14
# File 'lib/dalli/server.rb', line 12

def socket_type
  @socket_type
end

#weightObject

Returns the value of attribute weight.



9
10
11
# File 'lib/dalli/server.rb', line 9

def weight
  @weight
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/dalli/server.rb', line 85

def alive?
  return true if @sock

  if @last_down_at && @last_down_at + options[:down_retry_delay] >= Time.now
    time = @last_down_at + options[:down_retry_delay] - Time.now
    Dalli.logger.debug { "down_retry_delay not reached for #{name} (%.3f seconds left)" % time }
    return false
  end

  connect
  !!@sock
rescue Dalli::NetworkError
  false
end

#closeObject



100
101
102
103
104
105
106
# File 'lib/dalli/server.rb', line 100

def close
  return unless @sock
  @sock.close rescue nil
  @sock = nil
  @pid = nil
  @inprogress = false
end

#compressorObject



118
119
120
# File 'lib/dalli/server.rb', line 118

def compressor
  @options[:compressor]
end

#lock!Object



108
109
# File 'lib/dalli/server.rb', line 108

def lock!
end

#multi_response_abortObject

Abort an earlier #multi_response_start. Used to signal an external timeout. The underlying socket is disconnected, and the exception is swallowed.

Returns nothing.



194
195
196
197
198
199
200
201
# File 'lib/dalli/server.rb', line 194

def multi_response_abort
  @multi_buffer = nil
  @position = nil
  @inprogress = false
  failure!(RuntimeError.new('External timeout'))
rescue NetworkError
  true
end

#multi_response_completed?Boolean

Did the last call to #multi_response_start complete successfully?

Returns:

  • (Boolean)


136
137
138
# File 'lib/dalli/server.rb', line 136

def multi_response_completed?
  @multi_buffer.nil?
end

#multi_response_nonblockObject

Attempt to receive and parse as many key/value pairs as possible from this server. After #multi_response_start, this should be invoked repeatedly whenever this server’s socket is readable until #multi_response_completed?.

Returns a Hash of kv pairs received.



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/dalli/server.rb', line 146

def multi_response_nonblock
  raise 'multi_response has completed' if @multi_buffer.nil?

  @multi_buffer << @sock.read_available
  buf = @multi_buffer
  pos = @position
  values = {}

  while buf.bytesize - pos >= 24
    header = buf.slice(pos, 24)
    (key_length, _, body_length, cas) = header.unpack(KV_HEADER)

    if key_length == 0
      # all done!
      @multi_buffer = nil
      @position = nil
      @inprogress = false
      break

    elsif buf.bytesize - pos >= 24 + body_length
      flags = buf.slice(pos + 24, 4).unpack('N')[0]
      key = buf.slice(pos + 24 + 4, key_length)
      value = buf.slice(pos + 24 + 4 + key_length, body_length - key_length - 4) if body_length - key_length - 4 > 0

      pos = pos + 24 + body_length

      begin
        values[key] = [deserialize(value, flags), cas]
      rescue DalliError
      end

    else
      # not enough data yet, wait for more
      break
    end
  end
  @position = pos

  values
rescue SystemCallError, Timeout::Error, EOFError => e
  failure!(e)
end

#multi_response_startObject

Start reading key/value pairs from this connection. This is usually called after a series of GETKQ commands. A NOOP is sent, and the server begins flushing responses for kv pairs that were found.

Returns nothing.



127
128
129
130
131
132
133
# File 'lib/dalli/server.rb', line 127

def multi_response_start
  verify_state
  write_noop
  @multi_buffer = String.new('')
  @position = 0
  @inprogress = true
end

#nameObject



57
58
59
60
61
62
63
# File 'lib/dalli/server.rb', line 57

def name
  if socket_type == :unix
    hostname
  else
    "#{hostname}:#{port}"
  end
end

#request(op, *args) ⇒ Object

Chokepoint method for instrumentation



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/dalli/server.rb', line 66

def request(op, *args)
  verify_state
  raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}. If you are sure it is running, ensure memcached version is > 1.4." unless alive?
  begin
    send(op, *args)
  rescue Dalli::MarshalError => ex
    Dalli.logger.error "Marshalling error for key '#{args.first}': #{ex.message}"
    Dalli.logger.error "You are trying to cache a Ruby object which cannot be serialized to memcached."
    Dalli.logger.error ex.backtrace.join("\n\t")
    false
  rescue Dalli::DalliError, Dalli::NetworkError, Dalli::ValueOverMaxSize, Timeout::Error
    raise
  rescue => ex
    Dalli.logger.error "Unexpected exception during Dalli request: #{ex.class.name}: #{ex.message}"
    Dalli.logger.error ex.backtrace.join("\n\t")
    down!
  end
end

#serializerObject



114
115
116
# File 'lib/dalli/server.rb', line 114

def serializer
  @options[:serializer]
end

#unlock!Object



111
112
# File 'lib/dalli/server.rb', line 111

def unlock!
end