Class: Sensu::Redis::Client
- Inherits:
- EM::Connection
- Inheritance chain:
- Object > EM::Connection > Sensu::Redis::Client
- Includes:
- EM::Deferrable, Utilities
- Defined in:
- lib/sensu/redis/client.rb
Instance Attribute Summary
-
#auto_reconnect ⇒ Object
Returns the value of attribute auto_reconnect.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#reconnect_on_error ⇒ Object
Returns the value of attribute reconnect_on_error.
-
#sentinel ⇒ Object
Returns the value of attribute sentinel.
Instance Method Summary
-
#after_reconnect(&block) ⇒ Object
Set the connection after reconnect callback.
-
#authenticate { ... } ⇒ Object
Authenticate to Redis if a password has been set in the connection options.
-
#before_reconnect(&block) ⇒ Object
Set the connection before reconnect callback.
-
#begin_multibulk(multibulk_count) ⇒ Object
Begin a multi bulk response array for an expected number of responses.
-
#close ⇒ Object
Close the Redis connection after writing the current Redis command data.
-
#connected? ⇒ Boolean
Determine if the connection is connected to Redis.
-
#connection_completed ⇒ Object
This method is called by EM when the connection is established.
-
#create_command_methods! ⇒ Object
Create Redis command methods.
-
#determine_address { ... } ⇒ Object
Determine the current Redis master address.
-
#dispatch_error(code) ⇒ Object
Dispatch a Redis error, dropping the associated Redis command response callback, and passing a Redis error object to the error callback (if set).
-
#dispatch_response(value) ⇒ Object
Dispatch a response.
-
#error(klass, message) ⇒ Object
Create an error and pass it to the connection error callback.
-
#initialize(options = {}) ⇒ Client
constructor
Initialize the connection, creating the Redis command methods, and setting the default connection options and callbacks.
-
#on_error(&block) ⇒ Object
Set the connection error callback.
-
#parse_response_line(line) ⇒ Object
Parse a RESP line.
-
#receive_data(data) ⇒ Object
This method is called by EM when the connection receives data.
-
#reconnect! ⇒ Object
Reconnect to Redis.
-
#redis_command(command, *arguments) { ... } ⇒ Object
Send a Redis command once the Redis connection has been established (EM Deferrable succeeded).
-
#select_db ⇒ Object
Select a Redis DB if a DB has been set in the connection options.
-
#send_command(command, *arguments) { ... } ⇒ Object
Send a Redis command and queue the associated response callback.
-
#send_command_data(*arguments) ⇒ Object
Send a Redis command using RESP multi bulk.
-
#ssl_handshake_completed ⇒ Object
This method is called by EM when the SSL/TLS handshake has been completed, as a result of calling #start_tls to initiate SSL/TLS on the connection.
-
#subscribe(channel) { ... } ⇒ Object
Subscribe to a Redis PubSub channel.
-
#unbind ⇒ Object
This method is called by EM when the connection closes, either intentionally or unexpectedly.
-
#unsubscribe(channel = nil) { ... } ⇒ Object
Unsubscribe from one or more Redis PubSub channels.
-
#verify_version { ... } ⇒ Object
Verify the version of Redis.
Methods included from Utilities
#ip_address?, #resolve_host, #resolve_hostname
Constructor Details
#initialize(options = {}) ⇒ Client
Initialize the connection, creating the Redis command methods, and setting the default connection options and callbacks.
# File 'lib/sensu/redis/client.rb', line 16
def initialize(options={})
  create_command_methods!
  @host = options[:host] || "127.0.0.1"
  @port = (options[:port] || 6379).to_i
  @db = options[:db]
  @password = options[:password]
  @tls = options[:tls] || options[:ssl]
  @auto_reconnect = options.fetch(:auto_reconnect, true)
  @reconnect_on_error = options.fetch(:reconnect_on_error, true)
  @error_callback = Proc.new {}
  @reconnect_callbacks = {
    :before => Proc.new {},
    :after => Proc.new {}
  }
end
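The option handling in the constructor can be tried in isolation. The following is a minimal sketch, not part of the library: `connection_settings` is a hypothetical helper that copies the same defaulting rules into a plain hash so they can be inspected without EventMachine.

```ruby
# Hypothetical helper (not in sensu-redis) mirroring the constructor's defaults.
def connection_settings(options = {})
  {
    :host => options[:host] || "127.0.0.1",
    :port => (options[:port] || 6379).to_i,
    :tls => options[:tls] || options[:ssl],
    # fetch keeps an explicit false, which `options[:auto_reconnect] || true` would not.
    :auto_reconnect => options.fetch(:auto_reconnect, true),
    :reconnect_on_error => options.fetch(:reconnect_on_error, true)
  }
end

defaults = connection_settings
custom = connection_settings(:port => "6380", :auto_reconnect => false)
puts defaults.inspect
puts custom.inspect
```

The use of `fetch` for the two boolean options matters: it is what allows a caller to turn reconnection off by passing `false`.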
Instance Attribute Details
#auto_reconnect ⇒ Object
Returns the value of attribute auto_reconnect.
# File 'lib/sensu/redis/client.rb', line 12
def auto_reconnect
  @auto_reconnect
end
#logger ⇒ Object
Returns the value of attribute logger.
# File 'lib/sensu/redis/client.rb', line 12
def logger
  @logger
end
#reconnect_on_error ⇒ Object
Returns the value of attribute reconnect_on_error.
# File 'lib/sensu/redis/client.rb', line 12
def reconnect_on_error
  @reconnect_on_error
end
#sentinel ⇒ Object
Returns the value of attribute sentinel.
# File 'lib/sensu/redis/client.rb', line 12
def sentinel
  @sentinel
end
Instance Method Details
#after_reconnect(&block) ⇒ Object
Set the connection after reconnect callback. This callback is called after a successful reconnect, after the connection has been validated.
# File 'lib/sensu/redis/client.rb', line 64
def after_reconnect(&block)
  @reconnect_callbacks[:after] = block
end
#authenticate { ... } ⇒ Object
Authenticate to Redis if a password has been set in the connection options. This method uses `send_command()` directly, as it assumes that the connection has been established. Redis authentication must be done prior to issuing other Redis commands.
# File 'lib/sensu/redis/client.rb', line 231
def authenticate
  if @password
    send_command(AUTH_COMMAND, @password) do |authenticated|
      if authenticated
        yield if block_given?
      else
        error(ConnectionError, "redis authenticate failed")
      end
    end
  else
    yield if block_given?
  end
end
#before_reconnect(&block) ⇒ Object
Set the connection before reconnect callback. This callback is called after the connection closes but before a reconnect is attempted.
# File 'lib/sensu/redis/client.rb', line 57
def before_reconnect(&block)
  @reconnect_callbacks[:before] = block
end
#begin_multibulk(multibulk_count) ⇒ Object
Begin a multi bulk response array for an expected number of responses. Using this method causes `dispatch_response()` to wait until all of the expected responses have been added to the array before the Redis command response callback is called.
# File 'lib/sensu/redis/client.rb', line 307
def begin_multibulk(multibulk_count)
  @multibulk_count = multibulk_count
  @multibulk_values = []
end
#close ⇒ Object
Close the Redis connection after writing the current Redis command data.
# File 'lib/sensu/redis/client.rb', line 118
def close
  @closing = true
  close_connection_after_writing
end
#connected? ⇒ Boolean
Determine if the connection is connected to Redis.
# File 'lib/sensu/redis/client.rb', line 84
def connected?
  @connected || false
end
#connection_completed ⇒ Object
This method is called by EM when the connection is established. This method is responsible for upgrading to TLS if necessary and validating the connection before Redis commands can be sent.
# File 'lib/sensu/redis/client.rb', line 271
def connection_completed
  @response_callbacks = []
  @multibulk_count = false
  @connected = true
  start_tls(@tls) unless @tls.nil?
  authenticate do
    select_db
    verify_version do
      succeed
      @reconnect_callbacks[:after].call if @reconnecting
      @reconnecting = false
    end
  end
end
#create_command_methods! ⇒ Object
Create Redis command methods. Command methods wrap `redis_command()`. This method is called by `initialize()`.
# File 'lib/sensu/redis/client.rb', line 184
def create_command_methods!
  COMMANDS.each do |command|
    self.class.send(:define_method, command.to_sym) do |*arguments, &block|
      redis_command(command, *arguments, &block)
    end
  end
end
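To see what the generated command methods look like from the caller's side, here is a small standalone sketch. `CommandRecorder`, its three-entry `COMMANDS` list, and its stubbed `redis_command` are assumptions made for illustration; the real `COMMANDS` constant is defined elsewhere in the library and is not shown on this page.

```ruby
# Standalone illustration of defining one method per command name.
class CommandRecorder
  COMMANDS = ["get", "set", "del"].freeze # hypothetical subset

  attr_reader :calls

  def initialize
    @calls = []
    create_command_methods!
  end

  # Same define_method pattern as the documented method.
  def create_command_methods!
    COMMANDS.each do |command|
      self.class.send(:define_method, command.to_sym) do |*arguments, &block|
        redis_command(command, *arguments, &block)
      end
    end
  end

  # Stub: records the call instead of talking to Redis.
  def redis_command(command, *arguments, &block)
    @calls << [command, arguments]
    block.call("OK") if block
  end
end

recorder = CommandRecorder.new
recorder.set("foo", "bar") { |reply| puts reply }
puts recorder.calls.inspect
```

Because the methods are defined on the class, every instance created afterwards shares them; calling `create_command_methods!` again simply redefines the same methods.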
#determine_address { ... } ⇒ Object
Determine the current Redis master address. If Sentinel was used to determine the original address, use it again. If Sentinel is not being used, return the host and port used when the connection was first established.
# File 'lib/sensu/redis/client.rb', line 39
def determine_address(&block)
  if @sentinel
    @sentinel.resolve(&block)
  else
    block.call(@host, @port)
  end
end
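The two branches of `determine_address` can be exercised without a Redis or Sentinel server. In this sketch `AddressSource` and `FakeSentinel` are hypothetical stand-ins; the only thing assumed about a real Sentinel object is what the listing above shows, namely that it responds to `resolve` and yields a host and port.

```ruby
# Hypothetical stand-in for a Sentinel client: yields a fixed master address.
class FakeSentinel
  def resolve
    yield "10.0.0.5", 6380
  end
end

# Hypothetical holder reproducing the documented branching.
class AddressSource
  def initialize(host, port, sentinel = nil)
    @host = host
    @port = port
    @sentinel = sentinel
  end

  def determine_address(&block)
    if @sentinel
      @sentinel.resolve(&block)
    else
      block.call(@host, @port)
    end
  end
end

AddressSource.new("127.0.0.1", 6379).determine_address do |host, port|
  puts "direct: #{host}:#{port}"
end

AddressSource.new("127.0.0.1", 6379, FakeSentinel.new).determine_address do |host, port|
  puts "via sentinel: #{host}:#{port}"
end
```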
#dispatch_error(code) ⇒ Object
Dispatch a Redis error, dropping the associated Redis command response callback, and passing a Redis error object to the error callback (if set).
# File 'lib/sensu/redis/client.rb', line 317
def dispatch_error(code)
  @response_callbacks.shift
  error(CommandError, code)
end
#dispatch_response(value) ⇒ Object
Dispatch a response. If a multi bulk response has begun, this method will build the completed response array before the associated Redis command response callback is called. If one or more pubsub callbacks are defined, the appropriate pubsub callbacks are called and provided with the pubsub response. Redis command response callbacks may have an optional processor block, responsible for producing a value with the correct type, e.g. "1" -> true (boolean).
# File 'lib/sensu/redis/client.rb', line 332
def dispatch_response(value)
  if @multibulk_count
    @multibulk_values << value
    @multibulk_count -= 1
    if @multibulk_count == 0
      value = @multibulk_values
      @multibulk_count = false
    else
      return
    end
  end
  if @pubsub_callbacks && value.is_a?(Array)
    if PUBSUB_RESPONSES.include?(value[0])
      @pubsub_callbacks[value[1]].each do |block|
        block.call(*value) if block
      end
      return
    end
  end
  processor, block = @response_callbacks.shift
  if block
    value = processor.call(value) if processor
    block.call(value)
  end
end
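The multi bulk accumulation step is the least obvious part of `dispatch_response`. The sketch below isolates it in a hypothetical `MultibulkCollector` class; the `:pending` return value is an invention of this example (the library simply returns early) so that the intermediate state is visible.

```ruby
# Hypothetical class isolating the multi bulk counting logic.
class MultibulkCollector
  def initialize
    @count = false
    @values = []
  end

  # Called when an array header such as "*2" has been parsed.
  def begin_multibulk(count)
    @count = count
    @values = []
  end

  # Returns the value to dispatch, or :pending while an array is still filling.
  def add(value)
    return value unless @count
    @values << value
    @count -= 1
    return :pending unless @count == 0
    @count = false
    @values
  end
end

collector = MultibulkCollector.new
collector.begin_multibulk(2)
puts collector.add("foo").inspect
puts collector.add("bar").inspect
```

Note that, like the documented method, this counts flat elements only; nested arrays inside a multi bulk reply are outside the scope of the sketch.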
#error(klass, message) ⇒ Object
Create an error and pass it to the connection error callback. This method will close the current connection and trigger a reconnect (via `unbind()`) if `@reconnect_on_error` is `true`. Closing the connection here is necessary to stop EventMachine from reusing the same connection handler (we want a fresh Redis connection).
# File 'lib/sensu/redis/client.rb', line 77
def error(klass, message)
  redis_error = klass.new(message)
  @error_callback.call(redis_error)
  close_connection if @reconnect_on_error
end
#on_error(&block) ⇒ Object
Set the connection error callback. This callback is called when the connection encounters either a connection, protocol, or command error.
# File 'lib/sensu/redis/client.rb', line 50
def on_error(&block)
  @error_callback = block
end
#parse_response_line(line) ⇒ Object
Parse a RESP line. This method is called by `receive_data()`. You can read about RESP at redis.io/topics/protocol
# File 'lib/sensu/redis/client.rb', line 362
def parse_response_line(line)
  # Trim off the response type and delimiter (\r\n).
  response = line.slice(1..-3)
  # First character indicates response type.
  case line[0, 1]
  when MINUS # Error, e.g. -ERR
    dispatch_error(response)
  when PLUS # String, e.g. +OK
    dispatch_response(response)
  when DOLLAR # Bulk string, e.g. $3\r\nfoo\r\n
    response_length = Integer(response)
    if response_length == -1 # No data, return nil.
      dispatch_response(nil)
    elsif @buffer.length >= response_length + 2 # Complete data.
      dispatch_response(@buffer.slice!(0, response_length))
      @buffer.slice!(0,2) # Discard delimiter (\r\n).
    else # Incomplete, have data pushed back into buffer.
      return INCOMPLETE
    end
  when COLON # Integer, e.g. :8
    dispatch_response(Integer(response))
  when ASTERISK # Array, e.g. *2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
    multibulk_count = Integer(response)
    if multibulk_count == -1 || multibulk_count == 0 # No data, return [].
      dispatch_response([])
    else
      begin_multibulk(multibulk_count) # Accumulate responses.
    end
  else
    error(ProtocolError, "response type not recognized: #{line.strip}")
  end
end
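The type-byte dispatch above can be illustrated with a side-effect-free function. `classify_resp_line` is a hypothetical helper written for this example; the literal type characters are taken from the inline comments in the listing (`-ERR`, `+OK`, `$3`, `:8`, `*2`), since the values of the `MINUS`, `PLUS`, `DOLLAR`, `COLON`, and `ASTERISK` constants are not shown on this page.

```ruby
# Hypothetical helper: classify one RESP header line without dispatching it.
def classify_resp_line(line)
  # Drop the leading type byte and the trailing "\r\n".
  payload = line.slice(1..-3)
  case line[0, 1]
  when "-" then [:error, payload]
  when "+" then [:string, payload]
  when ":" then [:integer, Integer(payload)]
  when "$" then [:bulk_length, Integer(payload)]   # -1 means a nil bulk string
  when "*" then [:array_length, Integer(payload)]  # number of elements to follow
  else [:unknown, line.strip]
  end
end

["+OK\r\n", ":8\r\n", "$3\r\n", "*2\r\n", "-ERR unknown command\r\n"].each do |line|
  puts classify_resp_line(line).inspect
end
```

For `$` and `*` lines the header carries only a length; the data itself arrives on the following lines, which is why the documented method consults `@buffer` or starts a multi bulk accumulation.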
#receive_data(data) ⇒ Object
This method is called by EM when the connection receives data. This method assumes that the incoming data is using RESP and it is parsed by `parse_response_line()`.
# File 'lib/sensu/redis/client.rb', line 400
def receive_data(data)
  (@buffer ||= '') << data
  while index = @buffer.index(DELIM)
    line = @buffer.slice!(0, index+2)
    if parse_response_line(line) == INCOMPLETE
      @buffer[0...0] = line
      break
    end
  end
end
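Because TCP delivers data in arbitrary chunks, `receive_data` has to buffer partial lines. The following sketch shows only that buffering step; `LineBuffer` is a hypothetical class, the delimiter is assumed to be `"\r\n"` as the comments in `parse_response_line` indicate, and the `INCOMPLETE` push-back for bulk strings is deliberately left out.

```ruby
# Hypothetical class showing how complete lines are cut from a growing buffer.
class LineBuffer
  DELIM = "\r\n" # assumed value of the library's DELIM constant

  def initialize
    @buffer = String.new
  end

  # Append a chunk and return every complete line now available.
  def push(data)
    @buffer << data
    lines = []
    while (index = @buffer.index(DELIM))
      lines << @buffer.slice!(0, index + 2)
    end
    lines
  end

  # Bytes received so far that do not yet form a complete line.
  def pending
    @buffer
  end
end

buffer = LineBuffer.new
puts buffer.push("+O").inspect
puts buffer.push("K\r\n:8\r\n$3").inspect
puts buffer.pending.inspect
```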
#reconnect! ⇒ Object
Reconnect to Redis. The before reconnect callback is first called if not already reconnecting. This method uses a 1 second delay before attempting a reconnect. The method `determine_address()` is used to determine the correct host and port to reconnect to, either via Sentinel (new master) or the previous host and port. This method uses `resolve_host()` to first resolve the determined host, if it's not already an IP address. Resolving the hostname upfront guards against lookup failures that would cause the Sensu process to crash. Upfront hostname resolution also allows this Redis library to work with Amazon AWS ElastiCache and other setups where DNS is used as a failover mechanism.
# File 'lib/sensu/redis/client.rb', line 100
def reconnect!
  @reconnect_callbacks[:before].call unless @reconnecting
  @reconnecting = true
  EM.add_timer(1) do
    determine_address do |host, port|
      resolve_host(host) do |ip_address|
        if ip_address.nil?
          reconnect!
        else
          reconnect(ip_address, port.to_i)
        end
      end
    end
  end
end
#redis_command(command, *arguments) { ... } ⇒ Object
Send a Redis command once the Redis connection has been established (EM Deferrable succeeded).
# File 'lib/sensu/redis/client.rb', line 172
def redis_command(command, *arguments, &block)
  if @deferred_status == :succeeded
    send_command(command, *arguments, &block)
  else
    callback do
      send_command(command, *arguments, &block)
    end
  end
end
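The effect of deferring commands until the connection succeeds can be shown without EventMachine. `DeferredSender` below is a hypothetical, simplified stand-in for the `EM::Deferrable` behaviour: it queues commands issued early and flushes them in order once `succeed` is called.

```ruby
# Hypothetical, EventMachine-free model of "send now or queue until connected".
class DeferredSender
  attr_reader :sent

  def initialize
    @status = nil
    @pending = []
    @sent = []
  end

  def command(name, *arguments)
    if @status == :succeeded
      @sent << [name, *arguments]
    else
      @pending << [name, *arguments]
    end
  end

  # Marks the connection as ready and flushes queued commands in order.
  def succeed
    @status = :succeeded
    @sent.concat(@pending)
    @pending.clear
  end
end

sender = DeferredSender.new
sender.command("set", "foo", "bar") # queued: not yet connected
sender.succeed
sender.command("get", "foo")        # sent immediately
puts sender.sent.inspect
```

This is why callers can issue commands right after creating the client: nothing is written to the socket until authentication, DB selection, and version verification have completed.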
#select_db ⇒ Object
Select a Redis DB if a DB has been set in the connection options. This method (& Redis command) does not require a response callback.
# File 'lib/sensu/redis/client.rb', line 248
def select_db
  send_command(SELECT_COMMAND, @db) if @db
end
#send_command(command, *arguments) { ... } ⇒ Object
Send a Redis command and queue the associated response callback. This method calls `send_command_data()` for RESP multi bulk and transmission.
# File 'lib/sensu/redis/client.rb', line 161
def send_command(command, *arguments, &block)
  send_command_data(command, *arguments)
  @response_callbacks << [RESPONSE_PROCESSORS[command], block]
end
#send_command_data(*arguments) ⇒ Object
Send a Redis command using RESP multi bulk. This method sends data to Redis using EM connection `send_data()`.
# File 'lib/sensu/redis/client.rb', line 145
def send_command_data(*arguments)
  data = "*#{arguments.length}#{DELIM}"
  arguments.each do |value|
    value = value.to_s
    data << "$#{value.bytesize}#{DELIM}#{value}#{DELIM}"
  end
  send_data(data)
end
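To make the wire format concrete, the encoding step can be run on its own. `encode_command` is a hypothetical function that returns the string instead of calling `send_data()`, and `RESP_DELIM` is assumed to equal the library's `DELIM` (`"\r\n"`).

```ruby
RESP_DELIM = "\r\n" # assumed value of the library's DELIM constant

# Hypothetical helper: build the RESP multi bulk payload for a command.
def encode_command(*arguments)
  data = "*#{arguments.length}#{RESP_DELIM}"
  arguments.each do |value|
    value = value.to_s
    # bytesize, not length: RESP lengths are counted in bytes.
    data << "$#{value.bytesize}#{RESP_DELIM}#{value}#{RESP_DELIM}"
  end
  data
end

puts encode_command("set", "foo", 42).inspect
# => "*3\r\n$3\r\nset\r\n$3\r\nfoo\r\n$2\r\n42\r\n"
```

Every argument is sent as a bulk string, so integers such as `42` are converted with `to_s` before their byte length is measured.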
#ssl_handshake_completed ⇒ Object
This method is called by EM when the SSL/TLS handshake has been completed, as a result of calling #start_tls to initiate SSL/TLS on the connection. Log when the TLS handshake is complete.
# File 'lib/sensu/redis/client.rb', line 290
def ssl_handshake_completed
  if @logger
    @logger.debug("redis tls handshake complete", {
      :host => @host,
      :port => @port,
      :tls => @tls
    })
  end
end
#subscribe(channel) { ... } ⇒ Object
Subscribe to a Redis PubSub channel.
# File 'lib/sensu/redis/client.rb', line 196
def subscribe(channel, &block)
  @pubsub_callbacks ||= {}
  @pubsub_callbacks[channel] ||= []
  @pubsub_callbacks[channel] << block
  redis_command(SUBSCRIBE_COMMAND, channel, &block)
end
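The per-channel callback registry that `subscribe` builds, and that `dispatch_response` later consults, can be modelled as follows. `PubsubRegistry` is hypothetical, and the contents of `PUBSUB_TYPES` are an assumption: the library's `PUBSUB_RESPONSES` constant is not shown on this page. Unlike the documented code, this sketch tolerates messages for channels with no registered callbacks.

```ruby
# Hypothetical registry mapping channel names to subscriber blocks.
class PubsubRegistry
  PUBSUB_TYPES = ["message", "unsubscribe"].freeze # assumed, for illustration

  def initialize
    @callbacks = {}
  end

  def subscribe(channel, &block)
    (@callbacks[channel] ||= []) << block
  end

  # Returns true if the response was a pubsub response and was routed.
  def dispatch(response)
    type, channel = response
    return false unless PUBSUB_TYPES.include?(type)
    (@callbacks[channel] || []).each do |block|
      block.call(*response) if block
    end
    true
  end
end

registry = PubsubRegistry.new
registry.subscribe("alerts") { |type, channel, message| puts "#{channel}: #{message}" }
registry.dispatch(["message", "alerts", "disk full"])
```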
#unbind ⇒ Object
This method is called by EM when the connection closes, either intentionally or unexpectedly. This method is responsible for starting the reconnect process when appropriate.
# File 'lib/sensu/redis/client.rb', line 126
def unbind
  @deferred_status = nil
  @pubsub_callbacks = nil
  if @closing
    @reconnecting = false
  elsif ((@connected || @reconnecting) && @auto_reconnect) || @reconnect_on_error
    reconnect!
  elsif @connected
    error(ConnectionError, "connection closed")
  else
    error(ConnectionError, "unable to connect to redis server")
  end
  @connected = false
end
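The branching in `unbind` is easier to reason about as a pure function of the connection state. `unbind_action` below is a hypothetical helper that returns a symbol for the branch taken instead of performing it; the symbol names are made up for this example.

```ruby
# Hypothetical helper: which branch of unbind would run for a given state?
def unbind_action(state)
  if state[:closing]
    :stop
  elsif ((state[:connected] || state[:reconnecting]) && state[:auto_reconnect]) || state[:reconnect_on_error]
    :reconnect
  elsif state[:connected]
    :connection_closed_error
  else
    :unable_to_connect_error
  end
end

puts unbind_action(:closing => true)
puts unbind_action(:connected => true, :auto_reconnect => true)
puts unbind_action(:connected => false, :auto_reconnect => true, :reconnect_on_error => false)
```

One consequence worth noting: because `reconnect_on_error` defaults to `true` in the constructor, the two error branches are only reached when that option has been explicitly disabled.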
#unsubscribe(channel = nil) { ... } ⇒ Object
Unsubscribe from one or more Redis PubSub channels. If a channel is provided, this method will unsubscribe from it. If a channel is not provided, this method will unsubscribe from all Redis PubSub channels.
# File 'lib/sensu/redis/client.rb', line 210
def unsubscribe(channel=nil, &block)
  @pubsub_callbacks ||= {}
  arguments = [UNSUBSCRIBE_COMMAND]
  if channel
    @pubsub_callbacks[channel] = [block]
    arguments << channel
  else
    @pubsub_callbacks.each_key do |key|
      @pubsub_callbacks[key] = [block]
    end
  end
  redis_command(arguments)
end
#verify_version { ... } ⇒ Object
Verify the version of Redis. Redis >= 2.0 RC 1 is required for certain Redis commands that Sensu uses. A connection error is created if the Redis version does not meet the requirements.
# File 'lib/sensu/redis/client.rb', line 257
def verify_version
  send_command(INFO_COMMAND) do |redis_info|
    if redis_info[:redis_version] < "1.3.14"
      error(ConnectionError, "redis version must be >= 2.0 RC 1")
    else
      yield if block_given?
    end
  end
end
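The listing above compares version strings with `<`, which is a lexicographic comparison. As an illustration only, and not what the library does, the sketch below performs the same threshold check with RubyGems' `Gem::Version`, which compares segment by segment. `supported_redis_version?` and `MINIMUM_REDIS_VERSION` are hypothetical names.

```ruby
require "rubygems"

# Threshold taken from the documented comparison ("1.3.14").
MINIMUM_REDIS_VERSION = Gem::Version.new("1.3.14")

# Hypothetical helper: segment-aware version check.
def supported_redis_version?(version_string)
  Gem::Version.new(version_string) >= MINIMUM_REDIS_VERSION
end

puts supported_redis_version?("2.8.19")
puts supported_redis_version?("1.2.6")
# Lexicographic and segment-aware comparisons can disagree:
puts("1.10.0" < "1.3.14")
puts supported_redis_version?("1.10.0")
```

The two approaches agree for the Redis versions the check was written for; they differ only when a segment has more digits than the corresponding segment of the threshold.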