Class: Sensu::Redis::Client

Inherits:
EM::Connection
  • Object
show all
Includes:
EM::Deferrable, Utilities
Defined in:
lib/sensu/redis/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utilities

#ip_address?, #resolve_host, #resolve_hostname

Constructor Details

#initialize(options = {}) ⇒ Client

Initialize the connection, creating the Redis command methods, and setting the default connection options and callbacks.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/sensu/redis/client.rb', line 16

def initialize(options={})
  create_command_methods!
  @host = options[:host] || "127.0.0.1"
  @port = (options[:port] || 6379).to_i
  @db = options[:db]
  @password = options[:password]
  @tls = options[:tls] || options[:ssl]
  @auto_reconnect = options.fetch(:auto_reconnect, true)
  @reconnect_on_error = options.fetch(:reconnect_on_error, true)
  @error_callback = Proc.new {}
  @reconnect_callbacks = {
    :before => Proc.new {},
    :after => Proc.new {}
  }
end

Instance Attribute Details

#auto_reconnectObject

Returns the value of attribute auto_reconnect.



12
13
14
# File 'lib/sensu/redis/client.rb', line 12

def auto_reconnect
  @auto_reconnect
end

#loggerObject

Returns the value of attribute logger.



12
13
14
# File 'lib/sensu/redis/client.rb', line 12

def logger
  @logger
end

#reconnect_on_errorObject

Returns the value of attribute reconnect_on_error.



12
13
14
# File 'lib/sensu/redis/client.rb', line 12

def reconnect_on_error
  @reconnect_on_error
end

#sentinelObject

Returns the value of attribute sentinel.



12
13
14
# File 'lib/sensu/redis/client.rb', line 12

def sentinel
  @sentinel
end

Instance Method Details

#after_reconnect(&block) ⇒ Object

Set the connection after reconnect callback. This callback is called after a successful reconnect, after the connection has been validated.



64
65
66
# File 'lib/sensu/redis/client.rb', line 64

def after_reconnect(&block)
  @reconnect_callbacks[:after] = block
end

#authenticate { ... } ⇒ Object

Authenticate to Redis if a password has been set in the connection options. This method uses ‘send_command()` directly, as it assumes that the connection has been established. Redis authentication must be done prior to issuing other Redis commands.

Yields:

  • the callback called once authenticated.



231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/sensu/redis/client.rb', line 231

def authenticate
  if @password
    send_command(AUTH_COMMAND, @password) do |authenticated|
      if authenticated
        yield if block_given?
      else
        error(ConnectionError, "redis authenticate failed")
      end
    end
  else
    yield if block_given?
  end
end

#before_reconnect(&block) ⇒ Object

Set the connection before reconnect callback. This callback is called after the connection closes but before a reconnect is attempted.



57
58
59
# File 'lib/sensu/redis/client.rb', line 57

def before_reconnect(&block)
  @reconnect_callbacks[:before] = block
end

#begin_multibulk(multibulk_count) ⇒ Object

Begin a multi bulk response array for an expected number of responses. Using this method causes ‘dispatch_response()` to wait until all of the expected responses have been added to the array, before the Redis command reponse callback is called.

Parameters:

  • multibulk_count (Integer)

    number of expected responses.



307
308
309
310
# File 'lib/sensu/redis/client.rb', line 307

def begin_multibulk(multibulk_count)
  @multibulk_count = multibulk_count
  @multibulk_values = []
end

#closeObject

Close the Redis connection after writing the current Redis command data.



118
119
120
121
# File 'lib/sensu/redis/client.rb', line 118

def close
  @closing = true
  close_connection_after_writing
end

#connected?Boolean

Determine if the connection is connected to Redis.

Returns:

  • (Boolean)


84
85
86
# File 'lib/sensu/redis/client.rb', line 84

def connected?
  @connected || false
end

#connection_completedObject

This method is called by EM when the connection is established. This method is reponsible for upgrading to TLS if necessary and validating the connection before Redis commands can be sent.



271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/sensu/redis/client.rb', line 271

def connection_completed
  @response_callbacks = []
  @multibulk_count = false
  @connected = true
  start_tls(@tls) unless @tls.nil?
  authenticate do
    select_db
    verify_version do
      succeed
      @reconnect_callbacks[:after].call if @reconnecting
      @reconnecting = false
    end
  end
end

#create_command_methods!Object

Create Redis command methods. Command methods wrap ‘redis_command()`. This method is called by `initialize()`.



184
185
186
187
188
189
190
# File 'lib/sensu/redis/client.rb', line 184

def create_command_methods!
  COMMANDS.each do |command|
    self.class.send(:define_method, command.to_sym) do |*arguments, &block|
      redis_command(command, *arguments, &block)
    end
  end
end

#determine_address { ... } ⇒ Object

Determine the current Redis master address. If Sentinel was used to determine the original address, use it again. If Sentinel is not being used, return the host and port used when the connection was first established.

Yields:

  • callback called when the current Redis master host and port has been determined.



39
40
41
42
43
44
45
# File 'lib/sensu/redis/client.rb', line 39

def determine_address(&block)
  if @sentinel
    @sentinel.resolve(&block)
  else
    block.call(@host, @port)
  end
end

#dispatch_error(code) ⇒ Object

Dispatch a Redis error, dropping the associated Redis command response callback, and passing a Redis error object to the error callback (if set).

Parameters:

  • code (String)

    Redis error code.



317
318
319
320
# File 'lib/sensu/redis/client.rb', line 317

def dispatch_error(code)
  @response_callbacks.shift
  error(CommandError, code)
end

#dispatch_response(value) ⇒ Object

Dispatch a response. If a multi bulk response has begun, this method will build the completed response array before the associated Redis command response callback is called. If one or more pubsub callbacks are defined, the approprate pubsub callbacks are called, provided with the pubsub response. Redis command response callbacks may have an optional processor block, responsible for producing a value with the correct type, e.g. “1” -> true (boolean).

Parameters:

  • value (Object)


332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'lib/sensu/redis/client.rb', line 332

def dispatch_response(value)
  if @multibulk_count
    @multibulk_values << value
    @multibulk_count -= 1
    if @multibulk_count == 0
      value = @multibulk_values
      @multibulk_count = false
    else
      return
    end
  end
  if @pubsub_callbacks && value.is_a?(Array)
    if PUBSUB_RESPONSES.include?(value[0])
      @pubsub_callbacks[value[1]].each do |block|
        block.call(*value) if block
      end
      return
    end
  end
  processor, block = @response_callbacks.shift
  if block
    value = processor.call(value) if processor
    block.call(value)
  end
end

#error(klass, message) ⇒ Object

Create an error and pass it to the connection error callback. This method will close the current connection and trigger a reconnect (via ‘unbind()`) if `@reconnect_on_error` is `true`. Closing the connection here is necessary to stop EventMachine from reusing the same connection handler (we want a fresh Redis connection).

Parameters:

  • klass (Class)
  • message (String)


77
78
79
80
81
# File 'lib/sensu/redis/client.rb', line 77

def error(klass, message)
  redis_error = klass.new(message)
  @error_callback.call(redis_error)
  close_connection if @reconnect_on_error
end

#on_error(&block) ⇒ Object

Set the connection error callback. This callback is called when the connection encounters either a connection, protocol, or command error.



50
51
52
# File 'lib/sensu/redis/client.rb', line 50

def on_error(&block)
  @error_callback = block
end

#parse_response_line(line) ⇒ Object

Parse a RESP line. This method is called by ‘receive_data()`. You can read about RESP @ redis.io/topics/protocol

Parameters:

  • line (String)


362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# File 'lib/sensu/redis/client.rb', line 362

def parse_response_line(line)
  # Trim off the response type and delimiter (\r\n).
  response = line.slice(1..-3)
  # First character indicates response type.
  case line[0, 1]
  when MINUS # Error, e.g. -ERR
    dispatch_error(response)
  when PLUS # String, e.g. +OK
    dispatch_response(response)
  when DOLLAR # Bulk string, e.g. $3\r\nfoo\r\n
    response_length = Integer(response)
    if response_length == -1 # No data, return nil.
      dispatch_response(nil)
    elsif @buffer.length >= response_length + 2 # Complete data.
      dispatch_response(@buffer.slice!(0, response_length))
      @buffer.slice!(0,2) # Discard delimeter (\r\n).
    else # Incomplete, have data pushed back into buffer.
      return INCOMPLETE
    end
  when COLON # Integer, e.g. :8
    dispatch_response(Integer(response))
  when ASTERISK # Array, e.g. *2\r\n$3\r\foo\r\n$3\r\nbar\r\n
    multibulk_count = Integer(response)
    if multibulk_count == -1 || multibulk_count == 0 # No data, return [].
      dispatch_response([])
    else
      begin_multibulk(multibulk_count) # Accumulate responses.
    end
  else
    error(ProtocolError, "response type not recognized: #{line.strip}")
  end
end

#receive_data(data) ⇒ Object

This method is called by EM when the connection receives data. This method assumes that the incoming data is using RESP and it is parsed by ‘parse_resp_line()`.

Parameters:

  • data (String)


400
401
402
403
404
405
406
407
408
409
# File 'lib/sensu/redis/client.rb', line 400

def receive_data(data)
  (@buffer ||= '') << data
  while index = @buffer.index(DELIM)
    line = @buffer.slice!(0, index+2)
    if parse_response_line(line) == INCOMPLETE
      @buffer[0...0] = line
      break
    end
  end
end

#reconnect!Object

Reconnect to Redis. The before reconnect callback is first called if not already reconnecting. This method uses a 1 second delay before attempting a reconnect. The method ‘determine_address()` is used to determine the correct host and port to reconnect to, either via Sentinel (new master) or the previous host and port. This method uses `resolve_host()` to first resolve the determined host, if it’s not already an IP address. Resolving the hostname upfront guards against lookup failures that would cause the Sensu process to crash. Upfront hostname resolution also allows this Redis library to work with Amazon AWS ElastiCache & where DNS is used as a failover mechanism.



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/sensu/redis/client.rb', line 100

def reconnect!
  @reconnect_callbacks[:before].call unless @reconnecting
  @reconnecting = true
  EM.add_timer(1) do
    determine_address do |host, port|
      resolve_host(host) do |ip_address|
        if ip_address.nil?
          reconnect!
        else
          reconnect(ip_address, port.to_i)
        end
      end
    end
  end
end

#redis_command(command, *arguments) { ... } ⇒ Object

Send a Redis command once the Redis connection has been established (EM Deferable succeeded).

Parameters:

  • command (String)
  • *arguments (Array<Object>)

Yields:

  • command reponse callback



172
173
174
175
176
177
178
179
180
# File 'lib/sensu/redis/client.rb', line 172

def redis_command(command, *arguments, &block)
  if @deferred_status == :succeeded
    send_command(command, *arguments, &block)
  else
    callback do
      send_command(command, *arguments, &block)
    end
  end
end

#select_dbObject

Select a Redis DB if a DB has been set in the connection options. This method (& Redis command) does not require a response callback.



248
249
250
# File 'lib/sensu/redis/client.rb', line 248

def select_db
  send_command(SELECT_COMMAND, @db) if @db
end

#send_command(command, *arguments) { ... } ⇒ Object

Send a Redis command and queue the associated response callback. This method calls ‘send_command_data()` for RESP multi bulk and transmission.

Parameters:

  • command (String)
  • *arguments (Array<Object>)

Yields:

  • command reponse callback



161
162
163
164
# File 'lib/sensu/redis/client.rb', line 161

def send_command(command, *arguments, &block)
  send_command_data(command, *arguments)
  @response_callbacks << [RESPONSE_PROCESSORS[command], block]
end

#send_command_data(*arguments) ⇒ Object

Send a Redis command using RESP multi bulk. This method sends data to Redis using EM connection ‘send_data()`.

Parameters:

  • *arguments (Array<Object>)


145
146
147
148
149
150
151
152
# File 'lib/sensu/redis/client.rb', line 145

def send_command_data(*arguments)
  data = "*#{arguments.length}#{DELIM}"
  arguments.each do |value|
    value = value.to_s
    data << "$#{value.bytesize}#{DELIM}#{value}#{DELIM}"
  end
  send_data(data)
end

#ssl_handshake_completedObject

This method is called by EM when the SSL/TLS handshake has been completed, as a result of calling #start_tls to initiate SSL/TLS on the connection. Log when the TLS handshake is complete.



290
291
292
293
294
295
296
297
298
# File 'lib/sensu/redis/client.rb', line 290

def ssl_handshake_completed
  if @logger
    @logger.debug("redis tls handshake complete", {
      :host => @host,
      :port => @port,
      :tls => @tls
    })
  end
end

#subscribe(channel) { ... } ⇒ Object

Subscribe to a Redis PubSub channel.

Parameters:

  • channel (String)

Yields:

  • channel message callback.



196
197
198
199
200
201
# File 'lib/sensu/redis/client.rb', line 196

def subscribe(channel, &block)
  @pubsub_callbacks ||= {}
  @pubsub_callbacks[channel] ||= []
  @pubsub_callbacks[channel] << block
  redis_command(SUBSCRIBE_COMMAND, channel, &block)
end

#unbindObject

This method is called by EM when the connection closes, either intentionally or unexpectedly. This method is reponsible for starting the reconnect process when appropriate.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/sensu/redis/client.rb', line 126

def unbind
  @deferred_status = nil
  @pubsub_callbacks = nil
  if @closing
    @reconnecting = false
  elsif ((@connected || @reconnecting) && @auto_reconnect) || @reconnect_on_error
    reconnect!
  elsif @connected
    error(ConnectionError, "connection closed")
  else
    error(ConnectionError, "unable to connect to redis server")
  end
  @connected = false
end

#unsubscribe(channel = nil) { ... } ⇒ Object

Unsubscribe to one or more Redis PubSub channels. If a channel is provided, this method will unsubscribe from it. If a channel is not provided, this method will unsubscribe from all Redis PubSub channels.

Parameters:

  • channel (String) (defaults to: nil)

Yields:

  • unsubscribe callback.



210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/sensu/redis/client.rb', line 210

def unsubscribe(channel=nil, &block)
  @pubsub_callbacks ||= {}
  arguments = [UNSUBSCRIBE_COMMAND]
  if channel
    @pubsub_callbacks[channel] = [block]
    arguments << channel
  else
    @pubsub_callbacks.each_key do |key|
      @pubsub_callbacks[key] = [block]
    end
  end
  redis_command(arguments)
end

#verify_version { ... } ⇒ Object

Verify the version of Redis. Redis >= 2.0 RC 1 is required for certain Redis commands that Sensu uses. A connection error is created if the Redis version does not meet the requirements.

Yields:

  • the callback called once verified.



257
258
259
260
261
262
263
264
265
# File 'lib/sensu/redis/client.rb', line 257

def verify_version
  send_command(INFO_COMMAND) do |redis_info|
    if redis_info[:redis_version] < "1.3.14"
      error(ConnectionError, "redis version must be >= 2.0 RC 1")
    else
      yield if block_given?
    end
  end
end