Class: Sensu::Redis::Client

Inherits:
EM::Connection
  • Object
show all
Includes:
EM::Deferrable, Utilities
Defined in:
lib/sensu/redis/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utilities

#ip_address?, #resolve_host, #resolve_hostname

Constructor Details

#initialize(options = {}) ⇒ Client

Initialize the connection, creating the Redis command methods, and setting the default connection options and callbacks.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/sensu/redis/client.rb', line 16

def initialize(options={})
  create_command_methods!
  @host = options[:host] || "127.0.0.1"
  @port = (options[:port] || 6379).to_i
  @db = options[:db]
  @password = options[:password]
  @auto_reconnect = options.fetch(:auto_reconnect, true)
  @reconnect_on_error = options.fetch(:reconnect_on_error, true)
  @error_callback = Proc.new {}
  @reconnect_callbacks = {
    :before => Proc.new {},
    :after => Proc.new {}
  }
end

Instance Attribute Details

#auto_reconnectObject

Returns the value of attribute auto_reconnect.



12
13
14
# File 'lib/sensu/redis/client.rb', line 12

def auto_reconnect
  @auto_reconnect
end

#loggerObject

Returns the value of attribute logger.



12
13
14
# File 'lib/sensu/redis/client.rb', line 12

def logger
  @logger
end

#reconnect_on_errorObject

Returns the value of attribute reconnect_on_error.



12
13
14
# File 'lib/sensu/redis/client.rb', line 12

def reconnect_on_error
  @reconnect_on_error
end

#sentinelObject

Returns the value of attribute sentinel.



12
13
14
# File 'lib/sensu/redis/client.rb', line 12

def sentinel
  @sentinel
end

Instance Method Details

#after_reconnect(&block) ⇒ Object

Set the connection after reconnect callback. This callback is called after a successful reconnect, after the connection has been validated.



63
64
65
# File 'lib/sensu/redis/client.rb', line 63

def after_reconnect(&block)
  @reconnect_callbacks[:after] = block
end

#authenticate { ... } ⇒ Object

Authenticate to Redis if a password has been set in the connection options. This method uses ‘send_command()` directly, as it assumes that the connection has been established. Redis authentication must be done prior to issuing other Redis commands.

Yields:

  • the callback called once authenticated.



230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/sensu/redis/client.rb', line 230

def authenticate
  if @password
    send_command(AUTH_COMMAND, @password) do |authenticated|
      if authenticated
        yield if block_given?
      else
        error(ConnectionError, "redis authenticate failed")
      end
    end
  else
    yield if block_given?
  end
end

#before_reconnect(&block) ⇒ Object

Set the connection before reconnect callback. This callback is called after the connection closes but before a reconnect is attempted.



56
57
58
# File 'lib/sensu/redis/client.rb', line 56

def before_reconnect(&block)
  @reconnect_callbacks[:before] = block
end

#begin_multibulk(multibulk_count) ⇒ Object

Begin a multi bulk response array for an expected number of responses. Using this method causes ‘dispatch_response()` to wait until all of the expected responses have been added to the array, before the Redis command reponse callback is called.

Parameters:

  • multibulk_count (Integer)

    number of expected responses.



290
291
292
293
# File 'lib/sensu/redis/client.rb', line 290

def begin_multibulk(multibulk_count)
  @multibulk_count = multibulk_count
  @multibulk_values = []
end

#closeObject

Close the Redis connection after writing the current Redis command data.



117
118
119
120
# File 'lib/sensu/redis/client.rb', line 117

def close
  @closing = true
  close_connection_after_writing
end

#connected?Boolean

Determine if the connection is connected to Redis.

Returns:

  • (Boolean)


83
84
85
# File 'lib/sensu/redis/client.rb', line 83

def connected?
  @connected || false
end

#connection_completedObject

This method is called by EM when the connection is established. This method is reponsible for validating the connection before Redis commands can be sent.



269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/sensu/redis/client.rb', line 269

def connection_completed
  @response_callbacks = []
  @multibulk_count = false
  @connected = true
  authenticate do
    select_db
    verify_version do
      succeed
      @reconnect_callbacks[:after].call if @reconnecting
      @reconnecting = false
    end
  end
end

#create_command_methods!Object

Create Redis command methods. Command methods wrap ‘redis_command()`. This method is called by `initialize()`.



183
184
185
186
187
188
189
# File 'lib/sensu/redis/client.rb', line 183

def create_command_methods!
  COMMANDS.each do |command|
    self.class.send(:define_method, command.to_sym) do |*arguments, &block|
      redis_command(command, *arguments, &block)
    end
  end
end

#determine_address { ... } ⇒ Object

Determine the current Redis master address. If Sentinel was used to determine the original address, use it again. If Sentinel is not being used, return the host and port used when the connection was first established.

Yields:

  • callback called when the current Redis master host and port has been determined.



38
39
40
41
42
43
44
# File 'lib/sensu/redis/client.rb', line 38

def determine_address(&block)
  if @sentinel
    @sentinel.resolve(&block)
  else
    block.call(@host, @port)
  end
end

#dispatch_error(code) ⇒ Object

Dispatch a Redis error, dropping the associated Redis command response callback, and passing a Redis error object to the error callback (if set).

Parameters:

  • code (String)

    Redis error code.



300
301
302
303
# File 'lib/sensu/redis/client.rb', line 300

def dispatch_error(code)
  @response_callbacks.shift
  error(CommandError, code)
end

#dispatch_response(value) ⇒ Object

Dispatch a response. If a multi bulk response has begun, this method will build the completed response array before the associated Redis command response callback is called. If one or more pubsub callbacks are defined, the approprate pubsub callbacks are called, provided with the pubsub response. Redis command response callbacks may have an optional processor block, responsible for producing a value with the correct type, e.g. “1” -> true (boolean).

Parameters:

  • value (Object)


315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
# File 'lib/sensu/redis/client.rb', line 315

def dispatch_response(value)
  if @multibulk_count
    @multibulk_values << value
    @multibulk_count -= 1
    if @multibulk_count == 0
      value = @multibulk_values
      @multibulk_count = false
    else
      return
    end
  end
  if @pubsub_callbacks && value.is_a?(Array)
    if PUBSUB_RESPONSES.include?(value[0])
      @pubsub_callbacks[value[1]].each do |block|
        block.call(*value) if block
      end
      return
    end
  end
  processor, block = @response_callbacks.shift
  if block
    value = processor.call(value) if processor
    block.call(value)
  end
end

#error(klass, message) ⇒ Object

Create an error and pass it to the connection error callback. This method will close the current connection and trigger a reconnect (via ‘unbind()`) if `@reconnect_on_error` is `true`. Closing the connection here is necessary to stop EventMachine from reusing the same connection handler (we want a fresh Redis connection).

Parameters:

  • klass (Class)
  • message (String)


76
77
78
79
80
# File 'lib/sensu/redis/client.rb', line 76

def error(klass, message)
  redis_error = klass.new(message)
  @error_callback.call(redis_error)
  close_connection if @reconnect_on_error
end

#on_error(&block) ⇒ Object

Set the connection error callback. This callback is called when the connection encounters either a connection, protocol, or command error.



49
50
51
# File 'lib/sensu/redis/client.rb', line 49

def on_error(&block)
  @error_callback = block
end

#parse_response_line(line) ⇒ Object

Parse a RESP line. This method is called by ‘receive_data()`. You can read about RESP @ redis.io/topics/protocol

Parameters:

  • line (String)


345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/sensu/redis/client.rb', line 345

def parse_response_line(line)
  # Trim off the response type and delimiter (\r\n).
  response = line.slice(1..-3)
  # First character indicates response type.
  case line[0, 1]
  when MINUS # Error, e.g. -ERR
    dispatch_error(response)
  when PLUS # String, e.g. +OK
    dispatch_response(response)
  when DOLLAR # Bulk string, e.g. $3\r\nfoo\r\n
    response_length = Integer(response)
    if response_length == -1 # No data, return nil.
      dispatch_response(nil)
    elsif @buffer.length >= response_length + 2 # Complete data.
      dispatch_response(@buffer.slice!(0, response_length))
      @buffer.slice!(0,2) # Discard delimeter (\r\n).
    else # Incomplete, have data pushed back into buffer.
      return INCOMPLETE
    end
  when COLON # Integer, e.g. :8
    dispatch_response(Integer(response))
  when ASTERISK # Array, e.g. *2\r\n$3\r\foo\r\n$3\r\nbar\r\n
    multibulk_count = Integer(response)
    if multibulk_count == -1 || multibulk_count == 0 # No data, return [].
      dispatch_response([])
    else
      begin_multibulk(multibulk_count) # Accumulate responses.
    end
  else
    error(ProtocolError, "response type not recognized: #{line.strip}")
  end
end

#receive_data(data) ⇒ Object

This method is called by EM when the connection receives data. This method assumes that the incoming data is using RESP and it is parsed by ‘parse_resp_line()`.

Parameters:

  • data (String)


383
384
385
386
387
388
389
390
391
392
# File 'lib/sensu/redis/client.rb', line 383

def receive_data(data)
  (@buffer ||= '') << data
  while index = @buffer.index(DELIM)
    line = @buffer.slice!(0, index+2)
    if parse_response_line(line) == INCOMPLETE
      @buffer[0...0] = line
      break
    end
  end
end

#reconnect!Object

Reconnect to Redis. The before reconnect callback is first called if not already reconnecting. This method uses a 1 second delay before attempting a reconnect. The method ‘determine_address()` is used to determine the correct host and port to reconnect to, either via Sentinel (new master) or the previous host and port. This method uses `resolve_host()` to first resolve the determined host, if it’s not already an IP address. Resolving the hostname upfront guards against lookup failures that would cause the Sensu process to crash. Upfront hostname resolution also allows this Redis library to work with Amazon AWS ElastiCache & where DNS is used as a failover mechanism.



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/sensu/redis/client.rb', line 99

def reconnect!
  @reconnect_callbacks[:before].call unless @reconnecting
  @reconnecting = true
  EM.add_timer(1) do
    determine_address do |host, port|
      resolve_host(host) do |ip_address|
        if ip_address.nil?
          reconnect!
        else
          reconnect(ip_address, port.to_i)
        end
      end
    end
  end
end

#redis_command(command, *arguments) { ... } ⇒ Object

Send a Redis command once the Redis connection has been established (EM Deferable succeeded).

Parameters:

  • command (String)
  • *arguments (Array<Object>)

Yields:

  • command reponse callback



171
172
173
174
175
176
177
178
179
# File 'lib/sensu/redis/client.rb', line 171

def redis_command(command, *arguments, &block)
  if @deferred_status == :succeeded
    send_command(command, *arguments, &block)
  else
    callback do
      send_command(command, *arguments, &block)
    end
  end
end

#select_dbObject

Select a Redis DB if a DB has been set in the connection options. This method (& Redis command) does not require a response callback.



247
248
249
# File 'lib/sensu/redis/client.rb', line 247

def select_db
  send_command(SELECT_COMMAND, @db) if @db
end

#send_command(command, *arguments) { ... } ⇒ Object

Send a Redis command and queue the associated response callback. This method calls ‘send_command_data()` for RESP multi bulk and transmission.

Parameters:

  • command (String)
  • *arguments (Array<Object>)

Yields:

  • command reponse callback



160
161
162
163
# File 'lib/sensu/redis/client.rb', line 160

def send_command(command, *arguments, &block)
  send_command_data(command, *arguments)
  @response_callbacks << [RESPONSE_PROCESSORS[command], block]
end

#send_command_data(*arguments) ⇒ Object

Send a Redis command using RESP multi bulk. This method sends data to Redis using EM connection ‘send_data()`.

Parameters:

  • *arguments (Array<Object>)


144
145
146
147
148
149
150
151
# File 'lib/sensu/redis/client.rb', line 144

def send_command_data(*arguments)
  data = "*#{arguments.length}#{DELIM}"
  arguments.each do |value|
    value = value.to_s
    data << "$#{value.bytesize}#{DELIM}#{value}#{DELIM}"
  end
  send_data(data)
end

#subscribe(channel) { ... } ⇒ Object

Subscribe to a Redis PubSub channel.

Parameters:

  • channel (String)

Yields:

  • channel message callback.



195
196
197
198
199
200
# File 'lib/sensu/redis/client.rb', line 195

def subscribe(channel, &block)
  @pubsub_callbacks ||= {}
  @pubsub_callbacks[channel] ||= []
  @pubsub_callbacks[channel] << block
  redis_command(SUBSCRIBE_COMMAND, channel, &block)
end

#unbindObject

This method is called by EM when the connection closes, either intentionally or unexpectedly. This method is reponsible for starting the reconnect process when appropriate.



125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/sensu/redis/client.rb', line 125

def unbind
  @deferred_status = nil
  @pubsub_callbacks = nil
  if @closing
    @reconnecting = false
  elsif ((@connected || @reconnecting) && @auto_reconnect) || @reconnect_on_error
    reconnect!
  elsif @connected
    error(ConnectionError, "connection closed")
  else
    error(ConnectionError, "unable to connect to redis server")
  end
  @connected = false
end

#unsubscribe(channel = nil) { ... } ⇒ Object

Unsubscribe to one or more Redis PubSub channels. If a channel is provided, this method will unsubscribe from it. If a channel is not provided, this method will unsubscribe from all Redis PubSub channels.

Parameters:

  • channel (String) (defaults to: nil)

Yields:

  • unsubscribe callback.



209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/sensu/redis/client.rb', line 209

def unsubscribe(channel=nil, &block)
  @pubsub_callbacks ||= {}
  arguments = [UNSUBSCRIBE_COMMAND]
  if channel
    @pubsub_callbacks[channel] = [block]
    arguments << channel
  else
    @pubsub_callbacks.each_key do |key|
      @pubsub_callbacks[key] = [block]
    end
  end
  redis_command(arguments)
end

#verify_version { ... } ⇒ Object

Verify the version of Redis. Redis >= 2.0 RC 1 is required for certain Redis commands that Sensu uses. A connection error is created if the Redis version does not meet the requirements.

Yields:

  • the callback called once verified.



256
257
258
259
260
261
262
263
264
# File 'lib/sensu/redis/client.rb', line 256

def verify_version
  send_command(INFO_COMMAND) do |redis_info|
    if redis_info[:redis_version] < "1.3.14"
      error(ConnectionError, "redis version must be >= 2.0 RC 1")
    else
      yield if block_given?
    end
  end
end